Definition
Reduce_aggis the only lambda aggregate Presto function. It allows users to define arbitrary aggregation logic using 2 lambda functions.reduce_agg(inputValue T, initialState S, inputFunction(S, T, S), combineFunction(S, S, S)) → S
Reduces all non-NULL input values into a single value. inputFunction will be invoked for
each non-NULL input value. If all inputs are NULL, the result is NULL. In addition to taking
the input value, inputFunction takes the current state, initially initialState, and returns the
new state. combineFunction will be invoked to combine two states into a new state. The final
state is returned. Throws an error if initialState is NULL or inputFunction or combineFunction
returns a NULL.
Once can think of reduce_agg as using inputFunction to implement partial aggregation and combineFunction to implement final aggregation. Partial aggregation processes a list of input values and produces an intermediate state:
auto s = initialState;
for (auto x : input) {
s = inputFunction(s, x);
}
return s;
Final aggregation processes a list of intermediate states and computes the final state.
auto s = intermediates[0];
for (auto i = 1; i < intermediates.size(); ++i)
s = combineFunction(s, intermediates[i]);
}
return s;
For example, one can implement SUM aggregation using reduce_agg as follows:
reduce_agg(c, 0, (s, x) -> s + x, (s, s2) -> s + s2)
Implementation of AVG aggregation is a bit trickier. For AVG, state is a tuple of sum and count. Hence, reduce_agg can be used to compute (sum, count) pair, but it cannot compute the actual average. One needs to apply a scalar function on top of reduce_agg to get the average.
SELECT id, sum_and_count.sum / sum_and_count.count FROM (
SELECT id, reduce_agg(value, CAST(row(0, 0) AS row(sum double, count bigint)),
(s, x) -> CAST(row(s.sum + x, s.count + 1) AS row(sum double, count bigint)),
(s, s2) -> CAST(row(s.sum + s2.sum, s.count + s2.count) AS row(sum double, count bigint))) AS sum_and_count
FROM t
GROUP BY id
);
The examples of using reduce_agg to compute SUM and AVG are for illustrative purposes. One should not use reduce_agg if a specialized aggregation function is available.
One use case for reduce_agg we see in production is to compute a product of input values.
reduce_agg(c, 1.0, (s, x) -> s * x, (s, s2) -> s * s2)
Another example is to compute a list of top N distinct values from all input arrays.
reduce_agg(x, array[],
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000),
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000))
Note that this is equivalent to the following query:
SELECT array_agg(v) FROM (
SELECT DISTINCT v
FROM t, UNNEST(x) AS u(v)
ORDER BY v DESC
LIMIT 1000
)
Implementation
Efficient implementation of reduce_agg lambda function is not straightforward. Let’s consider the logic for partial aggregation.
auto s = initialState;
for (auto x : input) {
s = inputFunction(s, x);
}
This is a data-dependent loop, i.e. the next loop iteration depends on the results of
the previous iteration. inputFunction needs to be invoked on each input value x
separately. Since inputFunction is a user-defined lambda, invoking inputFunction means
evaluating an expression. And since expression evaluation in Velox is optimized for
processing large batches of values at a time, evaluating expressions on one value at
a time is very inefficient. To optimize the implementation of reduce_agg we need to
reduce the number of times we evaluate user-defined lambdas and increase the number
of values we process each time.
One approach is to
- convert all input values into states by evaluating inputFunction(initialState, x);
- split states into pairs and evaluate combineFunction on all pairs;
- repeat step (2) until we have only one state left.
Let’s say we have 1024 values to process. Step 1 evaluates inputFunction expression on 1024 values at once. Step 2 evaluates combineFunction on 512 pairs, then on 256 pairs, then on 128 pairs, 64, 32, 16, 8, 4, 2, finally producing a single state. Step 2 evaluates combineFunction 9 times. In total, this implementation evaluates user-defined expressions 10 times on multiple values each time. This is a lot more efficient than the original implementation that evaluates user-defined expressions 1024 times.
In general, given N inputs, the original implementation evaluates expressions N times while the new one log2(N) times.
Note that in case when N is not a power of two, splitting states into pairs may leave an extra state. For example, splitting 11 states produces 5 pairs + one extra state. In this case, we set aside the extra state, evaluate combineFunction on 5 pairs, then bring extra state back to a total of 6 states and continue.
A benchmark, velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp, shows that initial implementation of reduce_agg is 60x slower than SUM, while the optimized implementation is only 3x slower. A specialized aggregation function will always be more efficient than generic reduce_agg, hence, reduce_agg should be used only when specialized aggregation function is not available.
Finally, a side effect of the optimized implementation is that it doesn't support applying reduce_agg to sorted inputs. I.e. one cannot use reduce_agg to compute an equivalent of
SELECT a, array_agg(b ORDER BY b) FROM t GROUP BY 1
The array_agg computation depends on order of inputs. A comparable implementation using reduce_agg would look like this:
SELECT a,
reduce_agg(b, array[],
(s, x) -> concat(s, array[x]),
(s, s2) -> concat(s, s2)
ORDER BY b)
FROM t GROUP BY 1
To respect ORDER BY b, the reduce_agg would have to apply inputFunction to each input value one at a time using a data-dependent loop from above. As we saw, this is very expensive. The optimization we apply does not preserve the order of inputs, hence, cannot support the query above. Note that
Presto doesn't support applying reduce_agg to sorted inputs either.Thank you Orri Erling for brainstorming and Xiaoxuan Meng and
Pedro Eugênio Rocha Pedreira for reviewing the code.