-
Notifications
You must be signed in to change notification settings - Fork 2k
Follow-up: remove interleave panic recovery after Arrow 58.1.0 #21436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,8 +24,6 @@ use arrow::record_batch::RecordBatch; | |
| use datafusion_common::{DataFusionError, Result}; | ||
| use datafusion_execution::memory_pool::MemoryReservation; | ||
| use log::warn; | ||
| use std::any::Any; | ||
| use std::panic::{AssertUnwindSafe, catch_unwind}; | ||
| use std::sync::Arc; | ||
|
|
||
| #[derive(Debug, Copy, Clone, Default)] | ||
|
|
@@ -143,7 +141,9 @@ impl BatchBuilder { | |
| .iter() | ||
| .map(|(_, batch)| batch.column(column_idx).as_ref()) | ||
| .collect(); | ||
| recover_offset_overflow_from_panic(|| interleave(&arrays, indices)) | ||
| // Arrow 58.1.0+ returns OffsetOverflowError directly from | ||
| // interleave, allowing retry_interleave to shrink the batch. | ||
| interleave(&arrays, indices).map_err(Into::into) | ||
| }) | ||
| .collect::<Result<Vec<_>>>() | ||
| } | ||
|
|
@@ -243,33 +243,11 @@ fn is_offset_overflow(e: &DataFusionError) -> bool { | |
| ) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| fn offset_overflow_error() -> DataFusionError { | ||
| DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None) | ||
| } | ||
|
|
||
| fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T> | ||
| where | ||
| F: FnOnce() -> std::result::Result<T, ArrowError>, | ||
| { | ||
| // Arrow's interleave can panic on i32 offset overflow with | ||
| // `.expect("overflow")` / `.expect("offset overflow")`. | ||
| // Catch only those specific panics so the caller can retry | ||
| // with fewer rows while unrelated defects still unwind. | ||
| // | ||
| // TODO: remove once arrow-rs#9549 lands — interleave will return | ||
| // OffsetOverflowError directly instead of panicking. | ||
| match catch_unwind(AssertUnwindSafe(f)) { | ||
| Ok(result) => Ok(result?), | ||
| Err(panic_payload) => { | ||
| if is_arrow_offset_overflow_panic(panic_payload.as_ref()) { | ||
| Err(offset_overflow_error()) | ||
| } else { | ||
| std::panic::resume_unwind(panic_payload); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn retry_interleave<T, F>( | ||
| mut rows_to_emit: usize, | ||
| total_rows: usize, | ||
|
|
@@ -281,6 +259,7 @@ where | |
| loop { | ||
| match interleave(rows_to_emit) { | ||
| Ok(value) => return Ok((rows_to_emit, value)), | ||
| // Only offset overflow is recoverable by emitting fewer rows. | ||
| Err(e) if is_offset_overflow(&e) => { | ||
| rows_to_emit /= 2; | ||
| if rows_to_emit == 0 { | ||
|
|
@@ -295,26 +274,34 @@ where | |
| } | ||
| } | ||
|
|
||
| fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> { | ||
| if let Some(msg) = payload.downcast_ref::<&str>() { | ||
| return Some(msg); | ||
| } | ||
| if let Some(msg) = payload.downcast_ref::<String>() { | ||
| return Some(msg.as_str()); | ||
| } | ||
| None | ||
| } | ||
|
|
||
| /// Returns true if a caught panic payload matches the Arrow offset overflows | ||
| /// raised by interleave's offset builders. | ||
| fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool { | ||
| matches!(panic_message(payload), Some("overflow" | "offset overflow")) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use arrow::error::ArrowError; | ||
| use arrow::array::{Array, ArrayDataBuilder, Int32Array, ListArray}; | ||
| use arrow::buffer::Buffer; | ||
| use arrow::datatypes::{DataType, Field, Schema}; | ||
| use datafusion_execution::memory_pool::{ | ||
| MemoryConsumer, MemoryPool, UnboundedMemoryPool, | ||
| }; | ||
|
|
||
| fn overflow_list_batch() -> RecordBatch { | ||
| let values_field = Arc::new(Field::new_list_field(DataType::Int32, true)); | ||
| // SAFETY: This intentionally constructs an invalid child length so | ||
| // Arrow's interleave hits offset overflow before touching child data. | ||
| let list = ListArray::from(unsafe { | ||
| ArrayDataBuilder::new(DataType::List(Arc::clone(&values_field))) | ||
| .len(1) | ||
| .add_buffer(Buffer::from_slice_ref([0_i32, i32::MAX])) | ||
| .add_child_data(Int32Array::from(Vec::<i32>::new()).to_data()) | ||
| .build_unchecked() | ||
| }); | ||
| let schema = Arc::new(Schema::new(vec![Field::new( | ||
| "list_col", | ||
| DataType::List(values_field), | ||
| true, | ||
| )])); | ||
| RecordBatch::try_new(schema, vec![Arc::new(list)]).unwrap() | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_retry_interleave_halves_rows_until_success() { | ||
|
|
@@ -336,43 +323,37 @@ mod tests { | |
| } | ||
|
|
||
| #[test] | ||
| fn test_recover_offset_overflow_from_panic() { | ||
| let error = recover_offset_overflow_from_panic( | ||
| || -> std::result::Result<(), ArrowError> { panic!("offset overflow") }, | ||
| ) | ||
| .unwrap_err(); | ||
|
|
||
| assert!(is_offset_overflow(&error)); | ||
| fn test_is_offset_overflow_matches_arrow_error() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to move this coverage up a layer and exercise Right now the tests only stub |
||
| assert!(is_offset_overflow(&offset_overflow_error())); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() { | ||
| let panic_payload = catch_unwind(AssertUnwindSafe(|| { | ||
| let _ = recover_offset_overflow_from_panic( | ||
| || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") }, | ||
| ); | ||
| })); | ||
|
|
||
| assert!(panic_payload.is_err()); | ||
| fn test_retry_interleave_does_not_retry_non_offset_errors() { | ||
| let mut attempts = Vec::new(); | ||
|
|
||
| let error = retry_interleave(4, 4, |rows_to_emit| { | ||
| attempts.push(rows_to_emit); | ||
| Err::<(), _>(DataFusionError::Execution("boom".into())) | ||
| }) | ||
| .unwrap_err(); | ||
|
|
||
| assert_eq!(attempts, vec![4]); | ||
| assert!(matches!(error, DataFusionError::Execution(msg) if msg == "boom")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_is_arrow_offset_overflow_panic() { | ||
| let overflow = Box::new("overflow") as Box<dyn Any + Send>; | ||
| assert!(is_arrow_offset_overflow_panic(overflow.as_ref())); | ||
|
|
||
| let offset_overflow = | ||
| Box::new(String::from("offset overflow")) as Box<dyn Any + Send>; | ||
| assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref())); | ||
|
|
||
| let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>; | ||
| assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref())); | ||
|
|
||
| let arithmetic_overflow = | ||
| Box::new(String::from("attempt to multiply with overflow")) | ||
| as Box<dyn Any + Send>; | ||
| assert!(!is_arrow_offset_overflow_panic( | ||
| arithmetic_overflow.as_ref() | ||
| )); | ||
| fn test_try_interleave_columns_surfaces_arrow_offset_overflow() { | ||
| let batch = overflow_list_batch(); | ||
| let schema = batch.schema(); | ||
| let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default()); | ||
| let reservation = MemoryConsumer::new("test").register(&pool); | ||
| let mut builder = BatchBuilder::new(schema, 1, 2, reservation); | ||
| builder.push_batch(0, batch).unwrap(); | ||
|
|
||
| let error = builder | ||
| .try_interleave_columns(&[(0, 0), (0, 0)]) | ||
| .unwrap_err(); | ||
|
|
||
| assert!(is_offset_overflow(&error)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe worth adding a short comment here noting that this now relies on Arrow 58.1.0+ returning
OffsetOverflowErrordirectly.That would make the cleanup easier to understand in this file, especially since the removed shim was guarding this exact call site.