From b2c0c08a3126b888711f7af6b6fe9d5cf864c739 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 17 Jun 2026 13:50:04 -0700 Subject: [PATCH] feat: add SpillStore for generic RAII scratch space Adds a `SpillStore` trait on `Session` providing uniform, reclaimable scratch space for intermediate state that overflows memory (e.g. index build posting lists, shuffle runs, BTree pages). - `SpillStore` / `SpillFile` (lance-io): `create_spill_file()` vends a RAII handle; `writer()` / `reader()` hand back `Box<dyn Writer>` / `Box<dyn Reader>` so callers feed spill files directly into `FileWriter::try_new` and a v2 `FileReader` without leaking an `ObjectStore` + path. The file is deleted on drop and its bytes are released back to the store's usage counter. - `LocalSpillStore`: writes to an OS temp directory; optionally enforces a byte cap. Enforcement lives entirely in the spill store: the spill file decorates the writer with a quota-enforcing `QuotaWriter` (reserve-on-write, release-on-drop-by-stat) rather than threading a field through `ObjectStore` and every provider, so it works for any backend the store opens. - `From<std::io::Error>` recovers a wrapped lance `Error`, so typed errors such as `DiskCapExceeded` survive the `AsyncWrite` boundary. - `ScanScheduler::open_reader` builds a `FileScheduler` over an already-open `Reader` (no path/size lookup). - `Session` gains a `spill_store` field (defaults to uncapped `LocalSpillStore`), a `with_spill_store()` builder, and a `spill_store()` accessor so callers and tests can inject alternatives. Mechanism only; consumer migration (IVF shuffler) is a follow-up.
Closes #7300 Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance-core/src/error.rs | 57 ++++ rust/lance-io/src/lib.rs | 1 + rust/lance-io/src/scheduler.rs | 47 +++ rust/lance-io/src/spill.rs | 512 +++++++++++++++++++++++++++++++++ rust/lance/src/session.rs | 63 ++++ 5 files changed, 680 insertions(+) create mode 100644 rust/lance-io/src/spill.rs diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index 3dcde1fc5b2..516dc695a1d 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -238,6 +238,16 @@ pub enum Error { /// A requested field was not found in a schema. #[snafu(transparent)] FieldNotFound { source: FieldNotFoundError }, + + #[snafu(display( + "Spill disk cap of {cap_bytes} bytes exceeded; currently using {used_bytes} bytes, {location}" + ))] + DiskCapExceeded { + cap_bytes: u64, + used_bytes: u64, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -431,6 +441,15 @@ impl Error { IncompatibleTransactionSnafu.into_error(source) } + #[track_caller] + pub fn disk_cap_exceeded(cap_bytes: u64, used_bytes: u64) -> Self { + DiskCapExceededSnafu { + cap_bytes, + used_bytes, + } + .build() + } + /// Create an External error from a boxed error source. pub fn external(source: BoxedError) -> Self { Self::External { source } @@ -512,6 +531,17 @@ impl From<&ArrowError> for Error { impl From for Error { #[track_caller] fn from(e: std::io::Error) -> Self { + // A lance `Error` may have been wrapped in an `io::Error` (e.g. via + // `io::Error::other(Error::...)`) to cross an `AsyncWrite`/`AsyncRead` + // boundary. Recover it so typed errors such as `DiskCapExceeded` + // survive the round-trip instead of collapsing into an opaque `IO`. 
+ if e.get_ref().is_some_and(|inner| inner.is::()) { + return *e + .into_inner() + .expect("checked Some above") + .downcast::() + .expect("checked type above"); + } Self::io_source(box_error(e)) } } @@ -718,6 +748,33 @@ mod test { impl std::error::Error for MyCustomError {} + #[test] + fn test_io_error_recovers_wrapped_lance_error() { + // A lance Error wrapped in io::Error::other should round-trip back to + // the original variant rather than collapsing into Error::IO. + let io_err = std::io::Error::other(Error::disk_cap_exceeded(100, 50)); + let recovered: Error = io_err.into(); + match recovered { + Error::DiskCapExceeded { + cap_bytes, + used_bytes, + .. + } => { + assert_eq!(cap_bytes, 100); + assert_eq!(used_bytes, 50); + } + other => panic!("expected DiskCapExceeded, got {other:?}"), + } + } + + #[test] + fn test_io_error_without_lance_error_stays_io() { + // A plain io::Error (no wrapped lance Error) should become Error::IO. + let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "missing"); + let converted: Error = io_err.into(); + assert!(matches!(converted, Error::IO { .. })); + } + #[test] fn test_external_error_creation() { let custom_err = MyCustomError { diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index c327a91c1ba..2ef686fd551 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -18,6 +18,7 @@ pub mod object_reader; pub mod object_store; pub mod object_writer; pub mod scheduler; +pub mod spill; pub mod stream; #[cfg(test)] pub mod testing; diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index efe4b9b0c24..29b4529cc62 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -730,6 +730,28 @@ impl ScanScheduler { self.open_file_with_priority(path, 0, file_size_bytes).await } + /// Open a [`FileScheduler`] over an already-open [`Reader`]. 
+ /// + /// Unlike [`Self::open_file`], this skips the path lookup and size probe and + /// schedules I/O against `reader` directly. This is useful when the reader + /// was produced outside the scheduler's object store (e.g. a spill file + /// opened via [`crate::spill::SpillFile::reader`]), since a bare `Reader` + /// cannot otherwise drive a v2 `FileReader` (which needs a scheduler). + /// + /// Uses a base priority of 0; chain [`FileScheduler::with_priority`] to set + /// a different one. + pub fn open_reader(self: &Arc, reader: Arc) -> FileScheduler { + FileScheduler { + reader, + block_size: self.object_store.block_size() as u64, + root: self.clone(), + base_priority: 0, + max_iop_size: self.object_store.max_iop_size(), + bypass_backpressure: false, + extra_stats: None, + } + } + fn do_submit_request( &self, reader: Arc, @@ -1193,6 +1215,31 @@ mod tests { } } + #[tokio::test] + async fn test_open_reader_bridge() { + let tmp_file = TempObjFile::default(); + + let obj_store = Arc::new(ObjectStore::local()); + + const DATA_SIZE: u64 = 64 * 1024; + let mut some_data = vec![0; DATA_SIZE as usize]; + rand::rng().fill_bytes(&mut some_data); + obj_store.put(&tmp_file, &some_data).await.unwrap(); + + let config = SchedulerConfig::default_for_testing(); + let scheduler = ScanScheduler::new(obj_store.clone(), config); + + // Open a bare Reader ourselves, then bridge it into a FileScheduler. 
+ let reader: Arc = obj_store.open(&tmp_file).await.unwrap().into(); + let file_scheduler = scheduler.open_reader(reader); + + let bytes = file_scheduler + .submit_request(vec![0..DATA_SIZE], 0) + .await + .unwrap(); + assert_eq!(bytes[0], some_data); + } + #[tokio::test] async fn test_split_coalesce() { let tmp_file = TempObjFile::default(); diff --git a/rust/lance-io/src/spill.rs b/rust/lance-io/src/spill.rs new file mode 100644 index 00000000000..996e66cbf92 --- /dev/null +++ b/rust/lance-io/src/spill.rs @@ -0,0 +1,512 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Reclaimable scratch storage. +//! +//! A [`SpillStore`] hands out [`SpillFile`] handles for temporary state that is +//! too large to keep in memory and is read back later in the same process (for +//! example, posting lists or shuffle runs accumulated while building an index). +//! The backing storage is reclaimed automatically when a handle is dropped. +//! +//! Key properties: +//! - **`writer()`/`reader()` interface.** A [`SpillFile`] vends a [`Writer`] +//! and a [`Reader`] rather than exposing an [`ObjectStore`] and a path. The +//! writer feeds `FileWriter::try_new` directly; the reader feeds a v2 +//! `FileReader` via [`crate::scheduler::ScanScheduler::open_reader`]. +//! - **Per-file RAII.** One [`SpillFile`] is one file; dropping it deletes the +//! file and releases its bytes back to the store's disk budget, so a caller +//! can hold N files and reclaim them individually. +//! - **Disk cap enforcement.** [`LocalSpillStore::with_cap`] enforces a byte +//! budget shared across all handles, returning a typed +//! [`lance_core::Error::DiskCapExceeded`] rather than silently filling the +//! disk. +//! +//! # Usage contract +//! +//! Hold the [`SpillFile`] alive for the file's lifetime: the [`Writer`] and +//! [`Reader`] reference the backing file, which the handle deletes on drop. The +//! 
store's temp directory is the backstop for anything leaked when it drops. +//! +//! Accounting is reserve-on-write + release-on-drop (by stat), which is exact +//! for the write-once contract this enforces: [`SpillFile::writer`] may be +//! called only once, so the bytes reserved while writing match the file size +//! released on drop. Two minor inexactnesses are not engineered around: a write +//! aborted at the cap leaks its reservation until the store is dropped, and a +//! file whose size cannot be stat-ed on drop is not released. + +use std::io; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::task::{Context, Poll}; + +use async_trait::async_trait; +use object_store::path::Path; +use tokio::io::AsyncWrite; + +use lance_core::{Error, Result}; + +use crate::object_store::ObjectStore; +use crate::object_writer::WriteResult; +use crate::traits::{Reader, Writer}; + +/// A handle to a single unit of reclaimable scratch storage. +/// +/// Data is written through [`SpillFile::writer`] and read back through +/// [`SpillFile::reader`]. The backing storage is released when the handle is +/// dropped. +/// +/// The trait is object-safe so it can be returned as `Box` from +/// [`SpillStore::create_spill_file`], allowing implementations not backed by a +/// local file (e.g. in-memory buffers, remote object stores). +#[async_trait] +pub trait SpillFile: Send + Sync { + /// Open a writer over this spill file. + /// + /// For a capped store, writes that would exceed the cap fail with + /// [`lance_core::Error::DiskCapExceeded`]. Spill files are write-once: + /// implementations may reject a second call (the [`LocalSpillStore`] impl + /// returns [`lance_core::Error::invalid_input`]). + async fn writer(&self) -> Result>; + + /// Open a reader over this spill file. + /// + /// The data must have been fully written (the writer shut down) first. 
+ async fn reader(&self) -> Result>; +} + +/// A factory for [`SpillFile`] handles. +/// +/// The trait is object-safe and `Send + Sync` so it can be held behind an +/// `Arc` (e.g. inside a `Session`). +pub trait SpillStore: Send + Sync + 'static { + /// Allocate a new scratch handle. + /// + /// The backing storage is reclaimed when the returned [`SpillFile`] is + /// dropped. + fn create_spill_file(&self) -> Result>; +} + +/// A shared, cloneable byte budget. +/// +/// Cloning produces another handle to the *same* underlying counter, so a +/// quota shared across many writers enforces a single combined cap. +#[derive(Debug, Clone)] +struct DiskQuota { + cap_bytes: u64, + used: Arc, +} + +impl DiskQuota { + fn new(cap_bytes: u64) -> Self { + Self { + cap_bytes, + used: Arc::new(AtomicU64::new(0)), + } + } + + /// Try to reserve `n` bytes, failing with [`Error::DiskCapExceeded`] if the + /// reservation would push total usage past the cap. + fn try_reserve(&self, n: u64) -> Result<()> { + // `Relaxed` is sufficient throughout: the counter guards no other + // memory, so we rely only on the atomicity of the read-modify-write, + // not on establishing a happens-before relationship with other data. + // Coherence guarantees the CAS loop always re-reads the latest value. + loop { + let current = self.used.load(Ordering::Relaxed); + let next = current.saturating_add(n); + if next > self.cap_bytes { + return Err(Error::disk_cap_exceeded(self.cap_bytes, current)); + } + if self + .used + .compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + return Ok(()); + } + // Another thread won the CAS — retry. + } + } + + /// Release `n` previously reserved bytes back to the budget. + fn release(&self, n: u64) { + // Saturating sub guards against any double-free bug becoming a panic. + // `Relaxed` for the same reason as `try_reserve`: no other state is + // published through this counter. 
+ self.used + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + Some(current.saturating_sub(n)) + }) + .ok(); + } +} + +/// A [`Writer`] decorator that reserves a [`DiskQuota`] as bytes are written. +/// +/// Wrapping the writer keeps cap enforcement inside the spill store rather than +/// pushing it into [`ObjectStore`], and works for any backend the store opens. +struct QuotaWriter { + inner: Box, + quota: DiskQuota, +} + +impl AsyncWrite for QuotaWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + // Reserve up-front for the bytes we intend to write, then release the + // remainder the inner writer did not accept so the reservation tracks + // bytes actually buffered (and, for a write-once file, the file size). + if let Err(e) = this.quota.try_reserve(buf.len() as u64) { + return Poll::Ready(Err(io::Error::other(e))); + } + let poll = Pin::new(this.inner.as_mut()).poll_write(cx, buf); + match &poll { + Poll::Ready(Ok(n)) => this.quota.release((buf.len() - *n) as u64), + _ => this.quota.release(buf.len() as u64), + } + poll + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.get_mut().inner.as_mut()).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.get_mut().inner.as_mut()).poll_shutdown(cx) + } +} + +#[async_trait] +impl Writer for QuotaWriter { + async fn tell(&mut self) -> Result { + self.inner.tell().await + } + + async fn shutdown(&mut self) -> Result { + self.inner.shutdown().await + } +} + +/// A [`SpillStore`] that writes temporary files to a local temp directory. +/// +/// By default there is no disk cap. Use [`LocalSpillStore::with_cap`] to +/// configure one shared across every handle this store produces. +/// +/// The temp directory is deleted when the store is dropped, cleaning up any +/// files whose handles have already been dropped. 
+pub struct LocalSpillStore { + store: Arc, + /// Backstop cleanup: removes the whole scratch directory on drop. + temp_dir: Arc, + file_counter: Arc, + /// Byte budget shared across every handle, enforced while writing. + quota: Option, +} + +impl LocalSpillStore { + /// Create a store with no disk cap. + pub fn new() -> Result { + Ok(Self { + store: Arc::new(ObjectStore::local()), + temp_dir: Arc::new(tempfile::tempdir()?), + file_counter: Arc::new(AtomicU64::new(0)), + quota: None, + }) + } + + /// Create a store that returns [`lance_core::Error::DiskCapExceeded`] once + /// total bytes written across all live handles would exceed `cap_bytes`. + pub fn with_cap(cap_bytes: u64) -> Result { + Ok(Self { + store: Arc::new(ObjectStore::local()), + temp_dir: Arc::new(tempfile::tempdir()?), + file_counter: Arc::new(AtomicU64::new(0)), + quota: Some(DiskQuota::new(cap_bytes)), + }) + } +} + +impl Default for LocalSpillStore { + fn default() -> Self { + Self::new().expect("failed to create temp directory for LocalSpillStore") + } +} + +impl SpillStore for LocalSpillStore { + fn create_spill_file(&self) -> Result> { + let idx = self.file_counter.fetch_add(1, Ordering::Relaxed); + let fs_path = self.temp_dir.path().join(format!("spill_{idx:06}.bin")); + let os_path = Path::from_absolute_path(&fs_path)?; + Ok(Box::new(LocalSpillFile { + store: self.store.clone(), + os_path, + fs_path, + quota: self.quota.clone(), + writer_taken: AtomicBool::new(false), + _temp_dir: self.temp_dir.clone(), + })) + } +} + +/// A [`SpillFile`] backed by a single file in the store's temp directory. +struct LocalSpillFile { + store: Arc, + os_path: Path, + fs_path: PathBuf, + quota: Option, + /// Set once a writer has been vended, to reject the write-once violation of + /// reopening a writer (which would truncate the file and leak the first + /// write's reservation against the budget). + writer_taken: AtomicBool, + /// Keep the store's temp directory alive for at least this file's lifetime. 
+ _temp_dir: Arc, +} + +#[async_trait] +impl SpillFile for LocalSpillFile { + async fn writer(&self) -> Result> { + if self.writer_taken.swap(true, Ordering::Relaxed) { + return Err(Error::invalid_input( + "spill files are write-once; this file already has a writer", + )); + } + let writer = self.store.create(&self.os_path).await?; + match &self.quota { + Some(quota) => Ok(Box::new(QuotaWriter { + inner: writer, + quota: quota.clone(), + })), + None => Ok(writer), + } + } + + async fn reader(&self) -> Result> { + self.store.open(&self.os_path).await + } +} + +impl Drop for LocalSpillFile { + fn drop(&mut self) { + // Release the bytes this file occupied back to the budget. We stat the + // persisted file rather than tracking writes, which is exact for the + // write-once contract. + if let Some(quota) = &self.quota + && let Ok(metadata) = std::fs::metadata(&self.fs_path) + { + quota.release(metadata.len()); + } + // Best-effort removal; the temp dir is the backstop. + let _ = std::fs::remove_file(&self.fs_path); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::AsyncWriteExt; + + async fn write_spill(spill: &dyn SpillFile, data: &[u8]) -> Result<()> { + let mut writer = spill.writer().await?; + writer.write_all(data).await?; + Writer::shutdown(writer.as_mut()).await?; + Ok(()) + } + + #[test] + fn test_disk_quota_reserve_release() { + let quota = DiskQuota::new(100); + quota.try_reserve(60).unwrap(); + assert!(quota.try_reserve(60).is_err()); + quota.release(60); + quota.try_reserve(60).unwrap(); + } + + #[tokio::test] + async fn test_write_then_read() { + let store = LocalSpillStore::new().unwrap(); + let spill = store.create_spill_file().unwrap(); + + let data = b"hello spill world"; + write_spill(spill.as_ref(), data).await.unwrap(); + + let reader = spill.reader().await.unwrap(); + let read_back = reader.get_all().await.unwrap(); + assert_eq!(read_back.as_ref(), data); + } + + #[tokio::test] + async fn test_raii_cleanup() { + let store = 
LocalSpillStore::new().unwrap(); + let spill = store.create_spill_file().unwrap(); + write_spill(spill.as_ref(), b"some bytes").await.unwrap(); + + // The first file gets a deterministic name under the store's temp dir. + let path = store.temp_dir.path().join("spill_000000.bin"); + assert!(path.exists()); + drop(spill); + assert!(!path.exists(), "spill file should be deleted on drop"); + } + + #[tokio::test] + async fn test_cap_exceeded() { + let store = LocalSpillStore::with_cap(100).unwrap(); + let spill = store.create_spill_file().unwrap(); + let err = write_spill(spill.as_ref(), &[0u8; 101]).await.unwrap_err(); + assert!( + matches!(err, Error::DiskCapExceeded { cap_bytes: 100, .. }), + "expected DiskCapExceeded, got {err:?}" + ); + } + + #[tokio::test] + async fn test_cap_shared_across_files() { + let store = LocalSpillStore::with_cap(100).unwrap(); + let a = store.create_spill_file().unwrap(); + let b = store.create_spill_file().unwrap(); + + write_spill(a.as_ref(), &[0u8; 60]).await.unwrap(); + // 60 already reserved by `a`; writing 60 more would reach 120 > 100. + let err = write_spill(b.as_ref(), &[0u8; 60]).await.unwrap_err(); + assert!( + matches!(err, Error::DiskCapExceeded { cap_bytes: 100, .. }), + "expected DiskCapExceeded, got {err:?}" + ); + } + + #[tokio::test] + async fn test_cap_freed_on_drop() { + let store = LocalSpillStore::with_cap(100).unwrap(); + + { + let a = store.create_spill_file().unwrap(); + write_spill(a.as_ref(), &[0u8; 80]).await.unwrap(); + // `a` drops here, releasing its 80 bytes. + } + + let b = store.create_spill_file().unwrap(); + // Succeeds because the cap is no longer under pressure. + write_spill(b.as_ref(), &[0u8; 80]).await.unwrap(); + } + + #[tokio::test] + async fn test_custom_implementation() { + // A custom store can satisfy the traits without a local file. 
+ struct MemStore; + struct MemFile; + + #[async_trait] + impl SpillFile for MemFile { + async fn writer(&self) -> Result> { + ObjectStore::memory().create(&Path::from("/mem")).await + } + async fn reader(&self) -> Result> { + ObjectStore::memory().open(&Path::from("/mem")).await + } + } + + impl SpillStore for MemStore { + fn create_spill_file(&self) -> Result> { + Ok(Box::new(MemFile)) + } + } + + let store = MemStore; + let spill = store.create_spill_file().unwrap(); + // Exercise the factory + trait object; the in-memory store is a fresh + // instance per call so we don't round-trip data here. + let _ = spill.writer().await.unwrap(); + } + + #[tokio::test] + async fn test_writer_is_write_once() { + let store = LocalSpillStore::new().unwrap(); + let spill = store.create_spill_file().unwrap(); + + let _writer = spill.writer().await.unwrap(); + let Err(err) = spill.writer().await else { + panic!("second writer() should be rejected"); + }; + assert!( + matches!(err, Error::InvalidInput { .. }), + "second writer() should be rejected with InvalidInput, got {err:?}" + ); + } + + /// A [`Writer`] whose `poll_write` accepts a fixed number of bytes per call, + /// or fails, so we can drive the [`QuotaWriter`] release arms that the local + /// backend (which accepts every write in full) never hits. 
+ struct ControlledWriter { + outcome: Poll>, + } + + impl AsyncWrite for ControlledWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match &self.outcome { + Poll::Ready(Ok(n)) => Poll::Ready(Ok((*n).min(buf.len()))), + Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::new(e.kind(), e.to_string()))), + Poll::Pending => Poll::Pending, + } + } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + #[async_trait] + impl Writer for ControlledWriter { + async fn tell(&mut self) -> Result { + Ok(0) + } + async fn shutdown(&mut self) -> Result { + Ok(WriteResult::default()) + } + } + + #[tokio::test] + async fn test_quota_writer_releases_unaccepted_bytes() { + // Short write: the inner writer accepts only 10 of the 40 reserved bytes, + // so the 30-byte remainder must be returned to the budget. + let quota = DiskQuota::new(100); + let mut writer = QuotaWriter { + inner: Box::new(ControlledWriter { + outcome: Poll::Ready(Ok(10)), + }), + quota: quota.clone(), + }; + let n = writer.write(&[0u8; 40]).await.unwrap(); + assert_eq!(n, 10); + assert_eq!( + quota.used.load(Ordering::Relaxed), + 10, + "only the accepted bytes should remain reserved" + ); + + // Failed write: the full reservation must be released. 
+ let quota = DiskQuota::new(100); + let mut writer = QuotaWriter { + inner: Box::new(ControlledWriter { + outcome: Poll::Ready(Err(io::Error::other("boom"))), + }), + quota: quota.clone(), + }; + writer.write(&[0u8; 40]).await.unwrap_err(); + assert_eq!( + quota.used.load(Ordering::Relaxed), + 0, + "a failed write should release its entire reservation" + ); + } +} diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 8d5e9717570..868768254c3 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -9,6 +9,7 @@ use lance_core::deepsize::DeepSizeOf; use lance_core::{Error, Result}; use lance_index::IndexType; use lance_io::object_store::ObjectStoreRegistry; +use lance_io::spill::{LocalSpillStore, SpillStore}; use crate::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE}; use crate::session::caches::GlobalMetadataCache; @@ -53,6 +54,8 @@ pub struct Session { pub(crate) index_extensions: HashMap<(IndexType, String), Arc>, store_registry: Arc, + + spill_store: Arc, } impl DeepSizeOf for Session { @@ -107,6 +110,7 @@ impl Session { metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)), index_extensions: HashMap::new(), store_registry, + spill_store: Arc::new(LocalSpillStore::default()), } } @@ -124,9 +128,35 @@ impl Session { metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)), index_extensions: HashMap::new(), store_registry, + spill_store: Arc::new(LocalSpillStore::default()), } } + /// Replace the spill store used by this session. 
+ /// + /// This is a builder-style method that consumes and returns `self`, making + /// it easy to chain during session construction: + /// + /// ```rust,no_run + /// # use lance::session::Session; + /// # use lance_io::spill::LocalSpillStore; + /// # use std::sync::Arc; + /// let session = Session::default() + /// .with_spill_store(Arc::new(LocalSpillStore::with_cap(1 << 30).unwrap())); + /// ``` + pub fn with_spill_store(mut self, store: Arc) -> Self { + self.spill_store = store; + self + } + + /// Return a reference to the session's spill store. + /// + /// Callers use this to obtain reclaimable scratch space for intermediate + /// state that overflows memory (e.g. index builders). + pub fn spill_store(&self) -> &dyn SpillStore { + &*self.spill_store + } + /// Register a new index extension. /// /// A name can only be registered once per type of index extension. @@ -265,6 +295,7 @@ mod tests { use lance_core::cache::{CacheKey, UnsizedCacheKey}; use lance_index::vector::VectorIndex; use std::borrow::Cow; + use tokio::io::AsyncWriteExt; struct TestKey(&'static str); impl CacheKey for TestKey { @@ -339,4 +370,36 @@ mod tests { assert_ne!(index_keys, metadata_keys); } + + #[tokio::test] + async fn test_default_session_has_spill_store() { + let session = Session::default(); + // Should be able to create a spill file and write to it without error. 
+ let spill = session.spill_store().create_spill_file().unwrap(); + let mut writer = spill.writer().await.unwrap(); + writer.write_all(b"scratch").await.unwrap(); + lance_io::traits::Writer::shutdown(writer.as_mut()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_custom_spill_store_injected() { + let capped = Arc::new(LocalSpillStore::with_cap(50).unwrap()); + let session = Session::default().with_spill_store(capped); + + let spill = session.spill_store().create_spill_file().unwrap(); + let mut writer = spill.writer().await.unwrap(); + // Writing 51 bytes exceeds the 50-byte cap; the typed error is wrapped + // in an io::Error by the writer and recovered on conversion. + let io_err = writer.write_all(&[0u8; 51]).await.unwrap_err(); + let err: lance_core::Error = io_err.into(); + assert!( + matches!( + err, + lance_core::Error::DiskCapExceeded { cap_bytes: 50, .. } + ), + "expected DiskCapExceeded, got {err}" + ); + } }