diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 21650fe26eac..1e6ea4192c3f 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -582,22 +582,20 @@ impl ArrowWriterOptions { /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row /// group is being written. /// - /// By default (an [`InMemoryPageStore`] per column chunk) completed pages - /// are buffered on the heap until the row group is flushed, so peak memory - /// grows with the row group size. Supplying a factory that spills to a temp - /// file or object storage instead bounds peak write memory, decoupling it - /// from the row group size while keeping large, read-optimal row groups. + /// The default implementation ([`InMemoryPageStore`]) buffers all completed + /// pages on the heap until the row group is flushed, so peak write memory + /// grows with the row group size. Using this API, pages can be spilled to a + /// file or object storage instead, reducing peak write memory substantially + /// at the expense of an extra write to and read from secondary storage. /// - /// # Example: a custom [`PageStore`] + /// # Example: spilling pages to a temp file /// - /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob - /// and hand the blob back once. The keys need not be dense or sequential — - /// here a `HashMap`-backed store mints sparse handles, proving the writer - /// relies only on the opaque-handle contract. A real spilling backend would - /// write the bytes to a temp file in `put` and read them back in `take`. + /// A simple spilling backend uses one temp file per column chunk; `put` + /// appends the page and `take` reads it back. 
/// /// ``` - /// # use std::collections::HashMap; + /// # use std::fs::File; + /// # use std::io::{Read, Seek, SeekFrom, Write}; /// # use std::sync::Arc; /// # use bytes::Bytes; /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; @@ -605,54 +603,64 @@ impl ArrowWriterOptions { /// # ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory, /// # }; /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader; - /// # use parquet::errors::{ParquetError, Result}; - /// #[derive(Default)] - /// struct MapPageStore { - /// blobs: HashMap<u64, Bytes>, - /// next: u64, + /// # use parquet::errors::Result; + /// struct TempFilePageStore { + /// file: File, + /// /// Total size of the file + /// end: u64, + /// /// Location of pages: (offset, len) + /// locs: Vec<(u64, usize)>, /// } /// - /// impl PageStore for MapPageStore { + /// impl PageStore for TempFilePageStore { /// fn put(&mut self, value: Bytes) -> Result<PageKey> { - /// // Mint a sparse handle (every other integer) to show the writer - /// // never assumes anything about the key's value.
- /// let key = PageKey::new(self.next); - /// self.next += 2; - /// self.blobs.insert(key.get(), value); + /// // Append to the end of the file + /// self.file.seek(SeekFrom::Start(self.end))?; + /// self.file.write_all(&value)?; + /// let key = PageKey::new(self.locs.len() as u64); + /// self.locs.push((self.end, value.len())); + /// self.end += value.len() as u64; /// Ok(key) /// } /// /// fn take(&mut self, key: PageKey) -> Result<Bytes> { - /// self.blobs - /// .remove(&key.get()) - /// .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get()))) + /// let (offset, len) = self.locs[key.get() as usize]; + /// let mut buf = vec![0u8; len]; + /// self.file.seek(SeekFrom::Start(offset))?; + /// self.file.read_exact(&mut buf)?; + /// Ok(Bytes::from(buf)) /// } /// } /// + /// /// Factory for creating [`TempFilePageStore`] /// #[derive(Debug)] - /// struct MapPageStoreFactory; + /// struct TempFilePageStoreFactory; /// - /// impl PageStoreFactory for MapPageStoreFactory { + /// impl PageStoreFactory for TempFilePageStoreFactory { /// fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> { /// // `args` exposes the column index and descriptor (physical/logical - /// // type, path), so a real backend could spill only large columns.
/// let _ = (args.column_index(), args.column_descriptor()); - /// Ok(Box::new(MapPageStore::default())) + /// Ok(Box::new(TempFilePageStore { + /// file: tempfile::tempfile()?, // temp file is cleaned on drop + /// end: 0, + /// locs: Vec::new(), + /// })) /// } /// } - /// + /// // write 1000 integers /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef; /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); /// /// let options = - /// ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory)); + /// ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory)); /// let mut buffer = Vec::new(); /// let mut writer = /// ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap(); /// writer.write(&to_write).unwrap(); /// writer.close().unwrap(); /// - /// // The file is byte-identical to one written with the default store. + /// // buffer now holds valid Parquet data, which can be read as normal: /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap(); /// assert_eq!(to_write, reader.next().unwrap().unwrap()); /// ``` diff --git a/parquet/src/column/page_store.rs b/parquet/src/column/page_store.rs index 4f821c0e5cad..14bb2b9e8bdc 100644 --- a/parquet/src/column/page_store.rs +++ b/parquet/src/column/page_store.rs @@ -70,9 +70,12 @@ impl PageKey { /// thread at a time (both methods take `&mut self`), so it needs no internal /// synchronization — hence only `Send`, not `Sync`. /// -/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a -/// different backend via -/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory). +/// The default ([`InMemoryPageStore`]) keeps blobs in memory on the heap. 
+/// +/// For an example of configuring the Parquet writer to use an alternate +/// `PageStore`, see the [`ArrowWriterOptions::with_page_store_factory`] API. +/// +/// [`ArrowWriterOptions::with_page_store_factory`]: crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory pub trait PageStore: Send { /// Store `value`, returning a handle that can later be passed to /// [`take`](Self::take).