Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 15 additions & 77 deletions datafusion/physical-plan/src/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use log::warn;
use std::any::Any;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;

#[derive(Debug, Copy, Clone, Default)]
Expand Down Expand Up @@ -143,7 +141,7 @@ impl BatchBuilder {
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
interleave(&arrays, indices).map_err(Into::into)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth adding a short comment here noting that this now relies on Arrow 58.1.0+ returning OffsetOverflowError directly.

That would make the cleanup easier to understand in this file, especially since the removed shim was guarding this exact call site.

})
.collect::<Result<Vec<_>>>()
}
Expand Down Expand Up @@ -243,33 +241,11 @@ fn is_offset_overflow(e: &DataFusionError) -> bool {
)
}

#[cfg(test)]
fn offset_overflow_error() -> DataFusionError {
DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
}

fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
where
F: FnOnce() -> std::result::Result<T, ArrowError>,
{
// Arrow's interleave can panic on i32 offset overflow with
// `.expect("overflow")` / `.expect("offset overflow")`.
// Catch only those specific panics so the caller can retry
// with fewer rows while unrelated defects still unwind.
//
// TODO: remove once arrow-rs#9549 lands — interleave will return
// OffsetOverflowError directly instead of panicking.
match catch_unwind(AssertUnwindSafe(f)) {
Ok(result) => Ok(result?),
Err(panic_payload) => {
if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
Err(offset_overflow_error())
} else {
std::panic::resume_unwind(panic_payload);
}
}
}
}

fn retry_interleave<T, F>(
mut rows_to_emit: usize,
total_rows: usize,
Expand All @@ -281,6 +257,7 @@ where
loop {
match interleave(rows_to_emit) {
Ok(value) => return Ok((rows_to_emit, value)),
// Only offset overflow is recoverable by emitting fewer rows.
Err(e) if is_offset_overflow(&e) => {
rows_to_emit /= 2;
if rows_to_emit == 0 {
Expand All @@ -295,26 +272,9 @@ where
}
}

fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
if let Some(msg) = payload.downcast_ref::<&str>() {
return Some(msg);
}
if let Some(msg) = payload.downcast_ref::<String>() {
return Some(msg.as_str());
}
None
}

/// Returns true if a caught panic payload matches the Arrow offset overflows
/// raised by interleave's offset builders.
fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
matches!(panic_message(payload), Some("overflow" | "offset overflow"))
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::error::ArrowError;

#[test]
fn test_retry_interleave_halves_rows_until_success() {
Expand All @@ -336,43 +296,21 @@ mod tests {
}

#[test]
fn test_recover_offset_overflow_from_panic() {
let error = recover_offset_overflow_from_panic(
|| -> std::result::Result<(), ArrowError> { panic!("offset overflow") },
)
.unwrap_err();

assert!(is_offset_overflow(&error));
fn test_is_offset_overflow_matches_arrow_error() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to move this coverage up a layer and exercise BatchBuilder::build_record_batch, or even the sort-preserving merge drain path, using a real ArrowError::OffsetOverflowError?

Right now the tests only stub retry_interleave, so they might miss regressions if interleave starts surfacing the overflow differently again.

assert!(is_offset_overflow(&offset_overflow_error()));
}

#[test]
fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
let panic_payload = catch_unwind(AssertUnwindSafe(|| {
let _ = recover_offset_overflow_from_panic(
|| -> std::result::Result<(), ArrowError> { panic!("capacity overflow") },
);
}));

assert!(panic_payload.is_err());
}
fn test_retry_interleave_does_not_retry_non_offset_errors() {
let mut attempts = Vec::new();

#[test]
fn test_is_arrow_offset_overflow_panic() {
let overflow = Box::new("overflow") as Box<dyn Any + Send>;
assert!(is_arrow_offset_overflow_panic(overflow.as_ref()));

let offset_overflow =
Box::new(String::from("offset overflow")) as Box<dyn Any + Send>;
assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref()));

let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>;
assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref()));

let arithmetic_overflow =
Box::new(String::from("attempt to multiply with overflow"))
as Box<dyn Any + Send>;
assert!(!is_arrow_offset_overflow_panic(
arithmetic_overflow.as_ref()
));
let error = retry_interleave(4, 4, |rows_to_emit| {
attempts.push(rows_to_emit);
Err::<(), _>(DataFusionError::Execution("boom".into()))
})
.unwrap_err();

assert_eq!(attempts, vec![4]);
assert!(matches!(error, DataFusionError::Execution(msg) if msg == "boom"));
}
}
Loading