Skip to content

Commit ad19afe

Browse files
wjones127claude
andcommitted
feat: add SpillStore for generic RAII scratch space
Adds a `SpillStore` trait on `Session` providing uniform, reclaimable scratch space for intermediate state that overflows memory (e.g. index build posting lists, shuffle runs, BTree pages).

- `SpillStore` / `SpillFile` (lance-io): `create_spill_file()` vends a RAII handle; `writer()` / `reader()` hand back `Box<dyn Writer>` / `Box<dyn Reader>` so callers feed spill files directly into `FileWriter::try_new` and a v2 `FileReader` without leaking an `ObjectStore` + path. The file is deleted on drop and its bytes are released back to the store's usage counter.
- `LocalSpillStore`: writes to an OS temp directory; optionally enforces a byte cap. Enforcement lives entirely in the spill store: the spill file decorates the writer with a quota-enforcing `QuotaWriter` (reserve-on-write, release-on-drop-by-stat) rather than threading a field through `ObjectStore` and every provider, so it works for any backend the store opens.
- `From<io::Error>` recovers a wrapped lance `Error`, so typed errors such as `DiskCapExceeded` survive the `AsyncWrite` boundary.
- `ScanScheduler::open_reader` builds a `FileScheduler` over an already-open `Reader` (no path/size lookup).
- `Session` gains a `spill_store` field (defaults to uncapped `LocalSpillStore`), a `with_spill_store()` builder, and a `spill_store()` accessor so callers and tests can inject alternatives.

Mechanism only; consumer migration (IVF shuffler) is a follow-up.

Closes #7300

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 0f2745d commit ad19afe

5 files changed

Lines changed: 576 additions & 0 deletions

File tree

rust/lance-core/src/error.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ pub enum Error {
238238
/// A requested field was not found in a schema.
239239
#[snafu(transparent)]
240240
FieldNotFound { source: FieldNotFoundError },
241+
242+
#[snafu(display(
243+
"Spill disk cap of {cap_bytes} bytes exceeded; currently using {used_bytes} bytes, {location}"
244+
))]
245+
DiskCapExceeded {
246+
cap_bytes: u64,
247+
used_bytes: u64,
248+
#[snafu(implicit)]
249+
location: Location,
250+
},
241251
}
242252

