perf(index): reduce TwoFileShuffler peak memory via interleave sort #7295
wjones127 wants to merge 1 commit into
Previously the shuffler used `rechunk_stream_by_size` to accumulate and concat incoming batches before sorting with `take`, producing two full-size data copies in sequence and peaking at ~3-4× batch_size_bytes.

Replace both steps with a single pass:

- Accumulate incoming batches in a `Vec<RecordBatch>` without concat.
- `sort_to_interleave_indices` builds `(part_id, batch_idx, row_idx)` tuples over the UInt32 part-id columns only, sorts them in one `sort_unstable` call, and returns `(batch_idx, row_idx)` pairs + per-partition counts.
- The sorted output is streamed to the data file via `interleave_batches` in fixed-size chunks (8 Ki rows), so the interleave output never exceeds a small constant fraction of the source data.

Peak memory drops to ~1× batch_size_bytes, which also allows increasing `LANCE_SHUFFLE_BATCH_BYTES` aggressively to reduce the number of flush groups and improve read-time I/O patterns.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
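The streaming step can be illustrated with a minimal std-only sketch. It is not the PR's code: `stream_interleaved` and `CHUNK_ROWS` are hypothetical names, plain `Vec<T>` values stand in for `RecordBatch` columns, and the gather loop stands in for `interleave_batches`. It only shows why the output buffer stays bounded by the chunk size regardless of how much source data is accumulated.

```rust
/// Hypothetical chunk size; the PR description mentions 8 Ki rows.
const CHUNK_ROWS: usize = 8 * 1024;

/// Gathers rows from `batches` in the order given by `indices` and hands each
/// chunk to `write_chunk` before building the next one.
/// Returns the largest chunk length that was ever materialized.
/// `chunk_rows` must be non-zero (`slice::chunks` panics on 0).
fn stream_interleaved<T: Clone>(
    batches: &[Vec<T>],
    indices: &[(usize, usize)],
    chunk_rows: usize,
    mut write_chunk: impl FnMut(&[T]),
) -> usize {
    let mut max_chunk = 0;
    // One reusable output buffer, never larger than `chunk_rows` rows.
    let mut buf: Vec<T> = Vec::with_capacity(chunk_rows.min(indices.len()));
    for chunk in indices.chunks(chunk_rows) {
        buf.clear();
        // Stand-in for interleave: copy row `r` of batch `b` for each pair.
        buf.extend(chunk.iter().map(|&(b, r)| batches[b][r].clone()));
        max_chunk = max_chunk.max(buf.len());
        write_chunk(&buf);
    }
    max_chunk
}
```

In this sketch the source batches are only borrowed, so the sole extra allocation besides the index list is the reusable chunk buffer.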
Benchmark: TwoFileShuffler memory & throughput vs batch size

Run against 500k rows × 128-dim float32 (244 MB vectors, 1024 partitions):
Flush groups (fewer = less random I/O at query time)

Why peak RSS looks flat

The benchmark pre-generates all 244 MB of test data before any shuffle runs, so the OS RSS baseline already reflects that allocation. The marginal memory attributable to the shuffler itself is small and gets lost in the noise. The real benefit shows up in the theoretical model:
The old code did

Practical impact

With the old 3–4× overhead, setting

Reproduce

# (from lance-index/ directory, bench not checked in)
cargo bench --bench shuffler_mem 2>&1 |
Memory benchmark: TwoFileShuffler interleave-sort

Methodology

Two approaches:

Rust micro-benchmark (shuffler phase in isolation)

Dataset: 500K rows × 128-dim float32, 244 MB, 1024 partitions.
Old code scales at ~3× batch_size (accumulated input + concat copy + sorted-take copy). At 256 MB with 244 MB total data, old code peaks at ~3× 244 MB ≈ 732 MB extra above baseline. New code overhead is essentially zero: the sort buffer (n × 12 bytes) and streaming 8192-row interleave chunks are negligible.
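The arithmetic behind these figures can be checked with a small sketch. The function names are hypothetical and the model is deliberately rough: it counts only vector bytes (512 bytes per 128-dim float32 row), ignores the part-id column and allocator overhead, and assumes the old path holds exactly three copies.

```rust
/// Bytes held by the sort keys: one `(u32, u32, u32)` tuple (12 bytes) per row.
fn sort_buffer_bytes(rows: u64) -> u64 {
    rows * std::mem::size_of::<(u32, u32, u32)>() as u64
}

/// Rough model of the old path: accumulated input + concat copy + take copy.
fn old_peak_bytes(batch_bytes: u64) -> u64 {
    3 * batch_bytes
}

/// Rough model of the new path: accumulated input + sort keys + one output chunk.
fn new_peak_bytes(batch_bytes: u64, rows: u64, chunk_rows: u64, row_bytes: u64) -> u64 {
    batch_bytes + sort_buffer_bytes(rows) + chunk_rows * row_bytes
}
```

Under these assumptions, 500K rows need 6,000,000 bytes of sort keys and one 8192-row chunk takes 4 MiB, so the new path adds roughly 10 MB on top of the accumulated input, while the old model adds two further full copies.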
Python end-to-end (main vs branch)

Dataset: ~1M rows × 3072-dim float32, ~204 MB PQ-encoded (index replaced between runs).
End-to-end peak is dominated by k-means training (~9–10 GB), which masks most of the shuffler improvement. The 128 MB case shows the clearest signal (−790 MB), consistent with the micro-benchmark prediction of ~382 MB shuffler overhead eliminated plus measurement variance. The smaller batch sizes show measurement noise in both directions.

The improvement is most impactful for production-scale workloads where users set
Pull request overview
This PR optimizes TwoFileShuffler (vector v3) to reduce peak memory during IVF shuffling by avoiding full-data concatenation/take copies and instead sorting only partition-id columns to produce interleave indices, then streaming the sorted output via interleave_batches in fixed-size chunks.
Changes:
- Replaces the previous `rechunk_stream_by_size` + `concat_batches` + `take` approach with a sort over `(part_id, batch_idx, row_idx)` keys to generate interleave indices.
- Streams sorted output to `shuffle_data.lance` in `SHUFFLE_WRITE_CHUNK_ROWS` chunks using `interleave_batches`, and writes corresponding per-flush-group offsets to `shuffle_offsets.lance`.
- Simplifies loss tracking by summing per-input-batch metadata directly (no `Mutex`/atomic counters).
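How per-partition counts could turn into per-flush-group offsets can be sketched as a prefix sum. The actual layout of `shuffle_offsets.lance` is not shown in this PR page, so the cumulative end-offset layout, the `base` parameter, and both function names below are assumptions made for illustration.

```rust
/// Turns per-partition row counts for one flush group into cumulative end
/// offsets, starting from `base` (rows already written by earlier groups).
fn counts_to_end_offsets(counts: &[u64], base: u64) -> Vec<u64> {
    let mut running = base;
    counts
        .iter()
        .map(|&c| {
            running += c;
            running
        })
        .collect()
}

/// Row range of partition `pid` within the data file for that flush group.
fn partition_range(end_offsets: &[u64], base: u64, pid: usize) -> std::ops::Range<u64> {
    let start = if pid == 0 { base } else { end_offsets[pid - 1] };
    start..end_offsets[pid]
}
```

With this layout an empty partition shows up as an empty range, and a reader needs one contiguous read per partition per flush group, which is why fewer flush groups mean less random I/O.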
let mut total_loss = 0.0f64;
let mut accumulated: Vec<RecordBatch> = Vec::new();
let mut acc_bytes: usize = 0;

let mut rechunked = std::pin::pin!(rechunked);
while let Some(batch) = rechunked.next().await {
    num_batches_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut data = std::pin::pin!(data);
while let Some(batch) = data.next().await {
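The accumulate-then-flush pattern suggested by `accumulated` and `acc_bytes` can be sketched as follows. This is a guess at the control flow, not the PR's loop: `plan_flush_groups` is a hypothetical helper, batches are represented only by their byte sizes, and the assumption is that a flush fires once the accumulated bytes reach the threshold.

```rust
/// Groups incoming batches (given by byte size) into flush groups.
/// Each inner Vec lists the indices of the batches flushed together.
fn plan_flush_groups(batch_sizes: &[usize], batch_size_bytes: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut accumulated: Vec<usize> = Vec::new();
    let mut acc_bytes = 0usize;
    for (i, &size) in batch_sizes.iter().enumerate() {
        // Keep the batch as-is; no concat or copy happens here.
        accumulated.push(i);
        acc_bytes += size;
        if acc_bytes >= batch_size_bytes {
            // Flush: hand off everything accumulated so far and start over.
            groups.push(std::mem::take(&mut accumulated));
            acc_bytes = 0;
        }
    }
    // Flush whatever remains when the input stream ends.
    if !accumulated.is_empty() {
        groups.push(accumulated);
    }
    groups
}
```

Raising the threshold in this sketch directly lowers the number of groups, which matches the PR's argument for setting `LANCE_SHUFFLE_BATCH_BYTES` higher once peak memory is close to 1× the batch size.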
let pid = *part_id as usize;
if pid < num_partitions {
    partition_counts[pid] += 1;
} else {
    log::warn!(
        "Partition ID {} is out of range [0, {})",
        pid,
        num_partitions
    );
}
interleave_indices.push((*batch_idx as usize, *row_idx as usize));
/// Sorts `accumulated` batches by partition ID and writes the result to the data
/// and offsets files.
///
/// Returns `(total_rows_written, per_partition_row_counts)`.
async fn flush_shuffle_batch(
    accumulated: Vec<RecordBatch>,
    file_writer: &mut FileWriter,
    offsets_writer: &mut FileWriter,
    offsets_schema: Arc<Schema>,
let total_rows: usize = part_id_columns.iter().map(|a| a.len()).sum();
let mut keys: Vec<(u32, u32, u32)> = Vec::with_capacity(total_rows);
for (batch_idx, col) in part_id_columns.iter().enumerate() {
    let batch_idx = batch_idx as u32;
    for (row_idx, &part_id) in col.values().iter().enumerate() {
        keys.push((part_id, batch_idx, row_idx as u32));
    }
}
keys.sort_unstable_by_key(|k| k.0);

let mut partition_counts = vec![0u64; num_partitions];
let mut interleave_indices = Vec::with_capacity(total_rows);
for (part_id, batch_idx, row_idx) in &keys {
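The snippet above sorts with `sort_unstable_by_key(|k| k.0)`, which leaves the order of rows inside one partition unspecified. The PR description instead mentions a single `sort_unstable` over the tuples. A std-only sketch of that variant follows; plain `Vec<u32>` columns stand in for the UInt32 arrays, and the body is an illustration rather than the merged implementation. Because every `(part_id, batch_idx, row_idx)` tuple is unique, sorting the full tuple gives a deterministic result that keeps arrival order within each partition.

```rust
/// Std-only stand-in for `sort_to_interleave_indices`: each inner Vec plays
/// the role of one batch's part-id column.
/// Returns `(batch_idx, row_idx)` pairs in partition order plus per-partition counts.
fn sort_to_interleave_indices(
    part_id_columns: &[Vec<u32>],
    num_partitions: usize,
) -> (Vec<(usize, usize)>, Vec<u64>) {
    let total_rows: usize = part_id_columns.iter().map(|c| c.len()).sum();
    let mut keys: Vec<(u32, u32, u32)> = Vec::with_capacity(total_rows);
    for (batch_idx, col) in part_id_columns.iter().enumerate() {
        for (row_idx, &part_id) in col.iter().enumerate() {
            keys.push((part_id, batch_idx as u32, row_idx as u32));
        }
    }
    // Full-tuple comparison: ties on part_id fall back to (batch_idx, row_idx).
    keys.sort_unstable();

    let mut partition_counts = vec![0u64; num_partitions];
    let mut interleave_indices = Vec::with_capacity(total_rows);
    for &(part_id, batch_idx, row_idx) in &keys {
        // Out-of-range ids are not counted, mirroring the reviewed snippet,
        // but the row is still emitted.
        if let Some(count) = partition_counts.get_mut(part_id as usize) {
            *count += 1;
        }
        interleave_indices.push((batch_idx as usize, row_idx as usize));
    }
    (interleave_indices, partition_counts)
}
```

Whether deterministic within-partition order matters for the shuffler is a design question for the PR; the sketch only shows that it costs nothing beyond comparing the remaining two tuple fields on ties.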
Summary
- Replace `rechunk_stream_by_size` + `concat_batches` + `take` (two full-data copies, peak ~3–4× `batch_size_bytes`) with a single-pass sort over the UInt32 part-id columns only, producing `(batch_idx, row_idx)` interleave indices.
- Stream the sorted output via `interleave_batches` in 8 Ki-row chunks, so the interleave output adds only a small constant overhead above the accumulated source data.
- Peak memory drops to ~1× `batch_size_bytes`, which enables setting `LANCE_SHUFFLE_BATCH_BYTES` much larger to reduce flush-group count and improve read-time I/O locality.

Closes #7299.
🤖 Generated with Claude Code