Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ pub enum Error {
/// A requested field was not found in a schema.
#[snafu(transparent)]
FieldNotFound { source: FieldNotFoundError },

#[snafu(display(
"Spill disk cap of {cap_bytes} bytes exceeded; currently using {used_bytes} bytes, {location}"
))]
DiskCapExceeded {
cap_bytes: u64,
used_bytes: u64,
#[snafu(implicit)]
location: Location,
},
}

impl Error {
Expand Down Expand Up @@ -431,6 +441,15 @@ impl Error {
IncompatibleTransactionSnafu.into_error(source)
}

#[track_caller]
pub fn disk_cap_exceeded(cap_bytes: u64, used_bytes: u64) -> Self {
DiskCapExceededSnafu {
cap_bytes,
used_bytes,
}
.build()
}

/// Create an External error from a boxed error source.
pub fn external(source: BoxedError) -> Self {
Self::External { source }
Expand Down Expand Up @@ -512,6 +531,17 @@ impl From<&ArrowError> for Error {
impl From<std::io::Error> for Error {
#[track_caller]
fn from(e: std::io::Error) -> Self {
// A lance `Error` may have been wrapped in an `io::Error` (e.g. via
// `io::Error::other(Error::...)`) to cross an `AsyncWrite`/`AsyncRead`
// boundary. Recover it so typed errors such as `DiskCapExceeded`
// survive the round-trip instead of collapsing into an opaque `IO`.
if e.get_ref().is_some_and(|inner| inner.is::<Self>()) {
return *e
.into_inner()
.expect("checked Some above")
.downcast::<Self>()
.expect("checked type above");
}
Self::io_source(box_error(e))
}
}
Expand Down Expand Up @@ -718,6 +748,33 @@ mod test {

impl std::error::Error for MyCustomError {}

#[test]
fn test_io_error_recovers_wrapped_lance_error() {
// A lance Error wrapped in io::Error::other should round-trip back to
// the original variant rather than collapsing into Error::IO.
let io_err = std::io::Error::other(Error::disk_cap_exceeded(100, 50));
let recovered: Error = io_err.into();
match recovered {
Error::DiskCapExceeded {
cap_bytes,
used_bytes,
..
} => {
assert_eq!(cap_bytes, 100);
assert_eq!(used_bytes, 50);
}
other => panic!("expected DiskCapExceeded, got {other:?}"),
}
}

#[test]
fn test_io_error_without_lance_error_stays_io() {
// A plain io::Error (no wrapped lance Error) should become Error::IO.
let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "missing");
let converted: Error = io_err.into();
assert!(matches!(converted, Error::IO { .. }));
}

#[test]
fn test_external_error_creation() {
let custom_err = MyCustomError {
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod object_reader;
pub mod object_store;
pub mod object_writer;
pub mod scheduler;
pub mod spill;
pub mod stream;
#[cfg(test)]
pub mod testing;
Expand Down
47 changes: 47 additions & 0 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,28 @@ impl ScanScheduler {
self.open_file_with_priority(path, 0, file_size_bytes).await
}

/// Open a [`FileScheduler`] over an already-open [`Reader`].
///
/// Unlike [`Self::open_file`], this skips the path lookup and size probe and
/// schedules I/O against `reader` directly. This is useful when the reader
/// was produced outside the scheduler's object store (e.g. a spill file
/// opened via [`crate::spill::SpillFile::reader`]), since a bare `Reader`
/// cannot otherwise drive a v2 `FileReader` (which needs a scheduler).
///
/// Uses a base priority of 0; chain [`FileScheduler::with_priority`] to set
/// a different one.
pub fn open_reader(self: &Arc<Self>, reader: Arc<dyn Reader>) -> FileScheduler {
FileScheduler {
reader,
block_size: self.object_store.block_size() as u64,
root: self.clone(),
base_priority: 0,
max_iop_size: self.object_store.max_iop_size(),
bypass_backpressure: false,
extra_stats: None,
}
}

fn do_submit_request(
&self,
reader: Arc<dyn Reader>,
Expand Down Expand Up @@ -1193,6 +1215,31 @@ mod tests {
}
}

#[tokio::test]
async fn test_open_reader_bridge() {
let tmp_file = TempObjFile::default();

let obj_store = Arc::new(ObjectStore::local());

const DATA_SIZE: u64 = 64 * 1024;
let mut some_data = vec![0; DATA_SIZE as usize];
rand::rng().fill_bytes(&mut some_data);
obj_store.put(&tmp_file, &some_data).await.unwrap();

let config = SchedulerConfig::default_for_testing();
let scheduler = ScanScheduler::new(obj_store.clone(), config);

// Open a bare Reader ourselves, then bridge it into a FileScheduler.
let reader: Arc<dyn Reader> = obj_store.open(&tmp_file).await.unwrap().into();
let file_scheduler = scheduler.open_reader(reader);

let bytes = file_scheduler
.submit_request(vec![0..DATA_SIZE], 0)
.await
.unwrap();
assert_eq!(bytes[0], some_data);
}

#[tokio::test]
async fn test_split_coalesce() {
let tmp_file = TempObjFile::default();
Expand Down
Loading
Loading