243253
impl Error {
@@ -431,6 +441,15 @@ impl Error {
431441
IncompatibleTransactionSnafu.into_error(source)
432442
}
433443

444+
#[track_caller]
445+
pub fn disk_cap_exceeded(cap_bytes: u64, used_bytes: u64) -> Self {
446+
DiskCapExceededSnafu {
447+
cap_bytes,
448+
used_bytes,
449+
}
450+
.build()
451+
}
452+
434453
/// Create an External error from a boxed error source.
435454
pub fn external(source: BoxedError) -> Self {
436455
Self::External { source }
@@ -512,6 +531,17 @@ impl From<&ArrowError> for Error {
512531
impl From<std::io::Error> for Error {
513532
#[track_caller]
514533
fn from(e: std::io::Error) -> Self {
534+
// A lance `Error` may have been wrapped in an `io::Error` (e.g. via
535+
// `io::Error::other(Error::...)`) to cross an `AsyncWrite`/`AsyncRead`
536+
// boundary. Recover it so typed errors such as `DiskCapExceeded`
537+
// survive the round-trip instead of collapsing into an opaque `IO`.
538+
if e.get_ref().is_some_and(|inner| inner.is::<Self>()) {
539+
return *e
540+
.into_inner()
541+
.expect("checked Some above")
542+
.downcast::<Self>()
543+
.expect("checked type above");
544+
}
515545
Self::io_source(box_error(e))
516546
}
517547
}
@@ -718,6 +748,33 @@ mod test {
718748

719749
impl std::error::Error for MyCustomError {}
720750

751+
#[test]
fn test_io_error_recovers_wrapped_lance_error() {
    // Wrapping a lance Error with io::Error::other and converting it back
    // must yield the original variant, not a generic Error::IO.
    let wrapped = std::io::Error::other(Error::disk_cap_exceeded(100, 50));
    let recovered = Error::from(wrapped);

    let Error::DiskCapExceeded {
        cap_bytes,
        used_bytes,
        ..
    } = &recovered
    else {
        panic!("expected DiskCapExceeded, got {recovered:?}");
    };
    assert_eq!(*cap_bytes, 100);
    assert_eq!(*used_bytes, 50);
}
769+
770+
#[test]
fn test_io_error_without_lance_error_stays_io() {
    // An ordinary io::Error that carries no lance Error payload must map
    // to the Error::IO variant.
    let plain = std::io::Error::new(std::io::ErrorKind::NotFound, "missing");
    let converted = Error::from(plain);
    assert!(matches!(converted, Error::IO { .. }));
}
777+
721778
#[test]
722779
fn test_external_error_creation() {
723780
let custom_err = MyCustomError {

rust/lance-io/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod object_reader;
1818
pub mod object_store;
1919
pub mod object_writer;
2020
pub mod scheduler;
21+
pub mod spill;
2122
pub mod stream;
2223
#[cfg(test)]
2324
pub mod testing;

rust/lance-io/src/scheduler.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,28 @@ impl ScanScheduler {
730730
self.open_file_with_priority(path, 0, file_size_bytes).await
731731
}
732732

733+
/// Open a [`FileScheduler`] over an already-open [`Reader`].
734+
///
735+
/// Unlike [`Self::open_file`], this skips the path lookup and size probe and
736+
/// schedules I/O against `reader` directly. This is useful when the reader
737+
/// was produced outside the scheduler's object store (e.g. a spill file
738+
/// opened via [`crate::spill::SpillFile::reader`]), since a bare `Reader`
739+
/// cannot otherwise drive a v2 `FileReader` (which needs a scheduler).
740+
///
741+
/// Uses a base priority of 0; chain [`FileScheduler::with_priority`] to set
742+
/// a different one.
743+
pub fn open_reader(self: &Arc<Self>, reader: Arc<dyn Reader>) -> FileScheduler {
744+
FileScheduler {
745+
reader,
746+
block_size: self.object_store.block_size() as u64,
747+
root: self.clone(),
748+
base_priority: 0,
749+
max_iop_size: self.object_store.max_iop_size(),
750+
bypass_backpressure: false,
751+
extra_stats: None,
752+
}
753+
}
754+
733755
fn do_submit_request(
734756
&self,
735757
reader: Arc<dyn Reader>,
@@ -1193,6 +1215,31 @@ mod tests {
11931215
}
11941216
}
11951217

1218+
#[tokio::test]
1219+
async fn test_open_reader_bridge() {
1220+
let tmp_file = TempObjFile::default();
1221+
1222+
let obj_store = Arc::new(ObjectStore::local());
1223+
1224+
const DATA_SIZE: u64 = 64 * 1024;
1225+
let mut some_data = vec![0; DATA_SIZE as usize];
1226+
rand::rng().fill_bytes(&mut some_data);
1227+
obj_store.put(&tmp_file, &some_data).await.unwrap();
1228+
1229+
let config = SchedulerConfig::default_for_testing();
1230+
let scheduler = ScanScheduler::new(obj_store.clone(), config);
1231+
1232+
// Open a bare Reader ourselves, then bridge it into a FileScheduler.
1233+
let reader: Arc<dyn Reader> = obj_store.open(&tmp_file).await.unwrap().into();
1234+
let file_scheduler = scheduler.open_reader(reader);
1235+
1236+
let bytes = file_scheduler
1237+
.submit_request(vec![0..DATA_SIZE], 0)
1238+
.await
1239+
.unwrap();
1240+
assert_eq!(bytes[0], some_data);
1241+
}
1242+
11961243
#[tokio::test]
11971244
async fn test_split_coalesce() {
11981245
let tmp_file = TempObjFile::default();

0 commit comments

Comments
 (0)