Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 41 additions & 33 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,77 +582,85 @@ impl ArrowWriterOptions {
/// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
/// group is being written.
///
/// By default (an [`InMemoryPageStore`] per column chunk) completed pages
/// are buffered on the heap until the row group is flushed, so peak memory
/// grows with the row group size. Supplying a factory that spills to a temp
/// file or object storage instead bounds peak write memory, decoupling it
/// from the row group size while keeping large, read-optimal row groups.
/// The default implementation ([`InMemoryPageStore`]) buffers all completed
/// pages on the heap until the row group is flushed, so peak write memory
/// grows with the row group size. Using this API, pages can be spilled to a
/// file or object storage instead, reducing peak write memory substantially
/// at the expense of an extra write to and read from secondary storage.
///
/// # Example: a custom [`PageStore`]
/// # Example: spilling pages to a temp file
///
/// A store only has to map an opaque, store-allocated [`PageKey`] to a blob
/// and hand the blob back once. The keys need not be dense or sequential —
/// here a `HashMap`-backed store mints sparse handles, proving the writer
/// relies only on the opaque-handle contract. A real spilling backend would
/// write the bytes to a temp file in `put` and read them back in `take`.
/// A simple spilling backend uses one temp file per column chunk; `put`
/// appends the page and `take` reads it back.
///
/// ```
/// # use std::collections::HashMap;
/// # use std::fs::File;
/// # use std::io::{Read, Seek, SeekFrom, Write};
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{
/// # ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
/// # };
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// # use parquet::errors::{ParquetError, Result};
/// #[derive(Default)]
/// struct MapPageStore {
/// blobs: HashMap<u64, Bytes>,
/// next: u64,
/// # use parquet::errors::Result;
/// struct TempFilePageStore {
/// file: File,
/// /// Total size of the file
/// end: u64,
/// /// Location of pages: (offset, len)
/// locs: Vec<(u64, usize)>,
/// }
///
/// impl PageStore for MapPageStore {
/// impl PageStore for TempFilePageStore {
/// fn put(&mut self, value: Bytes) -> Result<PageKey> {
/// // Mint a sparse handle (every other integer) to show the writer
/// // never assumes anything about the key's value.
/// let key = PageKey::new(self.next);
/// self.next += 2;
/// self.blobs.insert(key.get(), value);
/// // Append to the end of the file
/// self.file.seek(SeekFrom::Start(self.end))?;
/// self.file.write_all(&value)?;
/// let key = PageKey::new(self.locs.len() as u64);
/// self.locs.push((self.end, value.len()));
/// self.end += value.len() as u64;
/// Ok(key)
/// }
///
/// fn take(&mut self, key: PageKey) -> Result<Bytes> {
/// self.blobs
/// .remove(&key.get())
/// .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get())))
/// let (offset, len) = self.locs[key.get() as usize];
/// let mut buf = vec![0u8; len];
/// self.file.seek(SeekFrom::Start(offset))?;
/// self.file.read_exact(&mut buf)?;
/// Ok(Bytes::from(buf))
/// }
/// }
///
/// /// Factory for creating [`TempFilePageStore`]
/// #[derive(Debug)]
/// struct MapPageStoreFactory;
/// struct TempFilePageStoreFactory;
///
/// impl PageStoreFactory for MapPageStoreFactory {
/// impl PageStoreFactory for TempFilePageStoreFactory {
/// fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
/// // `args` exposes the column index and descriptor (physical/logical
/// // type, path), so a real backend could spill only large columns.
/// // type, path), so a real backend might choose to spill only large columns.
/// let _ = (args.column_index(), args.column_descriptor());
/// Ok(Box::new(MapPageStore::default()))
/// Ok(Box::new(TempFilePageStore {
/// file: tempfile::tempfile()?, // temp file is cleaned on drop
/// end: 0,
/// locs: Vec::new(),
/// }))
/// }
/// }
///
/// // write 1000 integers
/// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let options =
/// ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory));
/// ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory));
/// let mut buffer = Vec::new();
/// let mut writer =
/// ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
/// writer.write(&to_write).unwrap();
/// writer.close().unwrap();
///
/// // The file is byte-identical to one written with the default store.
/// // buffer now holds valid Parquet data, which can be read as normal:
/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
/// assert_eq!(to_write, reader.next().unwrap().unwrap());
/// ```
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/column/page_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ impl PageKey {
/// thread at a time (both methods take `&mut self`), so it needs no internal
/// synchronization — hence only `Send`, not `Sync`.
///
/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
/// different backend via
/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
/// The default ([`InMemoryPageStore`]) keeps blobs in memory on the heap.
///
/// For an example of configuring the Parquet writer to use an alternate
/// `PageStore` see the [`ArrowWriterOptions::with_page_store_factory`] API.
///
/// [`ArrowWriterOptions::with_page_store_factory`]: crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory
pub trait PageStore: Send {
/// Store `value`, returning a handle that can later be passed to
/// [`take`](Self::take).
Expand Down
Loading