feat: add SpillStore trait with local-disk implementation#7311
feat: add SpillStore trait with local-disk implementation#7311wjones127 wants to merge 1 commit into
Conversation
2a5a2b4 to
ad19afe
Compare
Adds a `SpillStore` trait on `Session` providing uniform, reclaimable scratch space for intermediate state that overflows memory (e.g. index build posting lists, shuffle runs, BTree pages). - `SpillStore` / `SpillFile` (lance-io): `create_spill_file()` vends a RAII handle; `writer()` / `reader()` hand back `Box<dyn Writer>` / `Box<dyn Reader>` so callers feed spill files directly into `FileWriter::try_new` and a v2 `FileReader` without leaking an `ObjectStore` + path. The file is deleted on drop and its bytes are released back to the store's usage counter. - `LocalSpillStore`: writes to an OS temp directory; optionally enforces a byte cap. Enforcement lives entirely in the spill store: the spill file decorates the writer with a quota-enforcing `QuotaWriter` (reserve-on-write, release-on-drop-by-stat) rather than threading a field through `ObjectStore` and every provider, so it works for any backend the store opens. - `From<io::Error>` recovers a wrapped lance `Error`, so typed errors such as `DiskCapExceeded` survive the `AsyncWrite` boundary. - `ScanScheduler::open_reader` builds a `FileScheduler` over an already-open `Reader` (no path/size lookup). - `Session` gains a `spill_store` field (defaults to uncapped `LocalSpillStore`), a `with_spill_store()` builder, and a `spill_store()` accessor so callers and tests can inject alternatives. Mechanism only; consumer migration (IVF shuffler) is a follow-up. Closes lance-format#7300 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ad19afe to
b2c0c08
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
| //! called only once, so the bytes reserved while writing match the file size | ||
| //! released on drop. Two minor inexactnesses are not engineered around: a write | ||
| //! aborted at the cap leaks its reservation until the store is dropped, and a | ||
| //! file whose size cannot be stat-ed on drop is not released. |
There was a problem hiding this comment.
What does "a file whose size cannot be stat-ed on drop" mean? I guess it's not super important.
| pub trait SpillFile: Send + Sync { | ||
| /// Open a writer over this spill file. | ||
| /// | ||
| /// For a capped store, writes that would exceed the cap fail with | ||
| /// [`lance_core::Error::DiskCapExceeded`]. Spill files are write-once: | ||
| /// implementations may reject a second call (the [`LocalSpillStore`] impl | ||
| /// returns [`lance_core::Error::invalid_input`]). | ||
| async fn writer(&self) -> Result<Box<dyn Writer>>; | ||
|
|
||
| /// Open a reader over this spill file. | ||
| /// | ||
| /// The data must have been fully written (the writer shut down) first. | ||
| async fn reader(&self) -> Result<Box<dyn Reader>>; | ||
| } |
There was a problem hiding this comment.
It's not super important but it seems there is an implicit lifecycle here that could be captured at compile time. The file must be fully written before it can be opened for reads and the writer can only be obtained once.
There was a problem hiding this comment.
That's a good point. I might refactor based on that.
| /// [`SpillStore::create_spill_file`], allowing implementations not backed by a | ||
| /// local file (e.g. in-memory buffers, remote object stores). | ||
| #[async_trait] | ||
| pub trait SpillFile: Send + Sync { |
There was a problem hiding this comment.
Why a trait and not a struct? Are you expecting multiple impls?
| loop { | ||
| let current = self.used.load(Ordering::Relaxed); | ||
| let next = current.saturating_add(n); | ||
| if next > self.cap_bytes { | ||
| return Err(Error::disk_cap_exceeded(self.cap_bytes, current)); | ||
| } | ||
| if self | ||
| .used | ||
| .compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) | ||
| .is_ok() | ||
| { | ||
| return Ok(()); | ||
| } | ||
| // Another thread won the CAS — retry. | ||
| } |
There was a problem hiding this comment.
Personal nit but anytime I start writing CAS loops I give up and just use a mutex 😆
Summary
Adds a generic
SpillStore— reclaimable RAII scratch storage for intermediate state that overflows memory (e.g. index-build posting lists, shuffle runs, BTree pages). Mechanism only; consumer migration (IVF shuffler) is a follow-up.SpillStore/SpillFile(lance-io):create_spill_file()vends a write-once RAII handle whosewriter()/reader()hand backBox<dyn Writer>/Box<dyn Reader>, so callers feed spill files straight intoFileWriter::try_newand a v2FileReaderwithout leaking anObjectStore+ path. Dropping the handle deletes the file and releases its bytes back to the store's budget.LocalSpillStore: writes to an OS temp directory;with_capenforces an optional byte budget shared across all handles, returning a typedError::DiskCapExceededinstead of silently filling the disk. Enforcement lives entirely in the spill store — the spill file decorates the writer with aQuotaWriter(reserve-on-write, release-on-drop-by-stat) rather than threading a field throughObjectStoreand every provider, so it works for any backend the store opens.From<io::Error>recovers a wrapped lanceError, so typed errors such asDiskCapExceededsurvive theAsyncWriteboundary.ScanScheduler::open_readerbuilds aFileSchedulerover an already-openReader(no path/size lookup), bridging a bare spill reader into the v2 reader path.Sessiongains aspill_storefield (default: uncappedLocalSpillStore), awith_spill_store()builder for injection, and aspill_store()accessor.Closes #7300
🤖 Generated with Claude Code