
Multi-Round Lazy Start Merge

6 min read
Meng Duan (macduan)
Software Engineer @ ByteDance
Xiaoxuan Meng
Software Engineer @ Meta
Pedro Pedreira
Software Engineer @ Meta

Background

Efficiently merging sorted data partitions at scale is crucial for a variety of training data preparation workloads, especially for Generative Recommenders (GRs), a new paradigm introduced in the paper Actions Speak Louder than Words: Trillion-Parameter Sequential Transducers for Generative Recommendations. A key requirement is to merge training data across partitions (for example, merging hourly partitions into daily ones) while ensuring that all rows sharing the same primary key are stored consecutively. Training data is typically partitioned and bucketed by primary key, with rows sharing the same key stored consecutively, so merging across partitions essentially becomes a multi-way merge problem.

Normally, Apache Spark can be used for this sort-merge requirement, for example via CLUSTER BY. However, training datasets for a single job can often reach the PB scale, which in turn generates shuffle data at PB scale. Because we typically apply bucketing and ordering by key when preparing training data in production, Spark can eliminate the shuffle when merging training data from multiple hourly partitions. However, each Spark task can only read the files planned into its split from the various partitions sequentially, feeding them into its sorter and spilling as needed. Only after all files have been read does Spark perform a sort-merge of the spilled files. This process produces a large number of small spill files, which further degrades efficiency.

Moreover, Spark’s spill is row-based with a low compression ratio, resulting in approximately 4x amplification compared to the original columnar training data in the data lake. These factors significantly degrade task stability and performance. Velox provides a LocalMerge operator that addresses this use case; it can be introduced into Apache Spark via Gluten or PySpark on Velox.

Note: To keep the focus on merging, the remainder of this article also assumes that each partition’s training data is already sorted by primary key—a common setup in training data pipelines.

LocalMerge Operator

The LocalMerge operator consolidates its sources’ outputs into a single, sorted stream of rows. It runs single-threaded, while its upstream sources may run multi-threaded within the same task, producing multiple sorted inputs concurrently. For example, when merging 24 hourly partitions into a single daily partition (as shown in the figure below), the merge plan fragment is split into two pipelines:

  • Pipeline 0: contains two operators, TableScan and CallbackSink. 24 drivers are instantiated to scan the 24 hourly partitions.
  • Pipeline 1: contains only a single operator, LocalMerge, with one driver responsible for performing the sort merge.

A CallbackSink operator is installed at the end of each driver in Pipeline 0. It pushes the TableScan operator’s output vectors into the queues backing the merge streams. Inside LocalMerge, a TreeOfLosers performs a k-way merge over the 24 merge streams supplied by the Pipeline 0 drivers, producing a single, globally sorted output stream.
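
To make the k-way merge concrete, here is a small self-contained sketch (illustration only, not Velox code): it merges k sorted runs of 64-bit primary keys using a std::priority_queue. Velox's LocalMerge uses a TreeOfLosers over its merge streams; a binary heap is used here purely for brevity.

```cpp
#include <cstdint>
#include <queue>
#include <utility>
#include <vector>

// Illustration only: a k-way merge over k sorted runs of primary keys.
std::vector<int64_t> kWayMerge(const std::vector<std::vector<int64_t>>& runs) {
  // Cursor = (run index, position within that run), ordered by its current key.
  using Cursor = std::pair<size_t, size_t>;
  auto greater = [&runs](const Cursor& a, const Cursor& b) {
    return runs[a.first][a.second] > runs[b.first][b.second];
  };
  std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(greater);

  for (size_t i = 0; i < runs.size(); ++i) {
    if (!runs[i].empty()) {
      heap.push({i, 0});
    }
  }

  std::vector<int64_t> merged;
  while (!heap.empty()) {
    auto [run, pos] = heap.top();
    heap.pop();
    merged.push_back(runs[run][pos]); // Emit the smallest current key.
    if (pos + 1 < runs[run].size()) {
      heap.push({run, pos + 1}); // Advance that run's cursor.
    }
  }
  return merged;
}
```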

Multi-Round Spill Merge

Although LocalMerge minimizes comparisons during merging, preserves row-ordering guarantees, and cleanly isolates the single-threaded merge from the multi-threaded scan phase for predictable performance, it can cause substantial memory pressure—particularly in training-data pipelines. In these workloads, extremely wide tables are common, and even after column pruning, thousands of columns may remain.

Moreover, training data is typically stored in PAX-style formats such as Parquet, ORC, or DWRF. Using Parquet as an example, the reader often needs to keep at least one page per column in memory. As a result, simply opening a Parquet file with thousands of columns can consume significant memory even before any merging occurs. Wide schemas further amplify per-column metadata, dictionary pages, and decompression buffers, inflating the overall footprint. In addition, the k-way merge must hold input vectors from multiple sources concurrently, which drives peak memory usage even higher.

To cap memory usage and avoid OOM when merging a large number of partitions, we extend LocalMerge to process fewer local sources at a time, leverage existing spill facilities to persist intermediate results, and introduce lazy-start activation for merge inputs. Using the case of merging 24 hourly partitions into a single daily partition, the process is organized into two phases:

Phase 1

  1. Break the scan-and-merge into multiple rounds (e.g., 3 rounds).
  2. In each round, lazily start a limited number of drivers (e.g., drivers 0–7, eight at a time).
  3. The started drivers scan data and push it into the queues backing their respective merge streams.
  4. Perform an in-memory k-way merge and spill the results, producing a spill-file group (one or more spill files per group).
  5. After all inputs from drivers 0–7 are consumed and spilled, the drivers are closed, the file streams opened by their TableScan operators are closed, and the associated memory is released.
  6. Repeat the above steps for the remaining rounds (drivers 8–15, then drivers 16–23), ensuring peak memory stays within budget; a simplified sketch of this round-based flow follows this list.
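
The following is a toy model of Phase 1 (not the Velox implementation): it processes 24 in-memory sorted sources 8 at a time and keeps each round's merged output as an in-memory "spill group" that stands in for a group of spill files on disk. A real round performs a single k-way merge, as sketched earlier, and writes the results through the spill facilities.

```cpp
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <vector>

// Toy model of Phase 1: merge the sorted sources a few at a time, producing
// one "spill group" per round.
std::vector<std::vector<int64_t>> phaseOne(
    const std::vector<std::vector<int64_t>>& sources,
    size_t maxSourcesPerRound) { // cf. local_merge_max_num_merge_sources
  std::vector<std::vector<int64_t>> spillGroups;
  for (size_t begin = 0; begin < sources.size(); begin += maxSourcesPerRound) {
    const size_t end = std::min(begin + maxSourcesPerRound, sources.size());
    // Only sources [begin, end) are "started" in this round; the remaining
    // sources stay lazy, so their readers hold no memory yet.
    std::vector<int64_t> merged;
    for (size_t i = begin; i < end; ++i) {
      // For brevity, fold sources in one at a time; the real operator does a
      // single k-way merge (loser tree) over all sources in the round.
      std::vector<int64_t> next;
      std::merge(
          merged.begin(), merged.end(),
          sources[i].begin(), sources[i].end(),
          std::back_inserter(next));
      merged.swap(next);
    }
    // Persist the round's sorted output; in Velox this is a spill-file group,
    // after which the round's drivers and file streams are closed.
    spillGroups.push_back(std::move(merged));
  }
  return spillGroups;
}
```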

Phase 2

  1. Create a concatenated file stream for each spill-file group produced in Phase 1.
  2. Schedule one async callback for each concatenated stream to prefetch and push data into a merge stream.
  3. Merge the outputs of the three merge streams using a k-way merge (e.g., a loser-tree), and begin streaming the final, globally sorted results to downstream operators.
  4. The number of rows per output batch is limited adaptively: each merge stream estimates its average row size from its first batch, and that estimate caps the rows emitted per batch (see the sketch below).
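
A minimal sketch of step 4, with hypothetical names, assuming the operator caps each output batch at a byte budget using the average row size observed in a stream's first batch:

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of adaptive output batching: estimate the average row size from a
// merge stream's first batch and cap the rows per output batch so the batch
// stays within a byte budget. All parameter names are illustrative.
int32_t adaptiveOutputBatchRows(
    int64_t firstBatchBytes,  // bytes in the stream's first batch
    int32_t firstBatchRows,   // rows in the stream's first batch
    int64_t targetBatchBytes, // desired output batch size in bytes
    int32_t maxBatchRows) {   // hard upper bound on rows per batch
  if (firstBatchRows <= 0) {
    return maxBatchRows; // No estimate yet; fall back to the default cap.
  }
  const int64_t avgRowBytes =
      std::max<int64_t>(1, firstBatchBytes / firstBatchRows);
  return static_cast<int32_t>(
      std::clamp<int64_t>(targetBatchBytes / avgRowBytes, 1, maxBatchRows));
}
```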

How To Use

Set local_merge_spill_enabled to true to enable spilling for the LocalMerge operator (it is false by default). Then, set local_merge_max_num_merge_sources to control the number of merge sources per round according to your memory management strategy.

Note: An executor must be configured for spilling, because LocalMerge uses it to schedule an asynchronous callback for each concatenated stream that prefetches data and pushes it into the merge stream.
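
As an illustration, the two settings can be supplied as query configuration entries like the following (a sketch only; how the map reaches Velox's query config depends on whether you go through Gluten, PySpark on Velox, or a hand-built query context, and the value 8 is just an example):

```cpp
#include <string>
#include <unordered_map>

// Sketch only: the two LocalMerge spill settings as query config entries.
// Remember to also provide a spill executor on the query context, as noted
// above, so the async prefetch callbacks have somewhere to run.
std::unordered_map<std::string, std::string> localMergeSpillConfig() {
  return {
      {"local_merge_spill_enabled", "true"},      // spilling is off by default
      {"local_merge_max_num_merge_sources", "8"}, // e.g. 24 sources -> 3 rounds
  };
}
```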

Future Work

In the future, the number of merge sources could be adjusted dynamically based on available memory, rather than being fixed by the local_merge_max_num_merge_sources parameter. The process would start with a small number of sources, such as 2, and incrementally increase this number for subsequent rounds (e.g., to 4) as long as sufficient memory is available. The number of sources would stop increasing once it reaches a memory-constrained limit.
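
One possible shape of that policy, sketched with hypothetical inputs (this is planned behavior, not something LocalMerge does today):

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the proposed adaptive policy (not yet implemented): grow the
// number of merge sources for the next round while the estimated footprint
// still fits in the available memory. All inputs here are hypothetical.
int32_t nextNumMergeSources(
    int32_t currentNumSources,       // e.g. starts at 2
    int64_t availableMemoryBytes,    // memory budget left for merge sources
    int64_t perSourceMemoryEstimate, // estimated footprint of one open source
    int32_t maxNumSources) {         // memory-constrained upper limit
  const int32_t doubled = std::min(currentNumSources * 2, maxNumSources);
  if (static_cast<int64_t>(doubled) * perSourceMemoryEstimate <=
      availableMemoryBytes) {
    return doubled; // e.g. 2 -> 4 for the next round.
  }
  return currentNumSources; // Stop growing once memory-constrained.
}
```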

Acknowledgements

Thanks to Xiaoxuan Meng and Pedro Pedreira for their guidance, review, and brainstorming. I also appreciate the excellent collaboration and work from my colleagues, Xiang Yao, Gang Wang, and Weixin Xu.