3 posts tagged with "primer"

A Velox Primer, Part 3

· 9 min read
Orri Erling
Software Engineer @ Meta
Pedro Pedreira
Software Engineer @ Meta

At the end of the previous article, we were halfway through running our first distributed query:

SELECT l_partkey, count(*) FROM lineitem GROUP BY l_partkey;

We discussed how a query starts, how tasks are set up, and the interactions between plans, operators, and drivers. We also presented how the first stage of the query is executed, from table scan to partitioned output, which is the producer side of the shuffle.

In this article, we will discuss the second query stage, or the consumer side of the shuffle.

Shuffle Consumer

As presented in the previous post, on the first query stage each worker reads the table, then produces a series of information packets (SerializedPages) intended for different workers of stage 2. In our example, the lineitem table has no particular physical partitioning or clustering key. This means that any row of the table can have any value of l_partkey in any of the files forming the table. So in order to group data based on l_partkey, we first need to make sure that rows containing the same values for l_partkey are processed by the same worker – the data shuffle at the end of stage 1.

The overall query structure is as follows:

The query coordinator distributes table scan splits to stage 1 workers in no particular order. The workers process these and, as a side effect, fill destination buffers that will be consumed by stage 2 workers. Assuming there are 100 stage 2 workers, every stage 1 Driver has its own PartitionedOutput which has 100 destinations. When the buffered serializations grow large enough, these are handed off to the worker's OutputBufferManager. 

Now let’s focus on the stage 2 query fragment. Each stage 2 worker has the following plan fragment: PartitionedOutput over Aggregation over LocalExchange over Exchange.  

Each stage 2 Task corresponds to one destination in the OutputBufferManager of each stage 1 worker. The first stage 2 Task fetches destination 0 from all the stage 1 Tasks. The second stage 2 Task fetches destination 1 from all the stage 1 Tasks, and so on. Everybody talks to everybody else. The shuffle proceeds without any central coordination.
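The fetch pattern described above can be sketched in a few lines. This is only an illustration, not Velox code: producer buffers are modeled as plain dictionaries keyed by destination number, and `fetch_for_consumer` and the page names are hypothetical.

```python
def fetch_for_consumer(consumer_index, producer_buffers):
    """Collect everything buffered for one stage 2 Task across all producers."""
    received = []
    for buffers in producer_buffers:
        # Each producer keeps one queue of pages per destination.
        received.extend(buffers.get(consumer_index, []))
    return received


# Two hypothetical stage 1 Tasks and three stage 2 destinations.
producer_buffers = [
    {0: ["p0-page-a"], 1: ["p0-page-b"], 2: []},
    {0: ["p1-page-a"], 2: ["p1-page-c"]},
]

# Stage 2 Task i pulls destination i from every stage 1 Task.
inputs = [fetch_for_consumer(i, producer_buffers) for i in range(3)]
```

Each consumer only needs to know its own destination number and the list of producers, which is why no central coordinator is involved in the data transfer itself.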

But let's zoom in to what actually happens at the start of stage 2.

The plan has a LocalExchange node after the Exchange. This becomes two pipelines: Exchange and LocalPartition on one side, and LocalExchange, HashAggregation, and PartitionedOutput on the other side. 

The Velox Task is intended to be multithreaded, with typically 5 to 30 Drivers. There can be hundreds of Tasks per stage, thus amounting to thousands of threads per stage. So, each of the 100 second stage workers is consuming 1/100 of the total output of the first stage. But it is doing this in a multithreaded manner, with many threads consuming from the ExchangeSource. We will explain this later.

In order to execute a multithreaded group by, we can either have a thread-safe hash table or we can partition the data into n disjoint streams and then aggregate each stream on its own thread. On a CPU, we almost always prefer to have threads working on their own memory, so data will be locally partitioned based on l_partkey using a local exchange. CPUs have complex cache coherence protocols to give observers a consistent ordered view of memory, so reconciliation after many cores have written the same cache line is both mandatory and expensive. Strict memory-to-thread affinity is what makes multicore scalability work.
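The second option, disjoint streams with one private hash table per thread, might look like the sketch below. It is an illustration of the data layout only: `partitioned_count` is a hypothetical helper, Python's `hash()` stands in for the engine's hash function, and Python threads will not show the multicore speedup that native threads would.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def partitioned_count(keys, num_threads):
    """Count keys using one private table per thread, with no shared writes."""
    # Step 1: split the input into disjoint streams by hash of the key.
    streams = [[] for _ in range(num_threads)]
    for key in keys:
        streams[hash(key) % num_threads].append(key)
    # Step 2: each thread aggregates its own stream into its own table.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(Counter, streams))


tables = partitioned_count([1, 2, 3, 1, 2, 1], num_threads=2)
```

Because a given key always lands in the same stream, the per-thread tables never overlap and no locking is needed during aggregation.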

LocalPartition and LocalExchange

To create efficient and independent memory access patterns, the second stage reshuffles the data using a local exchange. In concept, this is like the remote exchange between tasks, but is scoped inside the Task. The producer side (LocalPartition) calculates a hash on the partitioning column l_partkey, and divides this into one destination per Driver in the consumer pipeline. The consumer pipeline has a source operator LocalExchange that reads from the queue filled by LocalPartition. See LocalPartition.h for details. Also see the code in Task.cpp for setting up the queues between local exchange producers and consumers.

While remote shuffles work with serialized data, local exchanges pass in-memory vectors between threads. This is also the first time we encounter the notion of using columnar encodings to accelerate vectorized execution. Velox became known for its extensive use of such techniques, which we call compressed execution. In this instance, we use dictionaries to slice vectors across multiple destinations, as discussed next.

Dictionary Magic

Query execution often requires changes to the cardinality (number of rows in a result) of the data. This is essentially what filters and joins do – they either reduce the cardinality of the data, e.g., filters or selective joins, or increase the cardinality of the data, e.g. joins with multiple key matches.

Repartitioning in LocalPartition assigns a destination to each row of the input based on the partitioning key. It then makes a vector for each destination with just the rows for that destination. Suppose rows 2, 8 and 100 of the current input hash to 1. Then the vector for destination 1 would only have rows 2, 8 and 100 from the original input. We could make a vector of three rows by copying the data. Instead, we save the copy by wrapping each column of the original input in a DictionaryVector with length 3 and indices 2, 8 and 100. This is much more efficient than copying, especially for wide and nested data.

Later on, the LocalExchange consumer thread running destination 1 will see a DictionaryVector of 3 rows. When the HashAggregation Operator accesses it, the aggregation sees a dictionary and follows the indirection: for row 0 it reads the value at position 2 in the base (inner) vector, for row 1 the value at position 8, and so forth. The consumer for destination 0 does exactly the same thing but accesses, for example, rows 4, 9 and 50.

The base of the dictionaries is the same on all the consumer threads. Each consumer thread just looks at a different subset. The cores read the same cache lines, but because the base is not written to (read-only), there is no cache-coherence overhead. 
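The wrapping trick can be illustrated with a small sketch. The `DictionaryView` class and `local_partition` function below are hypothetical stand-ins for DictionaryVector and LocalPartition, written under the assumption that a column is a plain list and that Python's `hash()` plays the role of the partitioning hash.

```python
class DictionaryView:
    """Zero-copy selection: a list of indices over a shared base column."""

    def __init__(self, base, indices):
        self.base = base        # shared by all destinations, never written
        self.indices = indices  # one entry per row visible to this consumer

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, row):
        # Follow the indirection into the base column.
        return self.base[self.indices[row]]


def local_partition(column, num_destinations):
    """Return one view per destination instead of copying rows."""
    indices_per_destination = [[] for _ in range(num_destinations)]
    for row, key in enumerate(column):
        indices_per_destination[hash(key) % num_destinations].append(row)
    return [DictionaryView(column, idx) for idx in indices_per_destination]


l_partkey = [10, 11, 12, 13, 15, 17]
views = local_partition(l_partkey, num_destinations=2)
```

Here `views[1][2]` resolves to `l_partkey[4]`: the view stores only the row numbers, while the values stay in the single shared base list.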

To summarize, a DictionaryVector<T> is a wrapper around any vector of T. The DictionaryVector holds a list of indices, each of which points to a position in the base vector. Dictionary encoding is typically used when there are few distinct values in a column. Take the strings “experiment” and “baseline”. If a column only has these values, it is far more efficient to represent it as a base vector with “experiment” at 0 and “baseline” at 1, and a DictionaryVector that has, say, 1000 elements, each of which is either the index 0 or 1.
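As a rough sketch of the classic use of dictionary encoding for low-cardinality columns, consider the following. The function names are made up for illustration and the column is a plain Python list rather than a Velox vector.

```python
def dictionary_encode(values):
    """Split a column into distinct values (base) and per-row indices."""
    base = []
    position = {}  # value -> index of that value in base
    indices = []
    for value in values:
        if value not in position:
            position[value] = len(base)
            base.append(value)
        indices.append(position[value])
    return base, indices


def decode(base, indices):
    """Materialize the original column from base and indices."""
    return [base[i] for i in indices]


column = ["experiment", "baseline", "experiment", "experiment", "baseline"]
base, indices = dictionary_encode(column)
```

The long strings are stored once in `base`, and each row costs only a small integer in `indices`.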

Besides this, DictionaryVectors can also be used to denote a subset or a reordering of elements of the base. Because all places that accept vectors also accept DictionaryVectors, the DictionaryVector becomes the universal way of representing selection. This is a central precept of Velox and other modern vectorized engines. We will often come across this concept.

Aggregation and Pipeline Barrier

We have now arrived at the second pipeline of stage 2. It begins with LocalExchange, which then feeds into HashAggregation. The LocalExchange picks a fraction of the Task's input specific to its local destination, about 1/number-of-drivers of the task's input.

We will talk about hash tables, their specific layout and other tricks in a later post. For now, we look at HashAggregation as a black box. This specific aggregation is a final aggregation, which is a full pipeline barrier that only produces output after all input is received.
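To make the idea of a pipeline barrier concrete, here is a minimal sketch of a final count(*) aggregation. It assumes a simplified operator interface with `add_input`, `no_more_input` and `get_output` methods; the class is hypothetical and leaves out everything about real hash table layout.

```python
class FinalCountAggregation:
    """Accumulates counts and emits nothing until all input has arrived."""

    def __init__(self):
        self.counts = {}
        self.input_done = False
        self.emitted = False

    def add_input(self, keys):
        for key in keys:
            self.counts[key] = self.counts.get(key, 0) + 1

    def no_more_input(self):
        self.input_done = True

    def get_output(self):
        # Barrier: a count is final only once no more rows can arrive.
        if not self.input_done or self.emitted:
            return None
        self.emitted = True
        return dict(self.counts)


agg = FinalCountAggregation()
agg.add_input([5, 7, 5])
before = agg.get_output()  # still accumulating, so nothing comes out
agg.add_input([7, 5])
agg.no_more_input()
after = agg.get_output()
```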

How does the aggregation know it has received all its input? Let's trace the progress of the completion signal through the shuffles. A leaf worker knows that it is at end if the Task has received the “no more splits” message in the last task update from the coordinator.

So, if one DataSource inside a TableScan is at end and there are no more splits, this particular TableScan is not blocked and is at end. The Driver then calls PartitionedOutput::noMoreInput() on the TableScan's PartitionedOutput. This causes any buffered data for any destination to be flushed and moved over to OutputBufferManager with a note that no more is coming. OutputBufferManager knows how many Drivers there are in the TableScan pipeline. After it has received this many “no more data coming” messages, it can tell all destinations that this Task will generate no more data for them.

Now, when stage 2 Tasks query stage 1 producers, they know that they are at end when all the producers have signalled that there is no more data coming. The response to the “get data for destination” request has a flag identifying the last batch. The ExchangeSource on the stage 2 Task then sets the no-more-data flag. All the Drivers query this flag, so each of the Exchange Operators sees it, which leads to noMoreInput being called on the LocalPartition. This queues up a “no more data” signal in the local exchange queues. When the LocalExchange at the start of the second pipeline of stage 2 has seen a “no more data” from each of its sources, it is at end and noMoreInput is called on the HashAggregation.

This is how the end of data propagates. Up until now, HashAggregation has produced no output, since the counts are not known until all input is received. Now, HashAggregation starts producing batches of output, which contain the l_partkey value and the number of times this has been seen. This reaches the last PartitionedOutput, which in this case has only one destination, the final worker that produces the result set. This will be at end when all the 100 sources have reported their own end of data.
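Every hop in this chain follows the same rule: a consumer is at end only once each of its sources has said so. A minimal sketch of that bookkeeping, using a hypothetical `CompletionTracker` class rather than any actual Velox type, could look like this:

```python
class CompletionTracker:
    """Tracks 'no more data' signals from a fixed number of sources."""

    def __init__(self, num_sources):
        if num_sources <= 0:
            raise ValueError("num_sources must be positive")
        self.num_sources = num_sources
        self.finished = set()

    def source_finished(self, source_id):
        """Record one source's end-of-data signal; return True if all are done."""
        self.finished.add(source_id)  # a repeated signal is harmless
        return self.at_end()

    def at_end(self):
        return len(self.finished) == self.num_sources


# For example, a buffer manager waiting on three producer Drivers.
tracker = CompletionTracker(num_sources=3)
signals = [tracker.source_finished(d) for d in ("driver-0", "driver-1", "driver-2")]
```

The same counting idea applies whether the sources are Drivers feeding a buffer manager, stage 1 producers feeding an Exchange, or LocalPartition producers feeding a LocalExchange.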

Recap

We have finally walked through the distributed execution of a simple query. We presented how data is partitioned between workers in the cluster, and then a second time over inside each worker.

Velox and Presto are designed to aggressively parallelize execution, which means creating distinct, non-overlapping sets of data to process on each thread. The more threads, the more throughput. Also remember that for CPU threads to be effective, they must process tasks that are large enough (often over 100 microseconds worth of CPU time), and not communicate too much with other threads or write to memory other threads are writing to. This is accomplished with local exchanges.

Another important thing to remember from this article is how columnar encodings (DictionaryVectors, in particular) can be used as a zero-copy way of representing selection/reorder/duplication. We will see this pattern again with filters, joins, and other relational operations.

Next we will be looking at joins, filters, and hash aggregations. Stay tuned!

A Velox Primer, Part 2

· 9 min read
Orri Erling
Software Engineer @ Meta
Pedro Pedreira
Software Engineer @ Meta

In this article, we will discuss how a distributed compute engine executes a query similar to the one presented in our first article:

SELECT l_partkey, count(*) FROM lineitem GROUP BY l_partkey;

We use the TPC-H schema to illustrate the example, and Prestissimo as the compute engine orchestrating distributed query execution. Prestissimo is responsible for the query engine frontend (parsing, resolving metadata, planning, optimizing) and distributed execution (allocating resources and shipping query fragments), and Velox is responsible for the execution of plan fragments within a single worker node. Throughout this article, we will present which functions are performed by Velox and which by the distributed engine - Prestissimo, in this example.

Query Setup

Prestissimo first receives the query through a coordinator node, which is responsible for parsing and planning the query. For our sample query, a distributed query plan with three query fragments will be created:

  1. The first fragment reads the l_partkey column from the lineitem table and divides its output according to a hash of l_partkey.
  2. The second fragment reads the output from the first fragment and updates a hash table from l_partkey containing the number of times the particular value of l_partkey has been seen (the count(*) aggregate function implementation).
  3. The final fragment then reads the content of the hash tables, once the second fragment has received all the rows from the first fragment.

The shuffle between the first two fragments partitions the data according to l_partkey. Suppose there are 100 instances of the second fragment. If the hash of l_partkey modulo 100 is 0, the row goes to the first Task in the second stage; if it is 1, the row goes to the second Task, and so forth. In this way, each second stage Task gets a distinct subset of the rows. The shuffle between the second and third stage is a gather, meaning that there is one Task in the third stage that will read the output of all 100 Tasks in the second stage.
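The routing rule is simple enough to show directly. The sketch below assumes integer keys and uses Python's built-in `hash()` as a stand-in for the engine's hash function; `destination_for` and `partition_rows` are hypothetical names.

```python
from collections import defaultdict

NUM_DESTINATIONS = 100  # width of stage 2 in the running example


def destination_for(l_partkey, num_destinations=NUM_DESTINATIONS):
    """Map a partitioning key to the index of the stage 2 Task that owns it."""
    return hash(l_partkey) % num_destinations


def partition_rows(l_partkeys, num_destinations=NUM_DESTINATIONS):
    """Group key values by the stage 2 Task that must receive them."""
    by_destination = defaultdict(list)
    for key in l_partkeys:
        by_destination[destination_for(key, num_destinations)].append(key)
    return dict(by_destination)


rows = [7, 107, 207, 42, 7, 1042]
routed = partition_rows(rows)
```

All occurrences of the same key map to the same destination, which is the property the group by depends on.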

A Stage is the set of Tasks that share the same plan fragment. A Task is the main integration point between Prestissimo and Velox; it’s the Velox execution instance that physically processes all or part of the data that passes through the stage.

To set up the distributed execution, Prestissimo first selects the workers from the pool of Prestissimo server processes it manages. Assuming that stage 1 is 10 workers wide, it selects 10 server processes and sends the first stage plan to them. It then selects 100 workers for stage 2 and sends the second stage plan to these. The last stage that gathers the result has only one worker, so Prestissimo sends the final plan to only one worker. The set of workers for each stage may overlap, so a single worker process may host multiple stages of one query.

Let's now look more closely at what each worker does at query setup time.

Task Setup

In Prestissimo, the message that sets up a Task in a worker is called Task Update. A Task Update has the following information: the plan, configuration settings, and an optional list of splits. Splits are further qualified by what plan node they are intended for, and whether more splits for the recipient plan node and split group will be coming.

Since split generation involves enumerating files from storage, which may take a while, Presto allows splits to be sent to workers asynchronously, so that the generation of splits can run in parallel with the execution of the first splits. Therefore, the first task update specifies the plan and the config. Subsequent ones only add more splits.

Besides the plan, the coordinator provides configs as maps from string key to string value, both top level and connector level. The connector configs have settings for each connector; connectors are used by table scan and table writer to deal with storage and file formats. These configs and other information, such as thread pools and the top-level memory pool, are handed to the Task in a QueryCtx object. See velox/core/QueryCtx.h in the Velox repo for details.

From Plans to Drivers and Operators

Once the Velox Task is created, a TaskManager hands it splits to work on. This is done with Task::addSplit(), and can be done after the Task has started executing. See velox/exec/Task.h for details.

Let's zoom into what happens at Task creation: A PlanNode tree specifying what the Task does is given to the Task as part of a PlanFragment. The most important step done at Task creation is splitting the plan tree into pipelines. Each pipeline then gets a DriverFactory, which is the factory class that makes Drivers for the pipeline. The Drivers, in their turn, contain the Operators that do the work of running the query. The DriverFactories are made in LocalPlanner.cpp. See LocalPlanner::plan for details.

Following the execution model known as Volcano, the plan is represented by an operator tree where each node consumes the output of its child operators, and returns output to the parent operator. The root node is typically a PartitionedOutputNode or a TableWriteNode. The leaf nodes are either TableScanNode, ExchangeNode or ValuesNode (used for query literals). The full set of Velox PlanNodes can be found in velox/core/PlanNode.h.

The PlanNodes mostly correspond to Operators. PlanNodes are not executable as such; they are only a structure describing how to make Drivers and Operators, which do the actual execution. If the tree of nodes has a single branch, then the plan is a single pipeline. If it has nodes with more than one child (input), then the second input of the node becomes a separate pipeline.
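The splitting rule can be sketched as a small tree walk. This is not the logic in LocalPlanner.cpp; it is a simplified illustration with a hypothetical `PlanNode` dataclass that only applies the rule stated above: the first input continues the current pipeline and every additional input starts a new one.

```python
from dataclasses import dataclass, field


@dataclass
class PlanNode:
    name: str
    sources: list = field(default_factory=list)


def split_into_pipelines(root):
    """Return pipelines as lists of node names, ordered from leaf to root."""
    pipelines = []

    def walk(node, current):
        current.append(node.name)
        if not node.sources:
            return
        # Every input after the first starts a pipeline of its own.
        for extra_input in node.sources[1:]:
            new_pipeline = []
            pipelines.append(new_pipeline)
            walk(extra_input, new_pipeline)
        # The first input continues the current pipeline.
        walk(node.sources[0], current)

    main = []
    pipelines.append(main)
    walk(root, main)
    # Nodes were visited root first, but data flows from the leaf upward.
    return [list(reversed(p)) for p in pipelines]


plan = PlanNode("PartitionedOutput", [
    PlanNode("HashJoin", [PlanNode("TableScan(probe)"), PlanNode("TableScan(build)")]),
])
pipelines = split_into_pipelines(plan)
```

For this join plan the walk yields one pipeline running from the probe scan through the join to the output, and a second pipeline for the build side.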

Task::start() creates the DriverFactories, which then create the Drivers. To start execution, the Drivers are queued on a thread pool executor. The main function that runs Operators is Driver::runInternal(). See this function for the details of how Operators and the Driver interface: Operator::isBlocked() determines if the Driver can advance. If it cannot, it goes off thread until a future is realized, which then puts it back on the executor.

getOutput() retrieves data from an Operator and addInput() feeds data into another Operator. The order of execution is to advance the last Operator which can produce output and then feed this to the next Operator. If an Operator cannot produce output, then getOutput() is called on the Operator before it until one is found that can produce data. If no Operator is blocked and no Operator can produce output, then the plan is at end. The noMoreInput() method is called on each Operator. This can unblock production of results; for example, an OrderBy can only produce its output after it knows that it has all the input.
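The following sketch shows the shape of such a loop. It is a deliberately simplified model and not a transcription of Driver::runInternal(): blocking and futures are left out, and the `Values`, `OrderBy` and `Collector` classes are hypothetical stand-ins for real Operators.

```python
class Values:
    """Source: hands out pre-made batches."""
    def __init__(self, batches):
        self.batches = list(batches)

    def get_output(self):
        return self.batches.pop(0) if self.batches else None


class OrderBy:
    """Produces output only after it knows it has seen all input."""
    def __init__(self):
        self.rows, self.input_done, self.emitted = [], False, False

    def add_input(self, batch):
        self.rows.extend(batch)

    def no_more_input(self):
        self.input_done = True

    def get_output(self):
        if not self.input_done or self.emitted:
            return None
        self.emitted = True
        return sorted(self.rows)


class Collector:
    """Sink: stores what it receives."""
    def __init__(self):
        self.rows = []

    def add_input(self, batch):
        self.rows.extend(batch)

    def no_more_input(self):
        pass


def run_driver(operators):
    notified = [False] * len(operators)
    notified[0] = True  # the source has no upstream operator
    while True:
        progressed = False
        # Try the last operator that can produce output, walking backwards.
        for i in range(len(operators) - 2, -1, -1):
            batch = operators[i].get_output()
            if batch is not None:
                operators[i + 1].add_input(batch)
                progressed = True
                break
        if progressed:
            continue
        # Nothing upstream can produce: tell the next operator it has all input.
        for i in range(1, len(operators)):
            if not notified[i]:
                operators[i].no_more_input()
                notified[i] = True
                progressed = True
                break
        if not progressed:
            return  # every operator is finished


sink = Collector()
run_driver([Values([[3, 1], [2]]), OrderBy(), sink])
```

Note how the OrderBy stays silent while batches arrive and emits its sorted result only after `no_more_input()` has been called on it.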

The Minimal Pipeline: Table Scan and Repartitioning

Table Scan. With the table scan stage of our sample query, we have one pipeline with two operators: TableScan and PartitionedOutput. Assume this pipeline has five Drivers, and that all these five Drivers go on the thread pool executor. The PartitionedOutput cannot do anything because it has no input. The TableScan::getOutput() is then called. See velox/exec/TableScan.cpp for details. The first action TableScan takes is to look for a Split with Task::getSplitOrFuture(). If there is no split available, this returns a future. The Driver will then park itself off thread and install a callback on the future that will reschedule the Driver when a split is available.

It could also be the case that there is no split and the Task has been notified that no more splits will be coming. In this case TableScan would be at the end. Finally, if a Split is available, TableScan interprets it. Given a TableHandle specification provided as part of the plan (list of columns and filters), the Connector (as specified in the Split) makes a DataSource. The DataSource handles the details of IO and file and table formats.

The DataSource is then given the split. After this, DataSource::next() can be called repeatedly to get vectors (batches) of output from the file/section of file specified by the Split. If the DataSource is at the end, TableScan looks for the next split. See Connector.h for the Connector and DataSource interfaces.
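The control flow of the scan can be summarized with a small sketch. All names here (`SplitQueue`, `TableScanSketch`, the status strings) are hypothetical; a split is modeled as a list of batches, and a plain "blocked" status stands in for the future that the real engine returns.

```python
from collections import deque


class SplitQueue:
    """Stand-in for the Task's queue of splits for one plan node."""

    def __init__(self):
        self.splits = deque()
        self.no_more_splits = False

    def get_split(self):
        """Return ('split', s), ('blocked', None) or ('at_end', None)."""
        if self.splits:
            return "split", self.splits.popleft()
        if self.no_more_splits:
            return "at_end", None
        return "blocked", None  # the real engine hands back a future here


class TableScanSketch:
    def __init__(self, split_queue, make_data_source):
        self.split_queue = split_queue
        self.make_data_source = make_data_source
        self.data_source = None  # iterator over batches of the current split

    def get_output(self):
        """Return (status, batch); status is 'batch', 'blocked' or 'at_end'."""
        while True:
            if self.data_source is None:
                status, split = self.split_queue.get_split()
                if status != "split":
                    return status, None
                self.data_source = self.make_data_source(split)
            batch = next(self.data_source, None)
            if batch is not None:
                return "batch", batch
            self.data_source = None  # split exhausted, look for the next one


queue = SplitQueue()
queue.splits.extend([[[1, 2], [3]], [[4]]])  # two splits, each a list of batches
scan = TableScanSketch(queue, make_data_source=iter)
```

Calling `scan.get_output()` repeatedly drains the first split, moves on to the second, and then reports "blocked" until either a new split arrives or `no_more_splits` is set.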

Repartitioning. Now we have traced execution up to the TableScan returning its first batch of output. The Driver feeds this to PartitionedOutput::addInput(). See PartitionedOutput.cpp for details. PartitionedOutput first calculates a hash on the partitioning key, in this case, l_partkey, producing a destination number for each row in the batch RowVectorPtr input_.

Each PartitionedOutput holds a partly filled serialization buffer (VectorStreamGroup) for each destination worker. If there are 100 Tasks in the second stage, each PartitionedOutput has 100 destinations, each with one VectorStreamGroup. The main function of a VectorStreamGroup is append(), which takes a RowVectorPtr and a set of row numbers in it. It serializes each value identified by the row numbers and adds it to the partly formed serialization. When enough rows are accumulated in the VectorStreamGroup, it produces a SerializedPage. See flush() in PartitionedOutput.cpp.
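A rough model of this buffering is shown below. It is a sketch under simplifying assumptions: a single key column, a row-count threshold instead of a byte-size one, and an immutable tuple standing in for a SerializedPage. The class and its methods are hypothetical.

```python
class PartitionedOutputSketch:
    def __init__(self, num_destinations, flush_rows):
        self.flush_rows = flush_rows
        self.buffers = [[] for _ in range(num_destinations)]  # partly filled
        self.pages = [[] for _ in range(num_destinations)]    # ready to ship

    def add_input(self, keys):
        # Work out which row numbers of this batch go to which destination.
        rows_for = [[] for _ in self.buffers]
        for row, key in enumerate(keys):
            rows_for[hash(key) % len(self.buffers)].append(row)
        for destination, rows in enumerate(rows_for):
            self._append(destination, keys, rows)

    def _append(self, destination, column, rows):
        buffer = self.buffers[destination]
        buffer.extend(column[row] for row in rows)
        if len(buffer) >= self.flush_rows:
            self._flush(destination)

    def _flush(self, destination):
        if self.buffers[destination]:
            # A tuple plays the role of a self-contained serialized page.
            self.pages[destination].append(tuple(self.buffers[destination]))
            self.buffers[destination] = []

    def no_more_input(self):
        # Ship whatever is left, even if below the threshold.
        for destination in range(len(self.buffers)):
            self._flush(destination)


out = PartitionedOutputSketch(num_destinations=2, flush_rows=2)
out.add_input([0, 2, 4, 1])
pages_before_end = [list(p) for p in out.pages]
out.no_more_input()
```

Destination 0 crosses the threshold and produces a page right away, while the single row for destination 1 waits in its buffer until `no_more_input()` forces a final flush.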

The SerializedPage is a self-contained serialized packet of information that can be transmitted over the wire to the next stage. Each such page only contains rows intended for the same recipient. These pages are then queued up in the worker process' OutputBufferManager. Note the code with BlockingReason in flush(). The buffer manager maintains separate queues for all consumers of all Tasks. If a queue is full, adding output may block. This returns a future that is realized when there is space to add more data to the queue. This depends on when the Task's consumer Task fetches the data.
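The backpressure mechanism can be sketched with a bounded queue that hands the producer a future when it fills up. This is an illustration only: `BoundedPageQueue` is a hypothetical class, the limit is counted in pages rather than bytes, and the choice to accept the page and then ask the producer to pause is an assumption of the sketch.

```python
from collections import deque
from concurrent.futures import Future


class BoundedPageQueue:
    def __init__(self, max_pages):
        self.max_pages = max_pages
        self.pages = deque()
        self.waiters = []

    def enqueue(self, page):
        """Store the page; return a Future if the producer should pause."""
        self.pages.append(page)
        if len(self.pages) >= self.max_pages:
            future = Future()
            self.waiters.append(future)
            return future
        return None

    def dequeue(self):
        """Called when the consumer fetches data; may wake up producers."""
        page = self.pages.popleft()
        if len(self.pages) < self.max_pages:
            waiters, self.waiters = self.waiters, []
            for future in waiters:
                future.set_result(None)  # there is room again
        return page


q = BoundedPageQueue(max_pages=2)
first = q.enqueue("page-a")   # room left, no need to wait
second = q.enqueue("page-b")  # queue is now full, producer gets a future
```

A producer that receives a future would go off thread and resume once the consumer's fetch completes the future.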

Shuffles in Prestissimo are implemented by PartitionedOutput at the producer and Exchange at the consumer end. The OutputBufferManager keeps ready serialized data for consumers to pick up. The binding of these to the Presto wire protocols is in TaskManager.cpp for the producer side, and in PrestoExchangeSource.cpp for the consumer side.

Recap

We presented how a plan becomes executable and how data moves between and inside Operators. We discussed that a Driver can block (go off thread) to wait for a Split to become available, or to wait for its output to be consumed. We have now scratched the surface of running a leaf stage of a distributed query. There is much more to Operators and vectors, though. In the next installment of Velox Primer, we will look at what the second stage of our minimal sample query does.

A Velox Primer, Part 1

· 6 min read
Orri Erling
Software Engineer @ Meta
Pedro Pedreira
Software Engineer @ Meta

This is the first part of a series of short articles that will take you through Velox’s internal structures and concepts. In this first part, we will discuss how distributed queries are executed, how data is shuffled among different stages, and present Velox concepts such as Tasks, Splits, Pipelines, Drivers, and Operators that enable such functionality.

Distributed Query Execution

Velox is a library that provides the functions that go into a query fragment in a distributed compute engine. Distributed compute engines, like Presto and Spark, run so-called exchange parallel plans. Exchange is also known as a data shuffle, and allows data to flow from one stage to the next. Query fragments are the things connected by shuffles, and comprise the processing that is executed within a single worker node. A shuffle takes input from a set of fragments and routes rows of input to particular consumers based on a characteristic of the data, i.e., a partitioning key. The consumers of the shuffle read from the shuffle and get rows of data from any producer, such that the partitioning key of these rows matches the consumer.

Take the following query as an example:

SELECT key, count(*) FROM T GROUP BY key

Suppose it has n leaf fragments that scan different parts of T (the green circles at stage 1). At the end of the leaf fragment, assume there is a shuffle that shuffles the rows based on key. There are m consumer fragments (the yellow circles at stage 2), where each gets a non-overlapping selection of rows based on column key. Each consumer then constructs a hash table keyed on key, where they store a count of how many times the specific value of key has been seen.

Now, if there are 100B different values of key, the hash table would not conveniently fit on a single machine. For efficiency, there is a point in dividing this into 100 hash tables of 1B entries each. This is the point of exchange parallel scale-out. Think of shuffle as a way for consumers to each consume their own distinct slice of a large stream produced by multiple workers.
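A toy version of this scale-out, with n = 3 leaf fragments and m = 2 consumers, is sketched below. The function name is hypothetical and Python's `hash()` stands in for the shuffle's partitioning function; the point is only that the per-consumer hash tables are disjoint and together hold the same counts a single machine would compute.

```python
from collections import Counter


def distributed_count(scan_parts, num_consumers):
    """Stage 1 routes rows by key; stage 2 builds one hash table per consumer."""
    consumer_inputs = [[] for _ in range(num_consumers)]
    for part in scan_parts:  # one leaf fragment per part of T
        for key in part:
            consumer_inputs[hash(key) % num_consumers].append(key)
    return [Counter(rows) for rows in consumer_inputs]


scan_parts = [[1, 2, 3], [3, 4, 1], [1, 5]]
tables = distributed_count(scan_parts, num_consumers=2)

# Gathering the per-consumer tables gives the full result.
merged = Counter()
for table in tables:
    merged.update(table)
```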

Distributed query engines like Presto and Spark both fit the above description. The difference is in, among other things, how they do the shuffle, but we will get back to that later.

Tasks and Splits

Within a worker node, the Velox representation of a query fragment is called a Task (velox::exec::Task). A Task is defined mainly by two things:

  • Plan - velox::core::PlanNode: specifies what the Task does.
  • Splits - velox::exec::Split: specifies the data the Task operates on.

The Splits correspond to the plan that the Task is executing. For the first stage Tasks (table scanning), the Splits specify pieces of files to scan. For the second stage Tasks (group by), their Splits identify the table scan Tasks from which the group by reads its input. There are file splits (velox::connector::ConnectorSplit) and remote splits (velox::exec::RemoteConnectorSplit). The first identifies data to read, the second identifies a running Task.

The distributed engine makes PlanNodes and Splits. Velox takes these and makes Tasks. Tasks send back statistics, errors and other status information to the distributed engine. 

Pipelines, Drivers and Operators

Inside a Task, there are Pipelines. Each pipeline is a linear sequence of operators (velox::exec::Operator), and operators are the objects that implement relational logic. In the case of the group by example, the first task has one pipeline, with a TableScan (velox::exec::TableScan) and a PartitionedOutput (velox::exec::PartitionedOutput). The second Task too has one pipeline, with an Exchange, a LocalExchange, HashAggregation, and a PartitionedOutput.

Each pipeline has one or more Drivers. A Driver is a container for one linear sequence of Operators, and typically runs on its own thread. The pipeline is the collection of Drivers with the same sequence of Operators. The individual Operator instances belong to each Driver. The Drivers belong to the Task. These are interlinked with smart pointers such that they are kept alive for as long as needed.

An example of a Task with two pipelines is a hash join, with separate pipelines for the build and for the probe side. This makes sense because the build must be complete before the probe can proceed. We will talk more about this later.

The Operators on each Driver communicate with each other through the Driver. The Driver picks output from one Operator, keeps tabs on stats and passes it to the next Operator. The data passed between Operators consists of vectors. In particular, an Operator produces/consumes a RowVector, which is a vector with a child vector for every column of the relation - it is the equivalent of RecordBatch in Arrow. All vectors are subclasses of velox::BaseVector.

Operator Sources, Sinks and State

The first Operator in a Driver is called a source. The source operator takes Splits to figure out the file or remote Task that provides its data, and produces a sequence of RowVectors. The last operator in the pipeline is called a sink. The sink operator does not produce an output RowVector, but rather puts the data somewhere for a consumer to retrieve. This is typically a PartitionedOutput. The consumer of the PartitionedOutput is an Exchange in a different Task, where the Exchange is in the source position. Operators that are neither sources nor sinks are things like FilterProject, HashProbe, HashAggregation and so on; more on these later.

Operators also hold state. They can be blocked, accepting input, have output to produce or not, or may be notified that no more input is coming. Operators do not call the API functions of other Operators directly, but instead the Driver decides which Operator to advance next. This has the benefit that no part of the Driver's state is captured in nested function calls. The Driver has a flat stack and therefore can go on and off thread at any time, without having to unwind and restore nested function frames.

Recap

We have seen how a distributed query consists of fragments joined by shuffles. Each fragment has a Task, which has pipelines, Drivers and Operators. PlanNodes represent what the task is supposed to do. These tell the Task how to set up Drivers and Operators. Splits tell the Task where to find input, e.g. from files or from the other Tasks. A Driver corresponds to a thread of execution. It may be running or blocked, e.g. waiting for data to become available or for its consumer to consume already produced data. Operators pass data between themselves as vectors. 

In the next article we will discuss the different stages in the life of a query.