2 posts tagged with "functions"

· 5 min read
Masha Basmanova

Definition

reduce_agg is the only lambda aggregate Presto function. It allows users to define arbitrary aggregation logic using two lambda functions.
reduce_agg(inputValue T, initialState S, inputFunction(S, T, S), combineFunction(S, S, S)) → S

Reduces all non-NULL input values into a single value. inputFunction will be invoked for
each non-NULL input value. If all inputs are NULL, the result is NULL. In addition to taking
the input value, inputFunction takes the current state, initially initialState, and returns the
new state. combineFunction will be invoked to combine two states into a new state. The final
state is returned. Throws an error if initialState is NULL or inputFunction or combineFunction
returns a NULL.

One can think of reduce_agg as using inputFunction to implement partial aggregation and combineFunction to implement final aggregation. Partial aggregation processes a list of input values and produces an intermediate state:

auto s = initialState;
for (auto x : input) {
    s = inputFunction(s, x);
}

return s;

Final aggregation processes a list of intermediate states and computes the final state.

auto s = intermediates[0];
for (auto i = 1; i < intermediates.size(); ++i) {
    s = combineFunction(s, intermediates[i]);
}

return s;

For example, one can implement SUM aggregation using reduce_agg as follows:

reduce_agg(c, 0, (s, x) -> s + x, (s, s2) -> s + s2)
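To make the split between partial and final aggregation concrete, here is a minimal C++ sketch of the two loops above, applied to SUM. The helper names partialAggregate, finalAggregate and sumOverTwoBatches are made up for illustration and are not Velox or Presto APIs; NULL handling is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Velox API): folds one batch of inputs into a
// state, mirroring the partial-aggregation loop.
template <typename S, typename T, typename InputFn>
S partialAggregate(const std::vector<T>& input, S initialState, InputFn inputFunction) {
  S s = initialState;
  for (const auto& x : input) {
    s = inputFunction(s, x);
  }
  return s;
}

// Hypothetical helper: merges intermediate states, mirroring the
// final-aggregation loop. Assumes at least one intermediate state.
template <typename S, typename CombineFn>
S finalAggregate(const std::vector<S>& intermediates, CombineFn combineFunction) {
  S s = intermediates[0];
  for (std::size_t i = 1; i < intermediates.size(); ++i) {
    s = combineFunction(s, intermediates[i]);
  }
  return s;
}

// SUM over two batches, as two workers might see them.
inline long sumOverTwoBatches() {
  auto add = [](long s, long x) { return s + x; };
  std::vector<long> states = {
      partialAggregate<long>(std::vector<long>{1, 2, 3}, 0L, add),
      partialAggregate<long>(std::vector<long>{4, 5}, 0L, add)};
  return finalAggregate(states, add);
}
```

Here sumOverTwoBatches() returns 15: the two partial states are 6 and 9, and the final step adds them.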

Implementing AVG aggregation is a bit trickier. For AVG, the state is a tuple of sum and count. Hence, reduce_agg can be used to compute the (sum, count) pair, but it cannot compute the actual average. One needs to apply a scalar function on top of reduce_agg to get the average.

SELECT id, sum_and_count.sum / sum_and_count.count FROM (
    SELECT id, reduce_agg(value, CAST(row(0, 0) AS row(sum double, count bigint)),
        (s, x) -> CAST(row(s.sum + x, s.count + 1) AS row(sum double, count bigint)),
        (s, s2) -> CAST(row(s.sum + s2.sum, s.count + s2.count) AS row(sum double, count bigint))) AS sum_and_count
    FROM t
    GROUP BY id
);
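The same (sum, count) idea can be sketched in C++. This is only an illustration of the state and the two lambdas from the query above; the names SumAndCount, addValue, mergeStates and average are assumptions made for this example, and it assumes at least one input value.

```cpp
#include <cassert>
#include <vector>

// State for AVG: the same (sum, count) pair as the SQL row type above.
struct SumAndCount {
  double sum;
  long count;
};

// Counterpart of the inputFunction lambda: fold one value into the state.
inline SumAndCount addValue(SumAndCount s, double x) {
  return {s.sum + x, s.count + 1};
}

// Counterpart of the combineFunction lambda: merge two states.
inline SumAndCount mergeStates(SumAndCount s, SumAndCount s2) {
  return {s.sum + s2.sum, s.count + s2.count};
}

// Scalar step applied on top of the aggregate, like sum / count in the
// outer SELECT. Each inner vector plays the role of one batch of inputs.
inline double average(const std::vector<std::vector<double>>& batches) {
  SumAndCount total{0.0, 0};
  for (const auto& batch : batches) {
    SumAndCount partial{0.0, 0};
    for (double x : batch) {
      partial = addValue(partial, x);
    }
    total = mergeStates(total, partial);
  }
  return total.sum / total.count;
}
```

The division happens only once, after all states have been merged, which is why the average itself cannot be the aggregation state.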

The examples of using reduce_agg to compute SUM and AVG are for illustrative purposes. One should not use reduce_agg if a specialized aggregation function is available.

One use case for reduce_agg we see in production is to compute a product of input values.

reduce_agg(c, 1.0, (s, x) -> s * x, (s, s2) -> s * s2)

Another example is to compute a list of top N distinct values from all input arrays.

reduce_agg(x, array[],
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000),
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000))

Note that this is equivalent to the following query:

SELECT array_agg(v) FROM (
SELECT DISTINCT v
FROM t, UNNEST(x) AS u(v)
ORDER BY v DESC
LIMIT 1000
)

Implementation

An efficient implementation of reduce_agg is not straightforward. Let’s consider the logic for partial aggregation.

auto s = initialState;
for (auto x : input) {
    s = inputFunction(s, x);
}

This is a data-dependent loop, i.e. the next loop iteration depends on the results of the previous iteration. inputFunction needs to be invoked on each input value x separately. Since inputFunction is a user-defined lambda, invoking inputFunction means evaluating an expression. And since expression evaluation in Velox is optimized for processing large batches of values at a time, evaluating expressions on one value at a time is very inefficient. To optimize the implementation of reduce_agg we need to reduce the number of times we evaluate user-defined lambdas and increase the number of values we process each time.

One approach is to

  1. convert all input values into states by evaluating inputFunction(initialState, x);
  2. split states into pairs and evaluate combineFunction on all pairs;
  3. repeat step (2) until we have only one state left.

Let’s say we have 1024 values to process. Step 1 evaluates the inputFunction expression on 1024 values at once. Step 2 evaluates combineFunction on 512 pairs, then on 256 pairs, then on 128, 64, 32, 16, 8, 4, 2 and finally 1 pair, producing a single state. Step 2 therefore evaluates combineFunction 10 times. In total, this implementation evaluates user-defined expressions 11 times, on multiple values almost every time. This is a lot more efficient than the original implementation, which evaluates user-defined expressions 1024 times.

In general, given N inputs, the original implementation evaluates expressions N times, while the new one evaluates them about log2(N) times.

Note that when N is not a power of two, splitting states into pairs may leave an extra state. For example, splitting 11 states produces 5 pairs plus one extra state. In this case, we set aside the extra state, evaluate combineFunction on the 5 pairs, then bring the extra state back for a total of 6 states and continue.
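The pairwise scheme, including the handling of an odd state out, can be modeled in a few lines of C++. This is a simplified sketch and not the Velox code: pairwiseReduce and ReduceResult are hypothetical names, states are plain long values, and each loop over a whole batch stands in for one vectorized evaluation of a user-defined lambda.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

struct ReduceResult {
  long state;       // final state
  int evaluations;  // number of batched lambda evaluations
};

// Hypothetical model of the pairwise scheme. Assumes non-empty input.
template <typename InputFn, typename CombineFn>
ReduceResult pairwiseReduce(const std::vector<long>& input, long initialState,
                            InputFn inputFunction, CombineFn combineFunction) {
  int evaluations = 0;

  // Step 1: one batched evaluation turns every input into a state.
  std::vector<long> states;
  states.reserve(input.size());
  for (long x : input) {
    states.push_back(inputFunction(initialState, x));
  }
  ++evaluations;

  // Steps 2 and 3: combine pairs until a single state remains.
  while (states.size() > 1) {
    std::vector<long> next;
    const std::size_t numPairs = states.size() / 2;
    for (std::size_t i = 0; i < numPairs; ++i) {
      next.push_back(combineFunction(states[2 * i], states[2 * i + 1]));
    }
    ++evaluations;
    // An odd state out skips this round and rejoins for the next one.
    if (states.size() % 2 == 1) {
      next.push_back(states.back());
    }
    states = std::move(next);
  }
  return {states[0], evaluations};
}
```

The evaluations counter makes it easy to check how the number of batched lambda evaluations grows with the input size, compared with one evaluation per input value in the data-dependent loop.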

A benchmark, velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp, shows that the initial implementation of reduce_agg is 60x slower than SUM, while the optimized implementation is only 3x slower. A specialized aggregation function will always be more efficient than the generic reduce_agg; hence, reduce_agg should be used only when a specialized aggregation function is not available.

Finally, a side effect of the optimized implementation is that it doesn't support applying reduce_agg to sorted inputs. That is, one cannot use reduce_agg to compute an equivalent of

    SELECT a, array_agg(b ORDER BY b) FROM t GROUP BY 1

The array_agg computation depends on the order of inputs. A comparable implementation using reduce_agg would look like this:

    SELECT a,
        reduce_agg(b, array[],
            (s, x) -> concat(s, array[x]),
            (s, s2) -> concat(s, s2)
            ORDER BY b)
    FROM t GROUP BY 1

To respect ORDER BY b, reduce_agg would have to apply inputFunction to each input value one at a time, using the data-dependent loop from above. As we saw, this is very expensive. The optimization we apply does not preserve the order of inputs and hence cannot support the query above. Note that Presto doesn't support applying reduce_agg to sorted inputs either.

Thank you Orri Erling for brainstorming and Xiaoxuan Meng and Pedro Eugênio Rocha Pedreira for reviewing the code.

· 6 min read
Masha Basmanova

Presto provides an array_sort function to sort arrays in ascending order with nulls placed at the end.

presto> select array_sort(array[2, 5, null, 1, -1]);
_col0
---------------------
[-1, 1, 2, 5, null]

There is also an array_sort_desc function that sorts arrays in descending order with nulls placed at the end.

presto> select array_sort_desc(array[2, 5, null, 1, -1]);
_col0
---------------------
[5, 2, 1, -1, null]

Both array_sort and array_sort_desc place nulls at the end of the array.

There is also a version of the array_sort function that takes a comparator lambda function and uses it to sort the array.

array_sort(array(T), function(T, T, int)) -> array(T)

A common use case is to sort an array of structs using one of the struct fields as the sorting key.

presto> select array_sort(array[row('apples', 23), row('bananas', 12), row('grapes', 44)],
-> (x, y) -> if (x[2] < y[2], -1, if(x[2] > y[2], 1, 0)));

_col0
---------------------------------------------------------------------------------------
[{f0=bananas, f1=12}, {f0=apples, f1=23}, {f0=grapes, f1=44}]

This is all very nice and convenient, but there is a catch.

The documentation says that the "comparator will take two nullable arguments representing two nullable elements of the array." Did you notice the word "nullable" in "nullable arguments" and "nullable elements"? Do you think it is important? It is OK if the answer is No or Not Really. It turns out this "nullable" thing is very important. The comparator is expected to handle null inputs and should not assume that inputs are not null or that nulls are handled automatically.

Why is it important to handle null inputs? Let’s see what happens if the comparator doesn’t handle nulls.

presto> select array_sort(array[2, 3, null, 1],
(x, y) -> if (x < y, -1, if (x > y, 1, 0)));
_col0
-----------------
[2, 3, null, 1]

The result array is not sorted! If subsequent logic relies on the array being sorted, the query will silently return wrong results. And if no logic relies on the sortedness of the array, then why waste CPU cycles on sorting?

Why is the array not sorted? That’s because the comparator returns 0 whenever x or y is null.

    x < y returns null if x or y is null, then
    x > y returns null if x or y is null, then
    result is 0

This confuses the sorting algorithm, as it sees that 1 == null, 2 == null, 3 == null, but 1 != 2 and 1 != 3. The algorithm assumes that the comparator function is written correctly, e.g. if a < b then b > a, and if a == b and b == c then a == c. A comparator function that doesn’t handle nulls does not satisfy these rules and causes unpredictable results.
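The broken rule can be checked mechanically. The C++ sketch below models the null-unaware comparator with std::optional standing in for SQL NULL (an assumption made for this illustration; naiveCompare and breaksTransitivity are hypothetical names). It deliberately does not pass the comparator to a sort routine, since sorting with an inconsistent comparator gives no guarantees.

```cpp
#include <cassert>
#include <optional>

using Value = std::optional<int>;  // std::nullopt stands in for SQL NULL

// Models if(x < y, -1, if(x > y, 1, 0)) under SQL semantics, where a
// comparison involving NULL yields NULL and if() treats NULL as false.
inline int naiveCompare(const Value& x, const Value& y) {
  const bool less = x.has_value() && y.has_value() && *x < *y;
  const bool greater = x.has_value() && y.has_value() && *x > *y;
  if (less) return -1;
  if (greater) return 1;
  return 0;
}

// True when the comparator claims a == b and b == c, but not a == c.
inline bool breaksTransitivity(const Value& a, const Value& b, const Value& c) {
  return naiveCompare(a, b) == 0 && naiveCompare(b, c) == 0 &&
      naiveCompare(a, c) != 0;
}
```

For instance, breaksTransitivity(1, std::nullopt, 2) is true: the comparator reports 1 == null and null == 2, yet 1 != 2.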

To handle null inputs, the comparator function needs to be modified, for example, like so:

    (x, y) -> CASE WHEN x IS NULL THEN 1
                   WHEN y IS NULL THEN -1
                   WHEN x < y THEN -1
                   WHEN x > y THEN 1
                   ELSE 0 END

presto> select array_sort(array[2, 3, null, 1],
-> (x, y) -> CASE WHEN x IS NULL THEN 1
->                WHEN y IS NULL THEN -1
->                WHEN x < y THEN -1
->                WHEN x > y THEN 1
->                ELSE 0 END
-> );
_col0
-----------------
[1, 2, 3, null]

This is longer and harder to read, but the result array is sorted properly. The new comparator says that null is greater than any other value, so null is placed at the end of the array.
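For comparison, here is how the same null-aware ordering might look in C++, again using std::optional as a stand-in for nullable elements. The names nullAwareCompare and sortNullsLast are made up for this sketch; it is an analogy and not how Presto or Velox evaluate the lambda.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <vector>

using Value = std::optional<int>;  // std::nullopt stands in for SQL NULL

// Mirrors the CASE expression: NULL is greater than any other value.
inline int nullAwareCompare(const Value& x, const Value& y) {
  if (!x.has_value()) return 1;
  if (!y.has_value()) return -1;
  if (*x < *y) return -1;
  if (*x > *y) return 1;
  return 0;
}

// std::sort only asks "does a go before b?", which is the -1 case.
inline std::vector<Value> sortNullsLast(std::vector<Value> values) {
  std::sort(values.begin(), values.end(), [](const Value& a, const Value& b) {
    return nullAwareCompare(a, b) < 0;
  });
  return values;
}
```

Sorting {2, 3, null, 1} this way yields {1, 2, 3, null}, matching the Presto result above.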

Note: When the comparator returns -1 for (x, y), the algorithm assumes that x <= y.

Writing comparators correctly is not easy. Writing comparators that handle null inputs is even harder. Having no feedback when a comparator is written incorrectly makes it yet harder to spot bugs and fix them before a query lands in production and starts producing wrong results.

I feel that Presto’s array_sort function with a custom comparator is dangerous and hard to use and I wonder if it makes sense to replace it with a safer, easier to use alternative.

array_sort(array(T), function(T, U)) -> array(T)

This version takes an array and a transform lambda function that specifies how to compute sorting keys from the array elements.

To sort an array of structs by one of the struct fields, one would write

presto> select array_sort(array[row('apples', 23), row('bananas', 12), row('grapes', 44)],
x -> x[2])

_col0
---------------------------------------------------------------------------------------
[{f0=bananas, f1=12}, {f0=apples, f1=23}, {f0=grapes, f1=44}]

This version would sort the array in ascending order of the sorting keys computed using the specified lambda, placing nulls at the end of the array.

A matching array_sort_desc function would sort in descending order, also placing nulls at the end of the array.

These functions would be easier to write and read, and null handling would happen automatically.
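A rough C++ model of the key-based variant shows why null handling can be automatic: the caller only supplies a key, and the ordering of null keys is decided in one place. sortByKey is a hypothetical helper written for this illustration, with std::optional keys standing in for nullable sorting keys.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using Row = std::pair<std::string, int>;

// Hypothetical model of array_sort(array, x -> key): compute the keys once,
// then sort by key in ascending order with null keys last.
// keyFn is expected to return std::optional<K> for some comparable K.
template <typename T, typename KeyFn>
std::vector<T> sortByKey(const std::vector<T>& elements, KeyFn keyFn) {
  using Key = decltype(keyFn(elements[0]));
  std::vector<Key> keys;
  keys.reserve(elements.size());
  for (const auto& e : elements) {
    keys.push_back(keyFn(e));
  }

  // Sort positions instead of elements so each key is computed only once.
  std::vector<std::size_t> order(elements.size());
  std::iota(order.begin(), order.end(), std::size_t{0});
  std::stable_sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
    if (!keys[a].has_value()) return false;  // a null key never goes first
    if (!keys[b].has_value()) return true;   // non-null goes before null
    return *keys[a] < *keys[b];
  });

  std::vector<T> sorted;
  sorted.reserve(elements.size());
  for (std::size_t i : order) {
    sorted.push_back(elements[i]);
  }
  return sorted;
}
```

With rows (apples, 23), (bananas, 12), (grapes, 44) and a key function that returns the second field, the result is ordered bananas, apples, grapes, as in the query above.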

We implemented these functions in Velox.

We also added partial support for array_sort with a comparator lambda function. The expression compiler in Velox analyzes the comparator expression to determine whether it can be re-written to the alternative version of array_sort. If so, it re-writes the expression and evaluates it. Otherwise, it throws an "unsupported" exception.

For example,

    array_sort(a, (x, y) -> if (x[2] < y[2], -1, if(x[2] > y[2], 1, 0)))

is re-written to

    array_sort(a, x -> x[2])

This rewrite allows Prestissimo and Presto-on-Spark-on-Velox to support common use cases and do so efficiently.

The rewrite handles a few different ways to express the same comparator. Some examples:

    // becomes array_sort(a, x -> f(x))
    (x, y) -> if(f(x) < f(y), -1, if(f(x) > f(y), 1, 0))

    // becomes array_sort_desc(a, x -> f(x))
    (x, y) -> if(f(x) < f(y), 1, if(f(x) > f(y), -1, 0))

    // becomes array_sort(a, x -> f(x))
    (x, y) -> if(f(x) < f(y), -1, if(f(x) = f(y), 0, 1))

    // becomes array_sort(a, x -> f(x))
    (x, y) -> if(f(x) = f(y), 0, if(f(x) < f(y), -1, 1))

    // becomes array_sort(a, x -> f(x))
    (x, y) -> if(f(y) < f(x), 1, if(f(x) < f(y), -1, 0))

Why didn’t we implement full support for comparator lambda functions in array_sort? Actually, we couldn’t think of an efficient way to do that in a vectorized engine. Velox doesn’t use code generation and interprets expressions. It can do that efficiently if it can process data in large batches. array_sort with custom comparator doesn’t lend itself well to such processing.

array_sort with a transform lambda works well in a vectorized engine. To process a batch of arrays, Velox first evaluates the transform lambda on all the elements of the arrays, then sorts the results.
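The two-phase processing described above can be sketched as follows. This is a simplified model under stated assumptions, not Velox code: ArrayBatch is a made-up flat layout with an offsets vector, sortBatchByKey is a hypothetical helper, elements and keys are plain int values, and nulls are left out for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical flat layout for a batch of arrays: array i owns
// elements[offsets[i]] .. elements[offsets[i + 1] - 1].
struct ArrayBatch {
  std::vector<int> elements;
  std::vector<std::size_t> offsets;
};

// Sorts every array in the batch by key. batchKeyFn is called once for the
// whole batch and must return one key per element.
template <typename BatchKeyFn>
ArrayBatch sortBatchByKey(const ArrayBatch& batch, BatchKeyFn batchKeyFn) {
  // Phase 1: one batched evaluation of the transform over all elements.
  const std::vector<int> keys = batchKeyFn(batch.elements);

  // Phase 2: sort each array's element positions by the precomputed keys.
  ArrayBatch result{{}, batch.offsets};
  result.elements.reserve(batch.elements.size());
  for (std::size_t i = 0; i + 1 < batch.offsets.size(); ++i) {
    std::vector<std::size_t> order;
    for (std::size_t j = batch.offsets[i]; j < batch.offsets[i + 1]; ++j) {
      order.push_back(j);
    }
    std::stable_sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
      return keys[a] < keys[b];
    });
    for (std::size_t j : order) {
      result.elements.push_back(batch.elements[j]);
    }
  }
  return result;
}
```

The point of the split is that the user-defined transform runs once per batch, while the per-array sorting step compares plain precomputed keys and never calls back into the lambda.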

For further reading, consider the Vectorized and performance-portable Quicksort blog post from Google.

Thank you Orri Erling for brainstorming and Xiaoxuan Meng for reviewing the code.