diff --git a/Cargo.lock b/Cargo.lock index 28fe6a43a5c5..7033b5da4431 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,7 +106,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -117,7 +117,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1248,7 +1248,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2132,7 +2132,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2958,7 +2958,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3222,7 +3222,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3331,10 +3331,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3344,7 +3344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -4018,7 +4018,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index dd2c872ede50..ec447c90d468 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -155,6 +155,11 @@ name = "read_with_rowgroup" required-features = ["arrow", "async"] path = "./examples/read_with_rowgroup.rs" +[[example]] +name = "spill_page_store" +required-features = ["arrow", "cli"] +path = "./examples/spill_page_store.rs" + [[test]] name = "arrow_writer_layout" required-features = ["arrow"] diff --git a/parquet/examples/spill_page_store.rs b/parquet/examples/spill_page_store.rs new file mode 100644 index 000000000000..38e1a40e0494 --- /dev/null +++ b/parquet/examples/spill_page_store.rs @@ -0,0 +1,652 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Demonstrates the pluggable [`PageStore`] API by implementing a **spilling** +//! page store that keeps completed Parquet pages in temp files instead of on the +//! heap. +//! +//! # Background +//! +//! Parquet requires every column chunk to be contiguous in the file, but Arrow +//! record batches arrive with all columns interleaved. So while a row group is +//! being written, the writer must buffer every column's completed, compressed +//! pages until the row group is flushed. Peak write memory therefore grows with +//! the row group size — painful for wide schemas with large, skewed columns +//! (e.g. a few `id` columns next to a pile of fat string columns). +//! +//! A [`PageStore`] lets that page buffer live somewhere other than the heap. This +//! example plugs in a [`TempFilePageStore`] (one temp file per column chunk) and +//! compares peak writer memory against the default in-memory buffering. +//! +//! # Concurrency +//! +//! To keep every core busy this example splits the work across two thread pools, +//! sized to the machine: `N/2` **generator** threads building record batches and +//! `N/2` **encoder** threads encoding them (`N` = available cores). It uses the +//! low-level [`ArrowColumnWriter`] API (rather than the single-threaded +//! [`ArrowWriter`]) so encoding can be parallelized: +//! +//! - Each generator claims the next batch index from a shared counter, builds +//! that batch deterministically, and sends it to the main thread. +//! - The main thread re-orders batches by index (so the output is deterministic +//! regardless of how the generators interleave) and broadcasts each one to all +//! encoders. Batches are cheap to share — the columns are reference-counted. +//! - The columns are distributed across the encoder threads, each owning a +//! disjoint subset of [`ArrowColumnWriter`]s backed by their own [`PageStore`]. +//! Each encoder picks out its columns from every batch via [`compute_leaves`]. +//! - The finished [`ArrowColumnChunk`]s are collected, sorted into schema order, +//! and spliced into the row group — streaming back out of the store one page +//! at a time — on the main thread. +//! +//! (For clarity this hand-rolls the pools with threads and channels; production +//! code would use rayon or tokio.) +//! +//! # Running +//! +//! ```sh +//! # Default in-memory page buffering (baseline): peak writer memory grows with +//! # the row group. +//! cargo run --release --features cli --example spill_page_store +//! +//! # Spill completed pages to temp files: peak writer memory stays bounded by +//! # the in-flight encoder buffers, independent of the row group size. +//! cargo run --release --features cli --example spill_page_store -- --spill +//! +//! # Make the schema wider / the skew worse: +//! cargo run --release --features cli --example spill_page_store -- --spill --large-string-columns 40 +//! ``` +//! +//! [`ArrowWriter`]: parquet::arrow::ArrowWriter +//! [`ArrowColumnWriter`]: parquet::arrow::arrow_writer::ArrowColumnWriter +//! [`ArrowColumnChunk`]: parquet::arrow::arrow_writer::ArrowColumnChunk +//! [`compute_leaves`]: parquet::arrow::arrow_writer::compute_leaves +//! [`PageStore`]: parquet::arrow::arrow_writer::PageStore + +use std::collections::HashMap; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Instant; + +use tempfile::NamedTempFile; + +use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringBuilder}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use bytes::Bytes; +use clap::Parser; +use parquet::arrow::ArrowSchemaConverter; +use parquet::arrow::arrow_writer::{ + ArrowColumnChunk, ArrowColumnWriter, ArrowRowGroupWriterFactory, PageKey, PageStore, + PageStoreFactory, compute_leaves, +}; +use parquet::basic::Compression; +use parquet::errors::Result; +use parquet::file::properties::WriterProperties; +use parquet::file::writer::SerializedFileWriter; +use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System}; + +/// Write a skewed, wide Parquet file and compare peak writer memory with and +/// without a spilling `PageStore`. +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Args { + /// Number of large (~8 KiB average) string columns to write — the fat + /// columns that make the in-memory page buffer blow up. + #[arg(long, default_value_t = 10)] + large_string_columns: usize, + + /// Number of small (~20 byte average) string columns to write. + #[arg(long, default_value_t = 5)] + small_string_columns: usize, + + /// Number of integer (`Int64`) columns to write. + #[arg(long, default_value_t = 3)] + int_columns: usize, + + /// Total number of rows, all funnelled into a single row group. + #[arg(long, default_value_t = 2048)] + rows: usize, + + /// Rows per input batch fed to the writer. Each batch is dropped right after + /// it is written, so only the writer's internal buffering accumulates. + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Use the spilling [`TempFilePageStore`] instead of the default in-memory + /// page buffering. + #[arg(long)] + spill: bool, + + /// Optional path to write the Parquet file to. Defaults to `io::sink()` so + /// the produced file bytes never live on the heap and the reported memory + /// reflects only the writer's page buffering. + #[arg(long)] + output: Option, +} + +/// Average length, in bytes, of values in a "large" string column. +const LARGE_AVG_LEN: usize = 8 * 1024; +/// Average length, in bytes, of values in a "small" string column. +const SMALL_AVG_LEN: usize = 20; + +// --------------------------------------------------------------------------- +// The spilling page store. +// +// A `PageStore` is intentionally "dumb": it maps an opaque, store-allocated +// `PageKey` to a blob of bytes and knows nothing about pages, dictionaries, or +// ordering. The caller (`ArrowWriter`) keeps the handles and decides what they +// mean. That is all a backend has to implement to move the page buffer off the +// heap. +// --------------------------------------------------------------------------- + +/// Shared spill bookkeeping, aggregated across every per-column +/// [`TempFilePageStore`]. Each store holds an `Arc` to the same instance, so the +/// totals and per-file records survive the stores being dropped at row group +/// flush. +#[derive(Debug, Default)] +struct SpillStats { + /// Total bytes handed to `put` and written to a temp file. + bytes_written: AtomicU64, + /// Total bytes read back out of a temp file by `take` (at row group flush). + bytes_read: AtomicU64, + /// One record per temp file, pushed when each store is dropped. + files: Mutex>, +} + +/// What a single spill temp file ended up holding. +#[derive(Debug)] +struct FileRecord { + /// Leaf column index the store was created for. + column_index: usize, + /// Filesystem path of the temp file (valid while the store was alive). + path: PathBuf, + /// Total bytes written into the file. + bytes: u64, +} + +/// A spilling [`PageStore`]: one temp file per column chunk. +/// +/// `put` appends the blob to the file and records its `(offset, len)`; `take` +/// seeks and reads it back. A [`NamedTempFile`] is used (rather than an +/// anonymous one) so the file has a reportable path; the OS reclaims it when the +/// store is dropped at row group flush. +struct TempFilePageStore { + file: NamedTempFile, + /// Leaf column index this store backs (used only for reporting). + column_index: usize, + /// Logical end of the file — where the next `put` appends. + end: u64, + /// `(offset, len)` for each stored blob, indexed by the `PageKey` we minted. + locs: Vec<(u64, usize)>, + /// Shared, cross-store spill bookkeeping (see [`SpillStats`]). + stats: Arc, +} + +impl TempFilePageStore { + fn new(stats: Arc, column_index: usize) -> Result { + Ok(Self { + file: NamedTempFile::new()?, + column_index, + end: 0, + locs: Vec::new(), + stats, + }) + } +} + +impl PageStore for TempFilePageStore { + fn put(&mut self, value: Bytes) -> Result { + // Always append at the logical end (a prior `take` may have moved the + // OS file cursor). + self.file.seek(SeekFrom::Start(self.end))?; + self.file.write_all(&value)?; + self.stats + .bytes_written + .fetch_add(value.len() as u64, Ordering::Relaxed); + let key = PageKey::new(self.locs.len() as u64); + self.locs.push((self.end, value.len())); + self.end += value.len() as u64; + Ok(key) + } + + fn take(&mut self, key: PageKey) -> Result { + let (offset, len) = self.locs[key.get() as usize]; + let mut buf = vec![0u8; len]; + self.file.seek(SeekFrom::Start(offset))?; + self.file.read_exact(&mut buf)?; + self.stats + .bytes_read + .fetch_add(len as u64, Ordering::Relaxed); + Ok(Bytes::from(buf)) + } + + // `memory_size` keeps its default of 0: once a blob is handed to `put` it + // lives in the temp file, not on the heap. That zero is what makes + // `ArrowWriter::memory_size()` drop to just the in-flight encoder buffers. +} + +impl Drop for TempFilePageStore { + fn drop(&mut self) { + // Record this file's path and final byte count before the NamedTempFile + // is unlinked, so `main` can list it after the writer is closed. + self.stats.files.lock().unwrap().push(FileRecord { + column_index: self.column_index, + path: self.file.path().to_path_buf(), + bytes: self.end, + }); + } +} + +/// Creates a fresh [`TempFilePageStore`] for every column chunk the writer opens, +/// handing each one an `Arc` to the shared [`SpillStats`]. +#[derive(Debug)] +struct TempFilePageStoreFactory { + stats: Arc, +} + +impl PageStoreFactory for TempFilePageStoreFactory { + fn create(&self, column_index: usize) -> Result> { + Ok(Box::new(TempFilePageStore::new( + self.stats.clone(), + column_index, + )?)) + } +} + +// --------------------------------------------------------------------------- +// Schema + data generation. +// --------------------------------------------------------------------------- + +/// Build the wide, skewed schema: a few integer columns, then a bunch of small +/// string columns, then the fat large string columns. +fn build_schema(args: &Args) -> SchemaRef { + let mut fields = Vec::new(); + for i in 0..args.int_columns { + fields.push(Field::new(format!("int_{i}"), DataType::Int64, false)); + } + for i in 0..args.small_string_columns { + fields.push(Field::new(format!("small_str_{i}"), DataType::Utf8, false)); + } + for i in 0..args.large_string_columns { + fields.push(Field::new(format!("large_str_{i}"), DataType::Utf8, false)); + } + Arc::new(Schema::new(fields)) +} + +/// The per-batch column counts a generator needs (a small `Copy` view of `Args` +/// so it can be handed to each generator thread). +#[derive(Clone, Copy)] +struct BatchSpec { + int_columns: usize, + small_string_columns: usize, + large_string_columns: usize, +} + +/// Fill `buf` with a deterministic value of exactly `len` bytes derived from the +/// counter `n`. +/// +/// The 20-digit zero-padded counter makes every value distinct, so the fat +/// columns stay plain-encoded (high cardinality) rather than dictionary-encoding +/// away; the remainder is padded with a fixed `a`–`z` cycle. No RNG — the content +/// is a pure function of `n`, so a run is fully reproducible. +fn fill_value(buf: &mut String, n: u64, len: usize) { + use std::fmt::Write; + buf.clear(); + let _ = write!(buf, "{n:020}"); + while buf.len() < len { + buf.push((b'a' + (buf.len() % 26) as u8) as char); + } + buf.truncate(len); // all bytes are ASCII, so this is a clean char boundary +} + +/// Build a string column of `rows` values, each exactly `len` bytes, keyed by the +/// global row index (`row_offset + r`) and a per-column `salt` so values are +/// distinct within the column. +fn make_string_array(rows: usize, row_offset: u64, salt: u64, len: usize) -> ArrayRef { + let mut builder = StringBuilder::with_capacity(rows, rows * len); + let mut value = String::new(); + for r in 0..rows { + let n = (row_offset + r as u64).wrapping_mul(101).wrapping_add(salt); + fill_value(&mut value, n, len); + builder.append_value(&value); + } + Arc::new(builder.finish()) +} + +/// Build the record batch covering rows `[row_offset, row_offset + rows)`. +/// +/// Fully deterministic in `row_offset`: every column's values are a pure function +/// of the global row index, so the batch a generator produces depends only on its +/// claimed index — not on thread timing. +fn make_batch(schema: &SchemaRef, spec: BatchSpec, row_offset: u64, rows: usize) -> RecordBatch { + let mut columns: Vec = Vec::with_capacity(schema.fields().len()); + let mut salt = 0u64; // distinguishes columns so they don't all hold equal values + for _ in 0..spec.int_columns { + let s = salt; + salt += 1; + let vals: Vec = (0..rows) + .map(|r| (row_offset + r as u64 + s) as i64) + .collect(); + columns.push(Arc::new(Int64Array::from(vals))); + } + for _ in 0..spec.small_string_columns { + columns.push(make_string_array(rows, row_offset, salt, SMALL_AVG_LEN)); + salt += 1; + } + for _ in 0..spec.large_string_columns { + columns.push(make_string_array(rows, row_offset, salt, LARGE_AVG_LEN)); + salt += 1; + } + RecordBatch::try_new(schema.clone(), columns).unwrap() +} + +// --------------------------------------------------------------------------- +// Memory reporting. +// --------------------------------------------------------------------------- + +/// Current process resident set size (RSS), in bytes. +fn rss_bytes(system: &mut System) -> u64 { + let Ok(pid) = sysinfo::get_current_pid() else { + return 0; + }; + system.refresh_processes_specifics( + ProcessesToUpdate::Some(&[pid]), + true, + ProcessRefreshKind::everything(), + ); + system.process(pid).map(|p| p.memory()).unwrap_or(0) +} + +fn mib(bytes: usize) -> f64 { + bytes as f64 / (1024.0 * 1024.0) +} + +fn main() -> Result<()> { + let start = Instant::now(); + let args = Args::parse(); + let schema = build_schema(&args); + + // One uncompressed row group for the whole dataset, so the page buffer (the + // thing a PageStore governs) is the only thing that grows. Uncompressed keeps + // the reported numbers easy to reason about — the buffer holds the raw page + // bytes. + let props = Arc::new( + WriterProperties::builder() + .set_compression(Compression::UNCOMPRESSED) + .set_max_row_group_row_count(Some(args.rows * 2)) + .build(), + ); + + let spill_stats = Arc::new(SpillStats::default()); + + // Total logical payload across the large columns — the part that dominates. + let large_payload = args.large_string_columns * LARGE_AVG_LEN * args.rows; + println!( + "Writing {} rows × {} columns ({} int, {} small-string ~{}B, {} large-string ~{}B)", + args.rows, + args.int_columns + args.small_string_columns + args.large_string_columns, + args.int_columns, + args.small_string_columns, + SMALL_AVG_LEN, + args.large_string_columns, + LARGE_AVG_LEN, + ); + println!( + "Page buffering: {} (large-column payload ≈ {:.1} MiB)", + if args.spill { + "TempFilePageStore (spilling to temp files)" + } else { + "InMemoryPageStore (default, on the heap)" + }, + mib(large_payload), + ); + // Split the cores: half generate batches, half encode them. + let num_cores = thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(2); + let num_generators = (num_cores / 2).max(1); + let num_encoders = (num_cores / 2).max(1); + let num_batches = args.rows.div_ceil(args.batch_size); + println!( + "Cores: {num_cores} ({num_generators} generator threads, {num_encoders} encoder threads, \ + {num_batches} batches of ≤{} rows)", + args.batch_size, + ); + + let mut system = System::new_with_specifics(RefreshKind::everything()); + let rss_start = rss_bytes(&mut system); + + // Build the lower-level file writer so columns can be encoded in parallel. + // `io::sink()` discards the produced file bytes so they never inflate the + // heap — the measured peak then reflects only the writer's page buffering. + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(props.coerce_types()) + .convert(&schema)?; + let writer_sink: Box = match &args.output { + Some(path) => Box::new(File::create(path)?), + None => Box::new(std::io::sink()), + }; + let mut file_writer = + SerializedFileWriter::new(writer_sink, parquet_schema.root_schema_ptr(), props.clone())?; + + // One `ArrowColumnWriter` per leaf column, each backed by its own PageStore. + // With `--spill`, that store is a TempFilePageStore, so each worker thread's + // completed pages land in a temp file instead of the heap. + let mut factory = ArrowRowGroupWriterFactory::new(&file_writer, schema.clone()); + if args.spill { + factory = factory.with_page_store_factory(Arc::new(TempFilePageStoreFactory { + stats: spill_stats.clone(), + })); + } + let col_writers = factory.create_column_writers(0)?; + + // Distribute the columns across the encoder threads round-robin, so the fat + // columns (contiguous at the end of the schema) spread evenly. Each encoder + // owns a disjoint set of `(column index, ArrowColumnWriter)` pairs. + let mut encoder_cols: Vec> = + (0..num_encoders).map(|_| Vec::new()).collect(); + for (idx, writer) in col_writers.into_iter().enumerate() { + encoder_cols[idx % num_encoders].push((idx, writer)); + } + + // Spawn the encoder pool. Each encoder receives whole batches over a small + // bounded channel (back-pressure), encodes only its own columns from each, + // tracks the peak bytes each writer held resident, and returns the finished + // chunks plus those peaks. + let mut encoder_txs = Vec::with_capacity(num_encoders); + let mut encoder_handles = Vec::with_capacity(num_encoders); + for mut cols in encoder_cols { + let (tx, rx) = sync_channel::>(1); + let schema = schema.clone(); + let handle = thread::spawn(move || -> Result> { + let mut peaks = vec![0usize; cols.len()]; + for batch in rx { + for (slot, (idx, writer)) in cols.iter_mut().enumerate() { + let field = &schema.fields()[*idx]; + for leaf in compute_leaves(field, batch.column(*idx))? { + writer.write(&leaf)?; + } + // `memory_size()` is the bytes this column's writer holds + // resident — pages in its PageStore plus in-flight encoder + // buffers. With the in-memory store it climbs toward the whole + // column chunk; with spilling it stays flat. + peaks[slot] = peaks[slot].max(writer.memory_size()); + } + } + cols.into_iter() + .zip(peaks) + .map(|((idx, writer), peak)| { + let peak = peak.max(writer.memory_size()); + Ok((idx, writer.close()?, peak)) + }) + .collect() + }); + encoder_txs.push(tx); + encoder_handles.push(handle); + } + + // Spawn the generator pool. Each generator claims the next batch index from a + // shared counter, builds that batch deterministically, and sends `(index, + // batch)` to the main thread. The bounded channel applies back-pressure so + // generators don't race arbitrarily far ahead of the encoders. + // Keep the result channel shallow: with wide/huge schemas each batch can be + // hundreds of MiB, and they pipeline, so the in-flight input — not the + // (spilled) page buffer — dominates process RSS. A depth of 1 bounds it to + // roughly the working set the generators and encoders are actively touching. + let next_batch = Arc::new(AtomicUsize::new(0)); + let (result_tx, result_rx) = sync_channel::<(usize, Arc)>(1); + let mut gen_handles = Vec::with_capacity(num_generators); + let spec = BatchSpec { + int_columns: args.int_columns, + small_string_columns: args.small_string_columns, + large_string_columns: args.large_string_columns, + }; + for _ in 0..num_generators { + let schema = schema.clone(); + let next_batch = next_batch.clone(); + let tx = result_tx.clone(); + let (rows, batch_size) = (args.rows, args.batch_size); + let handle = thread::spawn(move || { + loop { + let i = next_batch.fetch_add(1, Ordering::Relaxed); + if i >= num_batches { + break; + } + let row_offset = (i * batch_size) as u64; + let rows_in = batch_size.min(rows - i * batch_size); + let batch = make_batch(&schema, spec, row_offset, rows_in); + if tx.send((i, Arc::new(batch))).is_err() { + break; // main hung up (shouldn't happen on the happy path) + } + } + }); + gen_handles.push(handle); + } + drop(result_tx); // only the generators hold senders now + + // Main: pull generated batches, re-order them by index, and broadcast each in + // index order to every encoder. Re-ordering keeps the written row order — and + // therefore the output file — deterministic regardless of generator timing. + let mut peak_rss = rss_start; + let mut pending: HashMap> = HashMap::new(); + let mut next_emit = 0usize; + while next_emit < num_batches { + if let Some(batch) = pending.remove(&next_emit) { + for tx in &encoder_txs { + tx.send(batch.clone()).unwrap(); // cheap: clones an Arc + } + next_emit += 1; + peak_rss = peak_rss.max(rss_bytes(&mut system)); + } else { + let (i, batch) = result_rx.recv().expect("generators ended early"); + pending.insert(i, batch); + } + } + drop(encoder_txs); // signal end-of-input to the encoders + for handle in gen_handles { + handle.join().expect("generator thread panicked"); + } + + // Collect the encoded chunks, sort them back into schema order, and splice + // each into the row group (streaming pages back out of the store). Columns + // must be appended in schema order. + let mut chunks: Vec<(usize, ArrowColumnChunk, usize)> = Vec::new(); + for handle in encoder_handles { + chunks.extend(handle.join().expect("encoder thread panicked")?); + peak_rss = peak_rss.max(rss_bytes(&mut system)); + } + chunks.sort_by_key(|(idx, _, _)| *idx); + + let mut row_group_writer = file_writer.next_row_group()?; + let mut peak_writer_memory = 0usize; + for (_idx, chunk, col_peak) in chunks { + peak_writer_memory += col_peak; + chunk.append_to_row_group(&mut row_group_writer)?; + peak_rss = peak_rss.max(rss_bytes(&mut system)); + } + row_group_writer.close()?; + file_writer.close()?; + peak_rss = peak_rss.max(rss_bytes(&mut system)); + let elapsed = start.elapsed(); + let written = args.rows; + + println!(); + println!("Done. Wrote {written} rows."); + println!( + "Peak writer memory (Σ per-column): {:>8.1} MiB <- bytes the column writers held on the heap", + mib(peak_writer_memory), + ); + println!( + "Peak process RSS delta : {:>8.1} MiB", + (peak_rss.saturating_sub(rss_start)) as f64 / (1024.0 * 1024.0), + ); + println!( + "Total elapsed time : {:>8.3} s", + elapsed.as_secs_f64(), + ); + println!(); + if args.spill { + let mut files = spill_stats.files.lock().unwrap(); + files.sort_by_key(|f| f.column_index); + let written_bytes = spill_stats.bytes_written.load(Ordering::Relaxed); + let read_bytes = spill_stats.bytes_read.load(Ordering::Relaxed); + println!( + "Spill temp files created : {:>8} <- one per column chunk", + files.len(), + ); + println!( + "Total bytes spilled (written) : {:>8.1} MiB", + mib(written_bytes as usize), + ); + println!( + "Total bytes read back (take) : {:>8.1} MiB", + mib(read_bytes as usize), + ); + println!(); + println!("Per spill file (column index, type, path, bytes stored):"); + for f in files.iter() { + // `schema` is flat, so the leaf column index is also the field index. + let field = schema.field(f.column_index); + println!( + " col {:>3} {:<8} {} {} bytes ({:.1} MiB)", + f.column_index, + format!("{}", field.data_type()), + f.path.display(), + f.bytes, + mib(f.bytes as usize), + ); + } + println!(); + println!( + "With spilling, peak writer memory is bounded by the in-flight encoder \n\ + buffers (a page or two per column), not the {:.1} MiB row group.", + mib(large_payload), + ); + } else { + println!( + "Re-run with --spill to keep those pages off the heap and watch peak \n\ + writer memory drop well below the {:.1} MiB row group payload.", + mib(large_payload), + ); + } + + Ok(()) +} diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..a5e0bfc3ae64 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -21,7 +21,6 @@ use crate::column::chunker::ContentDefinedChunker; use bytes::Bytes; use std::io::{Read, Write}; -use std::iter::Peekable; use std::slice::Iter; use std::sync::{Arc, Mutex}; use std::vec::IntoIter; @@ -37,6 +36,7 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr use crate::arrow::ArrowSchemaConverter; use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::basic::PageType; use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::page_encryption::PageEncryptor; use crate::column::writer::encoder::ColumnValueEncoder; @@ -49,7 +49,6 @@ use crate::encryption::encrypt::FileEncryptor; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; -use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift}; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; @@ -58,6 +57,11 @@ use levels::{ArrayLevels, calculate_array_levels}; mod byte_array; mod levels; +#[doc(inline)] +pub use crate::column::page_store::{ + InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreFactory, +}; + /// Encodes [`RecordBatch`] to parquet /// /// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded @@ -263,8 +267,12 @@ impl ArrowWriter { let file_writer = SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?; - let row_group_writer_factory = + let mut row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone()); + if let Some(page_store_factory) = options.page_store_factory { + row_group_writer_factory = + row_group_writer_factory.with_page_store_factory(page_store_factory); + } let cdc_chunkers = props_ptr .content_defined_chunking() @@ -556,6 +564,7 @@ pub struct ArrowWriterOptions { skip_arrow_metadata: bool, schema_root: Option, schema_descr: Option, + page_store_factory: Option>, } impl ArrowWriterOptions { @@ -569,6 +578,21 @@ impl ArrowWriterOptions { Self { properties, ..self } } + /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row + /// group is being written. + /// + /// By default (an [`InMemoryPageStore`] per column chunk) completed pages + /// are buffered on the heap until the row group is flushed, so peak memory + /// grows with the row group size. Supplying a factory that spills to a temp + /// file or object storage instead bounds peak write memory, decoupling it + /// from the row group size while keeping large, read-optimal row groups. + pub fn with_page_store_factory(self, page_store_factory: Arc) -> Self { + Self { + page_store_factory: Some(page_store_factory), + ..self + } + } + /// Skip encoding the embedded arrow metadata (defaults to `false`) /// /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema @@ -603,52 +627,108 @@ impl ArrowWriterOptions { } } -/// A single column chunk produced by [`ArrowColumnWriter`] -#[derive(Default)] +/// A single column chunk produced by [`ArrowColumnWriter`]. +/// +/// Holds the serialized page blobs (each page's header ‖ compressed data, in +/// write order) in a [`PageStore`], plus the handles needed to read them back, +/// in order, when the chunk is spliced into the output file. struct ArrowColumnChunkData { length: usize, - data: Vec, + store: Box, + keys: Vec, + /// The dictionary page's serialized blobs (header ‖ data), held in memory + /// rather than the store. + /// + /// A dictionary page is produced at most once and bounded by + /// `dict_page_size_limit`, but it must be written *first* in the chunk even + /// though the data pages reach the writer before it (see + /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only + /// round-trip ~1 page to the backend and straight back, so it is kept here + /// and emitted ahead of the data pages at splice. Empty for non-dictionary + /// columns. + dictionary: Vec, } -impl Length for ArrowColumnChunkData { - fn len(&self) -> u64 { - self.length as _ +impl ArrowColumnChunkData { + fn new(store: Box) -> Self { + Self { + length: 0, + store, + keys: Vec::new(), + dictionary: Vec::new(), + } + } + + /// Append a data-page blob to the store, recording its handle in write + /// order. + fn push(&mut self, value: Bytes) -> Result<()> { + let key = self.store.put(value)?; + self.keys.push(key); + Ok(()) } -} -impl ChunkReader for ArrowColumnChunkData { - type T = ArrowColumnChunkReader; + /// Retain a dictionary-page blob in memory (emitted first at splice). + fn push_dictionary(&mut self, value: Bytes) { + self.dictionary.push(value); + } - fn get_read(&self, start: u64) -> Result { - assert_eq!(start, 0); // Assume append_column writes all data in one-shot - Ok(ArrowColumnChunkReader( - self.data.clone().into_iter().peekable(), - )) + /// Total serialized size of the in-memory dictionary page, in bytes. + fn dictionary_len(&self) -> usize { + self.dictionary.iter().map(Bytes::len).sum() } - fn get_bytes(&self, _start: u64, _length: usize) -> Result { - unimplemented!() + /// Bytes this chunk currently holds on the heap: whatever the store keeps + /// resident (zero for a spilling backend) plus the in-memory dictionary + /// page. + fn memory_size(&self) -> usize { + self.store.memory_size() + self.dictionary_len() } } -/// A [`Read`] for [`ArrowColumnChunkData`] -struct ArrowColumnChunkReader(Peekable>); +/// A streaming [`Read`] over one column chunk's buffered pages, in final file +/// order: the in-memory dictionary page (if any) first, then the data pages. +/// +/// Each data-page blob is taken back out of the [`PageStore`] *as it is +/// consumed* and released immediately afterwards, so splicing a chunk into the +/// output file never materializes more than a single page in memory at a time. +/// This is what keeps the splice phase within the memory bound for a spilling +/// backend (an in-memory store already holds the bytes, so it is unaffected). +struct StreamingColumnChunkReader { + /// Dictionary-page blobs, emitted before any data page. + dictionary: IntoIter, + store: Box, + keys: IntoIter, + /// The blob currently being drained into the output; emptied as it is read. + current: Bytes, +} + +impl StreamingColumnChunkReader { + fn new(data: ArrowColumnChunkData) -> Self { + Self { + dictionary: data.dictionary.into_iter(), + store: data.store, + keys: data.keys.into_iter(), + current: Bytes::new(), + } + } +} -impl Read for ArrowColumnChunkReader { +impl Read for StreamingColumnChunkReader { fn read(&mut self, out: &mut [u8]) -> std::io::Result { - let buffer = loop { - match self.0.peek_mut() { - Some(b) if b.is_empty() => { - self.0.next(); - continue; - } - Some(b) => break b, - None => return Ok(0), + // Refill from the next blob whenever the current one is drained: the + // dictionary page first, then each data page from the store. + while self.current.is_empty() { + if let Some(blob) = self.dictionary.next() { + self.current = blob; + } else if let Some(key) = self.keys.next() { + self.current = self.store.take(key).map_err(std::io::Error::other)?; + } else { + return Ok(0); } - }; + } - let len = buffer.len().min(out.len()); - let b = buffer.split_to(len); + let len = self.current.len().min(out.len()); + let b = self.current.split_to(len); out[..len].copy_from_slice(&b); Ok(len) } @@ -660,7 +740,6 @@ impl Read for ArrowColumnChunkReader { /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows type SharedColumnChunk = Arc>; -#[derive(Default)] struct ArrowPageWriter { buffer: SharedColumnChunk, #[cfg(feature = "encryption")] @@ -668,6 +747,15 @@ struct ArrowPageWriter { } impl ArrowPageWriter { + /// Create a page writer that buffers completed pages in `store`. + fn new(store: Box) -> Self { + Self { + buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))), + #[cfg(feature = "encryption")] + page_encryptor: None, + } + } + #[cfg(feature = "encryption")] pub fn with_encryptor(mut self, page_encryptor: Option) -> Self { self.page_encryptor = page_encryptor; @@ -726,12 +814,35 @@ impl PageWriter for ArrowPageWriter { spec.bytes_written = compressed_size as u64; buf.length += compressed_size; - buf.data.push(header); - buf.data.push(data); + if spec.page_type == PageType::DICTIONARY_PAGE { + // Held in memory and emitted first at splice — see + // `ArrowColumnChunkData::dictionary`. The buffer-relative offset in + // `spec` (the dictionary arrives after the data pages on this path) + // is rewritten to its true, dictionary-first position at splice. + buf.push_dictionary(header); + buf.push_dictionary(data); + } else { + buf.push(header)?; + buf.push(data)?; + } Ok(spec) } + fn defers_dictionary_ordering(&self) -> bool { + // The Arrow chunk is buffered in full and spliced at row-group flush, so + // data pages may be accepted before the dictionary page and reordered + // then. This lets `GenericColumnWriter` stream dictionary-column data + // pages straight through instead of buffering them in memory. + true + } + + fn buffered_memory_size(&self) -> usize { + // Only what is actually resident: a spilling store reports ~0 here even + // though the chunk's bytes have all passed through it. + self.buffer.try_lock().unwrap().memory_size() + } + fn close(&mut self) -> Result<()> { Ok(()) } @@ -785,12 +896,39 @@ impl ArrowColumnChunk { &mut self.close } - /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data + /// Splices this column's buffered pages into the row group, streaming them + /// back out of the [`PageStore`] one page at a time. pub fn append_to_row_group( self, writer: &mut SerializedRowGroupWriter<'_, W>, ) -> Result<()> { - writer.append_column(&self.data, self.close) + let ArrowColumnChunk { data, mut close } = self; + + // On the Arrow path the dictionary page is produced *after* the data + // pages (so the data pages can stream straight through rather than + // accumulating in memory), but it must be written *first*. The encoder + // therefore recorded buffer-relative page offsets in production order; + // rewrite them to the final dictionary-first layout (dict at 0, data + // pages following) so the splice's offset remap lands them correctly. + let dictionary_len = data.dictionary_len(); + if dictionary_len > 0 { + close.metadata = close + .metadata + .into_builder() + .set_dictionary_page_offset(Some(0)) + .set_data_page_offset(dictionary_len as i64) + .build()?; + if let Some(offset_index) = close.offset_index.as_mut() { + let mut offset = dictionary_len as i64; + for location in offset_index.page_locations.iter_mut() { + location.offset = offset; + offset += location.compressed_page_size as i64; + } + } + } + + let reader = StreamingColumnChunkReader::new(data); + writer.append_column_from_read(reader, close) } } @@ -1082,6 +1220,7 @@ pub struct ArrowRowGroupWriterFactory { schema: SchemaDescPtr, arrow_schema: SchemaRef, props: WriterPropertiesPtr, + page_store_factory: Arc, #[cfg(feature = "encryption")] file_encryptor: Option>, } @@ -1098,11 +1237,23 @@ impl ArrowRowGroupWriterFactory { schema, arrow_schema, props, + page_store_factory: Arc::new(InMemoryPageStoreFactory), #[cfg(feature = "encryption")] file_encryptor: file_writer.file_encryptor(), } } + /// Set the [`PageStoreFactory`] used to allocate the buffer for each column + /// chunk, e.g. to spill completed pages to a temp file or object storage + /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`]. + pub fn with_page_store_factory( + mut self, + page_store_factory: Arc, + ) -> Self { + self.page_store_factory = page_store_factory; + self + } + fn create_row_group_writer(&self, row_group_index: usize) -> Result { let writers = self.create_column_writers(row_group_index)?; Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) @@ -1127,12 +1278,13 @@ impl ArrowRowGroupWriterFactory { #[cfg(feature = "encryption")] fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory { ArrowColumnWriterFactory::new() + .with_page_store_factory(self.page_store_factory.clone()) .with_file_encryptor(row_group_idx, self.file_encryptor.clone()) } #[cfg(not(feature = "encryption"))] fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory { - ArrowColumnWriterFactory::new() + ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone()) } } @@ -1159,6 +1311,8 @@ pub fn get_column_writers( /// Creates [`ArrowColumnWriter`] instances struct ArrowColumnWriterFactory { + /// Allocates the per-column-chunk [`PageStore`] backing each page writer. + page_store_factory: Arc, #[cfg(feature = "encryption")] row_group_index: usize, #[cfg(feature = "encryption")] @@ -1168,6 +1322,7 @@ struct ArrowColumnWriterFactory { impl ArrowColumnWriterFactory { pub fn new() -> Self { Self { + page_store_factory: Arc::new(InMemoryPageStoreFactory), #[cfg(feature = "encryption")] row_group_index: 0, #[cfg(feature = "encryption")] @@ -1175,6 +1330,15 @@ impl ArrowColumnWriterFactory { } } + /// Use `page_store_factory` to allocate the buffer for each column chunk. + pub fn with_page_store_factory( + mut self, + page_store_factory: Arc, + ) -> Self { + self.page_store_factory = page_store_factory; + self + } + #[cfg(feature = "encryption")] pub fn with_file_encryptor( mut self, @@ -1199,8 +1363,9 @@ impl ArrowColumnWriterFactory { column_index, &column_path, )?; + let store = self.page_store_factory.create(column_index)?; Ok(Box::new( - ArrowPageWriter::default().with_encryptor(page_encryptor), + ArrowPageWriter::new(store).with_encryptor(page_encryptor), )) } @@ -1208,9 +1373,10 @@ impl ArrowColumnWriterFactory { fn create_page_writer( &self, _column_descriptor: &ColumnDescPtr, - _column_index: usize, + column_index: usize, ) -> Result> { - Ok(Box::::default()) + let store = self.page_store_factory.create(column_index)?; + Ok(Box::new(ArrowPageWriter::new(store))) } /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the @@ -1738,6 +1904,141 @@ mod tests { statistics::Statistics, }; + /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps + /// blobs in a `HashMap` — nothing like the default `Vec`. Used to + /// prove the writer relies only on the opaque-handle contract and never on + /// handles being dense `Vec` indices. Records how many blobs were stored. + #[derive(Debug, Default)] + struct RecordingPageStore { + next: u64, + blobs: HashMap, + puts: Arc, + } + + impl PageStore for RecordingPageStore { + fn put(&mut self, value: Bytes) -> Result { + // Deliberately non-sequential, never-zero handles. + let id = 100 + self.next * 7; + self.next += 1; + self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.blobs.insert(id, value); + Ok(PageKey::new(id)) + } + + fn take(&mut self, key: PageKey) -> Result { + self.blobs + .remove(&key.get()) + .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get()))) + } + } + + #[derive(Debug)] + struct RecordingPageStoreFactory { + puts: Arc, + } + + impl PageStoreFactory for RecordingPageStoreFactory { + fn create(&self, _column_index: usize) -> Result> { + Ok(Box::new(RecordingPageStore { + puts: self.puts.clone(), + ..Default::default() + })) + } + } + + /// A custom [`PageStore`] must produce byte-identical files to the in-memory + /// default, across dictionary and non-dictionary columns and multiple row + /// groups (so multiple store instances are exercised). + #[test] + fn custom_page_store_is_byte_identical_to_default() { + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + // A low-cardinality string column to exercise the dictionary path. + Field::new("s", DataType::Utf8, true), + ])); + let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]); + let s = StringArray::from(vec![ + Some("a"), + Some("bb"), + Some("a"), + None, + Some("bb"), + Some("ccc"), + ]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap(); + + // Small row groups so multiple column chunks (hence multiple store + // instances) are produced. + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + + let write = |factory: Option>| { + let mut buffer = Vec::new(); + let mut opts = ArrowWriterOptions::new().with_properties(props.clone()); + if let Some(factory) = factory { + opts = opts.with_page_store_factory(factory); + } + let mut writer = + ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + buffer + }; + + let default_bytes = write(None); + + let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory { + puts: puts.clone(), + }))); + + assert!( + puts.load(std::sync::atomic::Ordering::Relaxed) > 0, + "custom PageStore was never written to" + ); + assert_eq!( + default_bytes, custom_bytes, + "a custom PageStore must produce byte-identical output to the default" + ); + } + + /// A dictionary-encoded column written through the deferred-ordering Arrow + /// path must round-trip correctly even with the offset index disabled, when + /// only the chunk-level dictionary/data page offsets are rewritten (there is + /// no offset index to rebuild). Spans multiple data pages so the + /// dictionary-first reordering is exercised. + #[test] + fn dictionary_column_round_trips_with_offset_index_disabled() { + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)])); + + // Low cardinality so the column stays dictionary-encoded; enough rows to + // span several data pages within a single row group. + let values: Vec> = (0..50_000).map(|i| Some(i % 8)).collect(); + let array = Int32Array::from(values.clone()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + + let props = WriterProperties::builder() + .set_offset_index_disabled(true) + .set_data_page_row_count_limit(4096) + .build(); + let opts = ArrowWriterOptions::new().with_properties(props); + + let mut buffer = Vec::new(); + let mut writer = + ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap(); + let read: Vec = reader.collect::>().unwrap(); + let read_values: Vec> = read + .iter() + .flat_map(|b| b.column(0).as_primitive::().iter()) + .collect(); + assert_eq!(read_values, values); + } + #[test] fn arrow_writer() { // define schema diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs index 115c8dd01b80..9c7e77d29cba 100644 --- a/parquet/src/column/mod.rs +++ b/parquet/src/column/mod.rs @@ -125,5 +125,6 @@ pub(crate) mod page_encryption; #[cfg(not(feature = "encryption"))] #[path = "page_encryption_disabled.rs"] pub(crate) mod page_encryption; +pub mod page_store; pub mod reader; pub mod writer; diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 4cfc07a02883..88cc0d18ef73 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -430,6 +430,40 @@ pub trait PageWriter: Send { /// either data page or dictionary page. fn write_page(&mut self, page: CompressedPage) -> Result; + /// Whether this writer resolves the final page layout itself (at flush) + /// rather than committing bytes to their final position as pages arrive. + /// + /// The dictionary page of a column chunk must be written *first*, but it is + /// not finalized until every value has been seen. A writer that commits + /// bytes live (e.g. straight to a file) therefore relies on the column + /// writer buffering the dictionary-encoded data pages in memory until the + /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`. + /// + /// A writer that instead buffers the whole chunk and splices it later (the + /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page + /// and order them itself at flush. Returning `true` tells the column writer + /// to skip that in-memory buffering and stream dictionary-column data pages + /// straight through, bounding the column writer's memory. + /// + /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter + /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + fn defers_dictionary_ordering(&self) -> bool { + false + } + + /// The number of bytes this writer is currently holding **in memory** for + /// pages it has been handed (i.e. completed pages not yet committed to their + /// final destination). + /// + /// Used by the column writer to report its memory footprint. The default is + /// `0`: a writer that streams pages straight to their destination retains + /// nothing. A writer that buffers pages should report what it actually holds + /// on the heap — which, when it spills to a backing store, can be far less + /// than the bytes written. + fn buffered_memory_size(&self) -> usize { + 0 + } + /// Closes resources and flushes underlying sink. /// Page writer should not be used after this method is called. fn close(&mut self) -> Result<()>; diff --git a/parquet/src/column/page_store.rs b/parquet/src/column/page_store.rs new file mode 100644 index 000000000000..c6d90c4f7685 --- /dev/null +++ b/parquet/src/column/page_store.rs @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable storage for completed, serialized page blobs. +//! +//! While a row group is being written the [`ArrowWriter`] must buffer every +//! column's encoded pages, because Parquet requires each column chunk to be +//! contiguous in the file while record batches arrive with all columns interleaved. +//! By default that buffer lives on the heap, so the writer's peak memory grows +//! with the row group size. A [`PageStore`] lets the buffer live somewhere else +//! — a local temp file, object storage, etc. — bounding peak write memory +//! independently of the row group size. +//! +//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + +use std::fmt::Debug; + +use bytes::Bytes; + +use crate::errors::{ParquetError, Result}; + +/// An opaque, store-allocated handle to a blob held by a [`PageStore`]. +/// +/// Handles are allocated by the store — densely and sequentially — and are only +/// meaningful to the store that produced them. The caller treats them as opaque +/// tokens and decides what they *mean* (ordering, which one is the dictionary +/// page, etc.). +/// +/// Letting the store allocate the handle (rather than the caller choosing keys) +/// lets each backend pick the cheapest possible locator with no hashing: an +/// in-memory backend uses the handle as an index into a `Vec`, a temp-file +/// backend as an index into a `Vec<(offset, len)>`, and so on. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct PageKey(u64); + +impl PageKey { + /// Create a handle wrapping `raw`. + /// + /// A [`PageStore`] implementation calls this to mint the handle it returns + /// from [`put`](PageStore::put). The value is opaque to the caller, so a + /// store is free to use a dense counter, a packed locator, or anything else + /// it can later resolve in [`take`](PageStore::take). + pub const fn new(raw: u64) -> Self { + Self(raw) + } + + /// The raw value passed to [`new`](Self::new). + pub const fn get(self) -> u64 { + self.0 + } +} + +/// A pluggable store for completed, serialized page blobs. +/// +/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a +/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or +/// offsets. The caller keeps the handles it gets back from [`put`](Self::put) +/// and decides what they mean. +/// +/// Each store instance is owned by a single column writer and mutated by one +/// thread at a time (both methods take `&mut self`), so it needs no internal +/// synchronization — hence only `Send`, not `Sync`. +/// +/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a +/// different backend via +/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory). +pub trait PageStore: Send { + /// Store `value`, returning a handle that can later be passed to + /// [`take`](Self::take). + fn put(&mut self, value: Bytes) -> Result; + + /// Take back the blob previously stored under `key`. + /// + /// The caller takes ownership of the returned bytes and will **not** request + /// `key` again, so the store may release any resources backing it — eagerly + /// here, or when the store is dropped. + fn take(&mut self, key: PageKey) -> Result; + + /// The number of bytes this store currently holds **in memory** (resident + /// on the heap), used to report the writer's memory footprint. + /// + /// The default is `0`, which is exactly right for a backend that moves + /// every blob off-heap (a temp file, object storage): the bytes it has been + /// handed no longer occupy heap. The in-memory backend overrides this to + /// report its resident blobs. A backend that keeps a partial in-memory + /// buffer should report that buffer's size. + fn memory_size(&self) -> usize { + 0 + } +} + +/// Creates a fresh [`PageStore`] for each column chunk. +/// +/// See +/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory). +pub trait PageStoreFactory: Send + Sync + Debug { + /// Create a new, empty [`PageStore`] for the leaf column at `column_index`. + /// + /// `column_index` is a hint a backend may use to e.g. name spill files or + /// shard across a bounded pool; it carries no ordering or coordination + /// requirement. + fn create(&self, column_index: usize) -> Result>; +} + +/// The default [`PageStore`], holding blobs on the heap in a `Vec`. +/// +/// This is byte-for-byte equivalent to the writer's historical buffering +/// behavior and adds no overhead: peak memory still grows with the row group +/// size. Use a spilling backend to bound it. +#[derive(Debug, Default)] +pub struct InMemoryPageStore { + blobs: Vec, + /// Running total of resident blob bytes, kept in step with `put`/`take`. + resident: usize, +} + +impl PageStore for InMemoryPageStore { + fn put(&mut self, value: Bytes) -> Result { + let key = PageKey(self.blobs.len() as u64); + self.resident += value.len(); + self.blobs.push(value); + Ok(key) + } + + fn take(&mut self, key: PageKey) -> Result { + // Replace the slot with an empty `Bytes` so the stored blob is released + // as soon as it is taken, keeping memory bounded while the chunk is + // streamed into the output file. + let blob = self + .blobs + .get_mut(key.0 as usize) + .map(std::mem::take) + .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?; + self.resident -= blob.len(); + Ok(blob) + } + + fn memory_size(&self) -> usize { + self.resident + } +} + +/// Factory for [`InMemoryPageStore`] — the default used by +/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter). +#[derive(Debug, Default)] +pub struct InMemoryPageStoreFactory; + +impl PageStoreFactory for InMemoryPageStoreFactory { + fn create(&self, _column_index: usize) -> Result> { + Ok(Box::new(InMemoryPageStore::default())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn in_memory_round_trips_blobs_in_handle_order() { + let mut store = InMemoryPageStore::default(); + let k0 = store.put(Bytes::from_static(b"hello")).unwrap(); + let k1 = store.put(Bytes::from_static(b"world")).unwrap(); + assert_ne!(k0, k1); + assert_eq!(&store.take(k0).unwrap()[..], b"hello"); + assert_eq!(&store.take(k1).unwrap()[..], b"world"); + } + + #[test] + fn in_memory_take_releases_the_slot() { + let mut store = InMemoryPageStore::default(); + let k = store.put(Bytes::from_static(b"abc")).unwrap(); + assert_eq!(&store.take(k).unwrap()[..], b"abc"); + // A second take yields the emptied placeholder rather than the blob, + // confirming the bytes were released on the first take. + assert!(store.take(k).unwrap().is_empty()); + } + + #[test] + fn in_memory_invalid_key_errors() { + let mut store = InMemoryPageStore::default(); + assert!(store.take(PageKey(99)).is_err()); + } + + #[test] + fn in_memory_reports_resident_bytes() { + let mut store = InMemoryPageStore::default(); + assert_eq!(store.memory_size(), 0); + let k0 = store.put(Bytes::from_static(b"hello")).unwrap(); + let k1 = store.put(Bytes::from_static(b"!")).unwrap(); + assert_eq!(store.memory_size(), 6); + store.take(k0).unwrap(); + assert_eq!(store.memory_size(), 1); + store.take(k1).unwrap(); + assert_eq!(store.memory_size(), 0); + } + + #[test] + fn default_store_memory_size_is_zero() { + // A spilling backend that does not override `memory_size` reports 0, + // reflecting that its blobs no longer occupy the heap. + struct OffHeap; + impl PageStore for OffHeap { + fn put(&mut self, _value: Bytes) -> Result { + Ok(PageKey::new(0)) + } + fn take(&mut self, _key: PageKey) -> Result { + Ok(Bytes::new()) + } + } + assert_eq!(OffHeap.memory_size(), 0); + } +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4e53230bbf89..9e2c9fb9ee6f 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -632,7 +632,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// of the current memory usage and not the final anticipated encoded size. #[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { - self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() + // In-flight encoder buffers, plus any completed pages still held on the + // heap: the dictionary-column data pages buffered here (column-at-a-time + // path), plus whatever the page writer keeps resident. A page writer + // that spills completed pages off-heap reports far less than the bytes + // it was handed, so this tracks real memory rather than bytes written. + self.encoder.estimated_memory_size() + + self + .data_pages + .iter() + .map(|page| page.data().len()) + .sum::() + + self.page_writer.buffered_memory_size() } /// Returns total number of bytes written by this column writer so far. @@ -1271,7 +1282,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; // Check if we need to buffer data page or flush it to the sink directly. - if self.encoder.has_dictionary() { + // + // For dictionary-encoded columns the dictionary page must be written + // first, but it is not final until all values are seen, so completed + // data pages are normally buffered here until `close`. A page writer + // that defers final layout (the Arrow path) instead orders pages itself + // at flush, so we stream the data pages straight through and never let + // them accumulate in memory. + if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() { self.data_pages.push_back(compressed_page); } else { self.write_data_page(compressed_page)?; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 942013ea6238..8ec16ba36739 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -684,6 +684,30 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { pub fn append_column( &mut self, reader: &R, + close: ColumnCloseResult, + ) -> Result<()> { + // Position a reader at the start of the buffered chunk, then splice the + // bytes through the shared streaming path. + let metadata = &close.metadata; + let src_offset = metadata + .dictionary_page_offset() + .unwrap_or_else(|| metadata.data_page_offset()); + let read = reader.get_read(src_offset as _)?; + self.append_column_from_read(read, close) + } + + /// Splice an already-encoded column chunk into the row group, reading its + /// bytes sequentially from `read`. + /// + /// `read` must be positioned at the start of the chunk (the dictionary page + /// if present, otherwise the first data page — i.e. `src_offset` below) and + /// yield exactly the chunk's compressed bytes. Unlike [`Self::append_column`] + /// this consumes an owned [`Read`], which lets the caller stream the bytes + /// back from a [`PageStore`](crate::column::page_store::PageStore) one page + /// at a time without materializing the whole chunk in memory. + pub(crate) fn append_column_from_read( + &mut self, + read: R, mut close: ColumnCloseResult, ) -> Result<()> { self.assert_previous_writer_closed()?; @@ -707,7 +731,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { let src_length = metadata.compressed_size(); let write_offset = self.buf.bytes_written(); - let mut read = reader.get_read(src_offset as _)?.take(src_length as _); + let mut read = read.take(src_length as _); let write_length = std::io::copy(&mut read, &mut self.buf)?; if src_length as u64 != write_length { diff --git a/parquet/tests/arrow_writer.rs b/parquet/tests/arrow_writer.rs index 020b4c6267e0..bd8dac0e6331 100644 --- a/parquet/tests/arrow_writer.rs +++ b/parquet/tests/arrow_writer.rs @@ -17,13 +17,20 @@ //! Tests for [`ArrowWriter`] -use arrow::array::Float64Array; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; +use std::alloc::{GlobalAlloc, Layout, System}; +use std::cell::Cell; +use std::fs::File; +use std::io::{Read as _, Seek, SeekFrom, Write as _}; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use bytes::Bytes; use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_writer::{ArrowWriterOptions, PageKey, PageStore, PageStoreFactory}; use parquet::basic::Encoding; +use parquet::errors::Result; use parquet::file::properties::WriterProperties; -use std::sync::Arc; #[test] #[should_panic( @@ -48,3 +55,329 @@ fn test_delta_bit_pack_type() { let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), Some(props)).unwrap(); let _ = writer.write(&record_batch); } + +// --------------------------------------------------------------------------- +// Heap-memory regression test for the writer's page buffering. +// +// This proves the headline invariant of the pluggable [`PageStore`]: while a +// row group is being written, the heap used to buffer completed pages grows +// with the row group size for the default in-memory store, but stays bounded +// (≈ a few pages per leaf column) once a spilling backend is plugged in. +// +// Peak heap is measured with a thread-local tracking allocator (the same +// pattern used by `parquet/benches/arrow_reader_peak_memory.rs`), so the test +// needs no external profiling dependency. Tracking is thread-local, so the +// measured peak reflects only allocations made on the measuring thread; the +// default `ArrowWriter` is single-threaded, so the writer's buffering all lands +// there. Each measurement resets the peak to the current live baseline and +// reports the delta, so the threads of unrelated tests in this binary do not +// perturb it. +// +// [`PageStore`]: parquet::arrow::arrow_writer::PageStore +// --------------------------------------------------------------------------- + +thread_local! { + static LIVE_BYTES: Cell = const { Cell::new(0) }; + static PEAK_BYTES: Cell = const { Cell::new(0) }; +} + +struct TrackingAllocator { + inner: System, +} + +#[global_allocator] +static GLOBAL: TrackingAllocator = TrackingAllocator { inner: System }; + +fn add_live_bytes(size: usize) { + LIVE_BYTES.with(|live| { + let new = live.get().saturating_add(size); + live.set(new); + PEAK_BYTES.with(|peak| { + if new > peak.get() { + peak.set(new); + } + }); + }); +} + +fn subtract_live_bytes(size: usize) { + LIVE_BYTES.with(|live| { + live.set(live.get().saturating_sub(size)); + }); +} + +#[allow(unsafe_code)] +unsafe impl GlobalAlloc for TrackingAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let ptr = unsafe { self.inner.alloc(layout) }; + if !ptr.is_null() { + add_live_bytes(layout.size()); + } + ptr + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + subtract_live_bytes(layout.size()); + unsafe { self.inner.dealloc(ptr, layout) }; + } + + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) }; + if !new_ptr.is_null() { + let old_size = layout.size(); + if new_size > old_size { + add_live_bytes(new_size - old_size); + } else { + subtract_live_bytes(old_size - new_size); + } + } + new_ptr + } +} + +/// Run `f` and return the peak *additional* live heap (bytes) observed on this +/// thread during it — the delta from the live heap when `f` began. +fn peak_heap_bytes(f: impl FnOnce()) -> usize { + let start = LIVE_BYTES.with(Cell::get); + // Reset the peak to the window's baseline so prior allocations don't count. + PEAK_BYTES.with(|peak| peak.set(start)); + f(); + PEAK_BYTES.with(Cell::get).saturating_sub(start) +} + +/// Width of each value in the one "fat" column, in bytes. +const FAT_VALUE_LEN: usize = 4096; +/// Rows per input batch fed to the writer. Kept small so each batch is dropped +/// promptly — only the writer's *buffering* should accumulate, not the input. +const ROWS_PER_BATCH: usize = 64; +/// Number of batches, all funnelled into a single large row group. +const NUM_BATCHES: usize = 64; +/// Total bytes of fat-column payload written (≈ 16 MiB). +const TOTAL_FAT_BYTES: usize = FAT_VALUE_LEN * ROWS_PER_BATCH * NUM_BATCHES; + +/// A wide schema: one fat, high-cardinality binary column (the spill target) +/// plus several tiny integer columns. +fn skewed_schema() -> SchemaRef { + let mut fields = vec![Field::new("fat", DataType::Binary, false)]; + for i in 0..8 { + fields.push(Field::new(format!("small_{i}"), DataType::Int32, false)); + } + Arc::new(Schema::new(fields)) +} + +/// Build one batch of `ROWS_PER_BATCH` rows. The fat column holds unique, +/// high-entropy values (so they neither dictionary-encode nor compress away), +/// derived deterministically from `batch_index`. +fn make_batch(schema: &SchemaRef, batch_index: usize) -> RecordBatch { + let mut fat: Vec = vec![0u8; FAT_VALUE_LEN * ROWS_PER_BATCH]; + // A cheap xorshift fill keyed by the batch index → distinct, incompressible. + let mut state = (batch_index as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1; + for byte in fat.iter_mut() { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + *byte = (state >> 24) as u8; + } + let offsets: Vec = (0..=ROWS_PER_BATCH) + .map(|i| (i * FAT_VALUE_LEN) as i32) + .collect(); + let fat_array = BinaryArray::try_new( + arrow::buffer::OffsetBuffer::new(offsets.into()), + arrow::buffer::Buffer::from_vec(fat), + None, + ) + .unwrap(); + + let mut columns: Vec = vec![Arc::new(fat_array)]; + for c in 0..8 { + let vals: Vec = (0..ROWS_PER_BATCH) + .map(|r| (batch_index * ROWS_PER_BATCH + r + c) as i32) + .collect(); + columns.push(Arc::new(Int32Array::from(vals))); + } + RecordBatch::try_new(schema.clone(), columns).unwrap() +} + +/// Writer properties forcing the whole dataset into a single, uncompressed row +/// group (so the page buffer is the only thing that grows). +fn single_row_group_props() -> WriterProperties { + WriterProperties::builder() + .set_compression(parquet::basic::Compression::UNCOMPRESSED) + // One row group for everything: never auto-flush on row count. + .set_max_row_group_row_count(Some(ROWS_PER_BATCH * NUM_BATCHES * 2)) + .build() +} + +/// Write the full skewed dataset with the given writer options, feeding small +/// batches (each dropped immediately) into one row group. +/// +/// The output is sent to [`io::sink`] so the produced file bytes never live on +/// the heap — the measured peak then reflects only the writer's internal page +/// *buffering*, which is exactly what a [`PageStore`] governs. +fn write_skewed_dataset(options: ArrowWriterOptions) { + let schema = skewed_schema(); + let mut writer = + ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap(); + for b in 0..NUM_BATCHES { + let batch = make_batch(&schema, b); + writer.write(&batch).unwrap(); + // `batch` dropped here — only the writer's internal buffering persists. + } + writer.close().unwrap(); +} + +/// A spilling [`PageStore`]: one temp file per column chunk. `put` appends the +/// blob and records its `(offset, len)`; `take` seeks and reads it back. The +/// file is unlinked on creation (via [`tempfile::tempfile`]) so it is cleaned up +/// when the store is dropped. This is the canonical "spill completed pages off +/// the heap" backend the design targets. +struct TempFilePageStore { + file: File, + end: u64, + locs: Vec<(u64, usize)>, +} + +impl TempFilePageStore { + fn new() -> Result { + Ok(Self { + file: tempfile::tempfile()?, + end: 0, + locs: Vec::new(), + }) + } +} + +impl PageStore for TempFilePageStore { + fn put(&mut self, value: Bytes) -> Result { + // Always append at the logical end (a prior `take` may have moved the + // OS file cursor). + self.file.seek(SeekFrom::Start(self.end))?; + self.file.write_all(&value)?; + let key = PageKey::new(self.locs.len() as u64); + self.locs.push((self.end, value.len())); + self.end += value.len() as u64; + Ok(key) + } + + fn take(&mut self, key: PageKey) -> Result { + let (offset, len) = self.locs[key.get() as usize]; + let mut buf = vec![0u8; len]; + self.file.seek(SeekFrom::Start(offset))?; + self.file.read_exact(&mut buf)?; + Ok(Bytes::from(buf)) + } +} + +#[derive(Debug, Default)] +struct TempFilePageStoreFactory; + +impl PageStoreFactory for TempFilePageStoreFactory { + fn create(&self, _column_index: usize) -> Result> { + Ok(Box::new(TempFilePageStore::new()?)) + } +} + +/// Rows per batch / batches for the dictionary-column scenario (~4.2M rows). +const DICT_ROWS_PER_BATCH: usize = 8192; +const DICT_NUM_BATCHES: usize = 512; + +/// Write a single, low-cardinality (16 distinct values), high-row-count column +/// as one row group. Such a column stays dictionary-encoded, so its completed +/// data pages would historically pile up in `GenericColumnWriter` until close — +/// the second accumulation point that plain page-buffer spilling does not reach. +fn write_dict_dataset(options: ArrowWriterOptions) { + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)])); + let props = WriterProperties::builder() + .set_compression(parquet::basic::Compression::UNCOMPRESSED) + .set_max_row_group_row_count(Some(DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES * 2)) + .build(); + let options = options.with_properties(props); + let mut writer = + ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap(); + for b in 0..DICT_NUM_BATCHES { + let vals: Vec = (0..DICT_ROWS_PER_BATCH) + .map(|r| ((b + r) % 16) as i32) + .collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vals))]).unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); +} + +/// All measurements run in one function so they execute sequentially on a single +/// thread — the tracking allocator is thread-local, so running them as separate +/// parallel tests would each see only their own thread's allocations (which is +/// fine), but keeping them together also keeps the in-memory/spill comparison on +/// one consistent baseline. +#[test] +fn page_store_bounds_write_memory() { + let props = single_row_group_props(); + + // Baseline: the default in-memory store buffers the whole row group, so peak + // heap is at least the size of the buffered column data. + let in_memory_peak = peak_heap_bytes(|| { + let opts = ArrowWriterOptions::new().with_properties(props.clone()); + write_skewed_dataset(opts); + }); + + // Spilling: the temp-file store keeps completed pages off the heap, so peak + // heap stays bounded by the in-flight encoder/dictionary buffers plus a page + // or two in flight — independent of the row group size. + let spill_peak = peak_heap_bytes(|| { + let opts = ArrowWriterOptions::new() + .with_properties(props.clone()) + .with_page_store_factory(Arc::new(TempFilePageStoreFactory)); + write_skewed_dataset(opts); + }); + + eprintln!( + "peak heap — in-memory: {:.1} MiB, temp-file spill: {:.1} MiB (total fat payload {:.1} MiB)", + in_memory_peak as f64 / (1024.0 * 1024.0), + spill_peak as f64 / (1024.0 * 1024.0), + TOTAL_FAT_BYTES as f64 / (1024.0 * 1024.0), + ); + + // The in-memory store must hold most of the ~16 MiB of buffered data. + let in_memory_floor = TOTAL_FAT_BYTES * 3 / 4; + assert!( + in_memory_peak >= in_memory_floor, + "expected in-memory peak >= {in_memory_floor} bytes, got {in_memory_peak}" + ); + + // The spilling store must stay near the per-column bound — roughly + // (data_page_size + dict_page_size) per leaf column, ~2 MiB × 9 columns — + // and far below the in-memory baseline. We assert a generous 8 MiB ceiling + // (well under the ~16 MiB row group) to stay robust across platforms. + const SPILL_CEILING: usize = 8 * 1024 * 1024; + assert!( + spill_peak < SPILL_CEILING, + "expected spilling peak < {SPILL_CEILING} bytes (bounded by page/dict size × columns), \ + got {spill_peak}" + ); + assert!( + spill_peak * 2 < in_memory_peak, + "expected spilling peak ({spill_peak}) to be far below the in-memory baseline \ + ({in_memory_peak})" + ); + + // Dictionary-encoded column: completed data pages reach the page writer (and + // thus the store) as they are produced, so spilling bounds them too. + let dict_in_memory = peak_heap_bytes(|| write_dict_dataset(ArrowWriterOptions::new())); + let dict_spill = peak_heap_bytes(|| { + write_dict_dataset( + ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory)), + ) + }); + eprintln!( + "dict column ({} rows) peak heap — in-memory: {:.2} MiB, temp-file spill: {:.2} MiB", + DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES, + dict_in_memory as f64 / (1024.0 * 1024.0), + dict_spill as f64 / (1024.0 * 1024.0), + ); + assert!( + dict_spill * 2 < dict_in_memory, + "expected dict-column spilling peak ({dict_spill}) to be far below the in-memory \ + baseline ({dict_in_memory}) — dictionary data pages should spill, not accumulate" + ); +}