From 337775c89c3618b62c2b71789fede9bd282f28b4 Mon Sep 17 00:00:00 2001 From: cl Date: Wed, 3 Jun 2026 00:58:06 +0800 Subject: [PATCH 1/2] arrow-select: specialize filtered batch coalescing Teach BatchCoalescer to coalesce filtered batches without first materializing a filtered RecordBatch when the schema is made of primitive, Utf8View, and BinaryView columns. Store a schema-level capability flag so unsupported columns such as Utf8, Boolean, and Dictionary continue to use the existing materialized filter path. This keeps the sparse-copy fast path local to InProgressArray implementations that have specialized row-copy support. Reuse data buffers for non-inline ByteView fallback filtering, and add direct primitive copy paths for index and slice selections. Signed-off-by: cl --- arrow-select/src/coalesce.rs | 341 +++++++++++++++++++++++-- arrow-select/src/coalesce/byte_view.rs | 167 +++++++++++- arrow-select/src/coalesce/generic.rs | 11 + arrow-select/src/coalesce/primitive.rs | 222 +++++++++++++++- arrow-select/src/filter.rs | 145 ++++++++++- 5 files changed, 844 insertions(+), 42 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 8fe88fb8c377..4c0a21990e61 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -20,7 +20,7 @@ //! //! [`filter`]: crate::filter::filter //! [`take`]: crate::take::take -use crate::filter::filter_record_batch; +use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection}; use crate::take::take_record_batch; use arrow_array::types::{BinaryViewType, StringViewType}; use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive}; @@ -38,6 +38,10 @@ use byte_view::InProgressByteViewArray; use generic::GenericInProgressArray; use primitive::InProgressPrimitiveArray; +fn has_sparse_filter_copy(data_type: &DataType) -> bool { + data_type.is_primitive() || matches!(data_type, DataType::Utf8View | DataType::BinaryView) +} + /// Concatenate multiple [`RecordBatch`]es /// /// Implements the common pattern of incrementally creating output @@ -139,6 +143,8 @@ pub struct BatchCoalescer { target_batch_size: usize, /// In-progress arrays in_progress_arrays: Vec>, + /// True if some column still needs the materialized filter path. + has_non_specialized_filter_columns: bool, /// Buffered row count. Always less than `batch_size` buffered_rows: usize, /// Completed batches @@ -156,6 +162,10 @@ impl BatchCoalescer { /// Typical values are `4096` or `8192` rows. /// pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self { + let has_non_specialized_filter_columns = schema + .fields() + .iter() + .any(|field| !has_sparse_filter_copy(field.data_type())); let in_progress_arrays = schema .fields() .iter() @@ -166,6 +176,7 @@ impl BatchCoalescer { schema, target_batch_size, in_progress_arrays, + has_non_specialized_filter_columns, // We will for sure store at least one completed batch completed: VecDeque::with_capacity(1), buffered_rows: 0, @@ -212,7 +223,7 @@ impl BatchCoalescer { /// Push a batch into the Coalescer after applying a filter /// /// This is semantically equivalent of calling [`Self::push_batch`] - /// with the results from [`filter_record_batch`] + /// with the results from [`crate::filter::filter_record_batch`] /// /// # Example /// ``` @@ -238,10 +249,7 @@ impl BatchCoalescer { batch: RecordBatch, filter: &BooleanArray, ) -> Result<(), ArrowError> { - // TODO: optimize this to avoid materializing (copying the results - // of filter to a new batch) - let filtered_batch = filter_record_batch(&batch, filter)?; - self.push_batch(filtered_batch) + self.push_batch_with_filtered_columns(batch, filter) } /// Push a batch into the Coalescer after applying a set of indices @@ -566,6 +574,84 @@ impl BatchCoalescer { } } +impl BatchCoalescer { + fn filter_predicate_for_batch( + batch: &RecordBatch, + filter: &BooleanArray, + selected_count: usize, + ) -> FilterPredicate { + let mut filter_builder = FilterBuilder::new_with_count(filter, selected_count); + if batch.num_columns() > 1 + || (batch.num_columns() > 0 + && FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type())) + { + filter_builder = filter_builder.optimize(); + } + filter_builder.build() + } + + fn push_batch_with_filtered_columns( + &mut self, + batch: RecordBatch, + filter: &BooleanArray, + ) -> Result<(), ArrowError> { + let filter_len = filter.len(); + let batch_num_rows = batch.num_rows(); + let batch_num_columns = batch.num_columns(); + + if filter_len > batch_num_rows { + return Err(ArrowError::InvalidArgumentError(format!( + "Filter predicate of length {} is larger than target array of length {}", + filter_len, batch_num_rows + ))); + } + + let selected_count = filter.true_count(); + if selected_count == 0 { + return Ok(()); + } + + if selected_count == batch_num_rows && filter_len == batch_num_rows { + return self.push_batch(batch); + } + + if batch_num_columns != self.in_progress_arrays.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Batch has {} columns but BatchCoalescer expects {}", + batch_num_columns, + self.in_progress_arrays.len() + ))); + } + + let exceeds_coalesce_limit = self + .biggest_coalesce_batch_size + .is_some_and(|limit| selected_count > limit); + let does_not_fit_buffer = selected_count > self.target_batch_size - self.buffered_rows; + + if exceeds_coalesce_limit || self.has_non_specialized_filter_columns || does_not_fit_buffer + { + // Use materialized filtering when sparse per-column copying is unavailable. + let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count); + let filtered_batch = predicate.filter_record_batch(&batch)?; + return self.push_batch(filtered_batch); + } + + let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count); + let (_schema, arrays, _num_rows) = batch.into_parts(); + + for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(arrays) { + in_progress.copy_rows_by_filter_from(array, &predicate)?; + } + + self.buffered_rows += selected_count; + if self.buffered_rows >= self.target_batch_size { + self.finish_buffered_batch()?; + } + + Ok(()) + } +} + /// Return a new `InProgressArray` for the given data type fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box { macro_rules! instantiate_primitive { @@ -611,6 +697,35 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { /// Return an error if the source array is not set fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>; + /// Copy rows selected by `filter` from the current source array. + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + self.copy_rows_by_selection(filter.selection()) + } + + /// Copy rows selected by `filter` from `source`. + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + self.set_source(Some(source)); + let result = self.copy_rows_by_filter(filter); + self.set_source(None); + result + } + + /// Copy rows described by a [`FilterSelection`] from the current source array. + fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> { + match selection { + FilterSelection::None => Ok(()), + FilterSelection::All { len } => self.copy_rows(0, len), + FilterSelection::Slices(slices) => { + slices.try_for_each(|(start, end)| self.copy_rows(start, end - start)) + } + FilterSelection::Indices(indices) => indices.try_for_each(|idx| self.copy_rows(idx, 1)), + } + } + /// Finish the currently in-progress array and return it as an `ArrayRef` fn finish(&mut self) -> Result; } @@ -619,6 +734,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { mod tests { use super::*; use crate::concat::concat_batches; + use crate::filter::filter_record_batch; use arrow_array::builder::StringViewBuilder; use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; @@ -1197,6 +1313,195 @@ mod tests { .run(); } + #[test] + fn test_binary_view_filtered() { + let values: Vec> = vec![ + Some(b"foo"), + None, + Some(b"A longer string that is more than 12 bytes"), + ]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(256) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_binary_view_filtered_inline() { + let values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_string_view_filtered_inline() { + let values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + + let string_view = + StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_string_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_binary_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let binary_values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + let binary_view = BinaryViewArray::from_iter( + std::iter::repeat(binary_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("b", Arc::new(binary_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_string_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_string_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_boolean_inline_string_view_filtered() { + let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 == 0))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("b", Arc::new(bool_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_boolean_inline_string_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_inline_filter_rejects_filter_longer_than_batch() { + let values: Vec> = vec![Some(b"foo"), Some(b"bar")]; + let binary_view = BinaryViewArray::from_iter(values); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = BooleanArray::from(vec![true, false, true]); + + let mut coalescer = BatchCoalescer::new(batch.schema(), 100); + let result = coalescer.push_batch_with_filter(batch, &filter); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Filter predicate of length 3 is larger than target array of length 2"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_filter_fast_path_schema_capability() { + let supported = Arc::new(Schema::new(vec![ + Field::new("primitive", DataType::UInt32, false), + Field::new("utf8_view", DataType::Utf8View, true), + Field::new("binary_view", DataType::BinaryView, true), + ])); + let coalescer = BatchCoalescer::new(supported, 100); + assert!(!coalescer.has_non_specialized_filter_columns); + + let utf8 = Arc::new(Schema::new(vec![Field::new("utf8", DataType::Utf8, true)])); + let coalescer = BatchCoalescer::new(utf8, 100); + assert!(coalescer.has_non_specialized_filter_columns); + + let boolean = Arc::new(Schema::new(vec![Field::new( + "boolean", + DataType::Boolean, + true, + )])); + let coalescer = BatchCoalescer::new(boolean, 100); + assert!(coalescer.has_non_specialized_filter_columns); + } + #[derive(Debug, Clone, PartialEq)] struct ExpectedLayout { len: usize, @@ -1685,6 +1990,10 @@ mod tests { } } + fn sparse_filter(len: usize) -> BooleanArray { + BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0))) + } + /// Returns the named column as a StringViewArray fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray { batch @@ -1701,18 +2010,20 @@ mod tests { let (schema, mut columns, row_count) = batch.into_parts(); for column in columns.iter_mut() { - let Some(string_view) = column.as_string_view_opt() else { + if let Some(string_view) = column.as_string_view_opt() { + // Re-create the StringViewArray to ensure memory layout is + // consistent + let mut builder = StringViewBuilder::new(); + for s in string_view.iter() { + builder.append_option(s); + } + *column = Arc::new(builder.finish()); continue; - }; + } - // Re-create the StringViewArray to ensure memory layout is - // consistent - let mut builder = StringViewBuilder::new(); - for s in string_view.iter() { - builder.append_option(s); + if let Some(binary_view) = column.as_binary_view_opt() { + *column = Arc::new(BinaryViewArray::from_iter(binary_view.iter())); } - // Update the column with the new StringViewArray - *column = Arc::new(builder.finish()); } let options = RecordBatchOptions::new().with_row_count(Some(row_count)); diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 6062cd5e77aa..32f92d044e82 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -16,10 +16,11 @@ // under the License. use crate::coalesce::InProgressArray; +use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask}; use arrow_array::cast::AsArray; use arrow_array::types::ByteViewType; use arrow_array::{Array, ArrayRef, GenericByteViewArray}; -use arrow_buffer::{Buffer, NullBufferBuilder}; +use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder}; use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN}; use arrow_schema::ArrowError; use std::marker::PhantomData; @@ -111,6 +112,61 @@ impl InProgressByteViewArray { self.completed.push(next_buffer.into()); } + fn append_views_by_filter(&mut self, views: &[u128], filter: &FilterPredicate) { + let selected_count = filter.count(); + let current_len = self.views.len(); + self.views.reserve(selected_count); + + let mut written = 0; + + unsafe { + let mut out = self.views.spare_capacity_mut().as_mut_ptr().cast::(); + + match filter.selection() { + FilterSelection::None => {} + FilterSelection::All { .. } => { + std::ptr::copy_nonoverlapping(views.as_ptr(), out, selected_count); + written = selected_count; + } + FilterSelection::Slices(slices) => { + slices.for_each(|(start, end)| { + let len = end - start; + std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len); + out = out.add(len); + written += len; + }); + } + FilterSelection::Indices(indices) => { + indices.for_each(|idx| { + out.write(*views.get_unchecked(idx)); + out = out.add(1); + written += 1; + }); + } + } + + self.views.set_len(current_len + written); + } + + debug_assert_eq!(written, selected_count); + } + + fn append_nulls_by_filter( + &mut self, + filter: &FilterPredicate, + source_nulls: Option<&NullBuffer>, + ) { + let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter) else { + self.nulls.append_n_non_nulls(filter.count()); + return; + }; + + let nulls = unsafe { + NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0, filter.count()), null_count) + }; + self.nulls.append_buffer(&nulls); + } + /// Append views to self.views, updating the buffer index if necessary #[inline(never)] fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) { @@ -325,7 +381,8 @@ impl InProgressArray for InProgressByteViewArray { }; let buffers = s.data_buffers(); - let views = &s.views().as_ref()[offset..offset + len]; + // SAFETY: copy_rows is called with ranges derived from the source array. + let views = unsafe { s.views().as_ref().get_unchecked(offset..offset + len) }; // If there are no data buffers in s (all inlined views), can append the // views/nulls and done @@ -346,6 +403,59 @@ impl InProgressArray for InProgressByteViewArray { Ok(()) } + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + self.ensure_capacity(); + let source = self.source.take().ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Internal Error: InProgressByteViewArray: source not set".to_string(), + ) + })?; + + let s = source.array.as_byte_view::(); + + if !s.data_buffers().is_empty() { + // Restore the source taken above before returning the guard error. + self.source = Some(source); + return Err(ArrowError::InvalidArgumentError( + "Internal Error: InProgressByteViewArray::copy_rows_by_filter requires inline views" + .to_string(), + )); + } + + self.append_nulls_by_filter(filter, s.nulls()); + self.append_views_by_filter(s.views(), filter); + + self.source = Some(source); + Ok(()) + } + + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + let s = source.as_byte_view::(); + if s.data_buffers().is_empty() { + self.ensure_capacity(); + self.append_nulls_by_filter(filter, s.nulls()); + self.append_views_by_filter(s.views(), filter); + return Ok(()); + } + + // Match the filter kernel: filter views/nulls, but reuse data buffers. + let filtered = filter.filter(source.as_ref())?; + let filtered = filtered.as_byte_view::(); + + self.ensure_capacity(); + if let Some(nulls) = filtered.nulls().as_ref() { + self.nulls.append_buffer(nulls); + } else { + self.nulls.append_n_non_nulls(filter.count()); + } + self.append_views_and_update_buffer_index(filtered.views(), filtered.data_buffers()); + Ok(()) + } + fn finish(&mut self) -> Result { self.finish_current(); assert!(self.current.is_none()); @@ -405,6 +515,9 @@ impl BufferSource { #[cfg(test)] mod tests { use super::*; + use crate::filter::FilterBuilder; + use arrow_array::types::BinaryViewType; + use arrow_array::{BinaryViewArray, BooleanArray}; #[test] fn test_buffer_source() { @@ -444,4 +557,54 @@ mod tests { // Can override with larger size request assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000); } + + #[test] + fn test_copy_rows_by_filter_rejects_non_inline_views() { + let values: Vec> = vec![Some(b"This value is longer than 12 bytes")]; + let array = BinaryViewArray::from_iter(values); + assert!(!array.data_buffers().is_empty()); + + let mut in_progress = InProgressByteViewArray::::new(1); + in_progress.set_source(Some(Arc::new(array))); + + let filter = BooleanArray::from(vec![true]); + let predicate = FilterBuilder::new(&filter).build(); + let err = in_progress.copy_rows_by_filter(&predicate).unwrap_err(); + + assert!( + err.to_string().contains("requires inline views"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_copy_rows_by_filter_from_reuses_non_inline_buffers() { + let values = (0..32) + .map(|i| format!("This value is longer than 12 bytes: {i}").into_bytes()) + .collect::>(); + let array = BinaryViewArray::from_iter(values.iter().map(|v| Some(v.as_slice()))); + assert!(!array.data_buffers().is_empty()); + let source_buffer = array.data_buffers()[0].as_ptr(); + + let filter = BooleanArray::from((0..32).map(|i| i == 3 || i == 29).collect::>()); + let predicate = FilterBuilder::new(&filter).build(); + + let mut in_progress = InProgressByteViewArray::::new(32); + in_progress + .copy_rows_by_filter_from(Arc::new(array), &predicate) + .unwrap(); + let output = in_progress.finish().unwrap(); + let output = output.as_binary_view(); + + assert_eq!(output.len(), 2); + assert_eq!(output.value(0), values[3].as_slice()); + assert_eq!(output.value(1), values[29].as_slice()); + assert!( + output + .data_buffers() + .iter() + .any(|buffer| std::ptr::addr_eq(buffer.as_ptr(), source_buffer)), + "expected filtered output to reuse the source data buffer" + ); + } } diff --git a/arrow-select/src/coalesce/generic.rs b/arrow-select/src/coalesce/generic.rs index 1ea57dff929c..4fa64273ec5c 100644 --- a/arrow-select/src/coalesce/generic.rs +++ b/arrow-select/src/coalesce/generic.rs @@ -17,6 +17,7 @@ use super::InProgressArray; use crate::concat::concat; +use crate::filter::FilterPredicate; use arrow_array::ArrayRef; use arrow_schema::ArrowError; @@ -60,6 +61,16 @@ impl InProgressArray for GenericInProgressArray { Ok(()) } + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + let array = filter.filter(source.as_ref())?; + self.buffered_arrays.push(array); + Ok(()) + } + fn finish(&mut self) -> Result { // Concatenate all buffered arrays into a single array, which uses 2x // peak memory diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index a7f2fb32ce49..ac831fe089c4 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -16,9 +16,12 @@ // under the License. use crate::coalesce::InProgressArray; +use crate::filter::{ + FilterIndices, FilterPredicate, FilterSelection, FilterSlices, filter_null_mask, +}; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; -use arrow_buffer::{NullBufferBuilder, ScalarBuffer}; +use arrow_buffer::{BooleanBuffer, NullBuffer, NullBufferBuilder, ScalarBuffer}; use arrow_schema::{ArrowError, DataType}; use std::fmt::Debug; use std::sync::Arc; @@ -59,6 +62,95 @@ impl InProgressPrimitiveArray { self.current.reserve(self.batch_size); } } + + fn append_values_by_indices( + current: &mut Vec, + values: &[T::Native], + indices: FilterIndices<'_>, + selected_count: usize, + ) { + let current_len = current.len(); + let mut written = 0; + + unsafe { + let mut out = current + .spare_capacity_mut() + .as_mut_ptr() + .cast::(); + + indices.for_each(|idx| { + // SAFETY: indices are derived from the filter predicate for this source. + out.write(*values.get_unchecked(idx)); + out = out.add(1); + written += 1; + }); + + current.set_len(current_len + written); + } + + debug_assert_eq!(written, selected_count); + } + + fn append_values_by_slices( + current: &mut Vec, + values: &[T::Native], + slices: FilterSlices<'_>, + selected_count: usize, + ) { + let current_len = current.len(); + let mut written = 0; + + unsafe { + let mut out = current + .spare_capacity_mut() + .as_mut_ptr() + .cast::(); + + slices.for_each(|(start, end)| { + let len = end - start; + // SAFETY: slices are derived from the filter predicate for this source. + std::ptr::copy_nonoverlapping(values.as_ptr().add(start), out, len); + out = out.add(len); + written += len; + }); + + current.set_len(current_len + written); + } + + debug_assert_eq!(written, selected_count); + } +} + +#[inline] +fn primitive_source( + source: &Option, +) -> Result<&PrimitiveArray, ArrowError> { + Ok(source + .as_ref() + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Internal Error: InProgressPrimitiveArray: source not set".to_string(), + ) + })? + .as_primitive::()) +} + +fn append_filtered_nulls( + nulls: &mut NullBufferBuilder, + source_nulls: Option<&NullBuffer>, + filter: &FilterPredicate, +) { + if let Some((null_count, filtered_nulls)) = filter_null_mask(source_nulls, filter) { + let filtered_nulls = unsafe { + NullBuffer::new_unchecked( + BooleanBuffer::new(filtered_nulls, 0, filter.count()), + null_count, + ) + }; + nulls.append_buffer(&filtered_nulls); + } else { + nulls.append_n_non_nulls(filter.count()); + } } impl InProgressArray for InProgressPrimitiveArray { @@ -69,15 +161,7 @@ impl InProgressArray for InProgressPrimitiveArray fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { self.ensure_capacity(); - let s = self - .source - .as_ref() - .ok_or_else(|| { - ArrowError::InvalidArgumentError( - "Internal Error: InProgressPrimitiveArray: source not set".to_string(), - ) - })? - .as_primitive::(); + let s = primitive_source::(&self.source)?; // add nulls if necessary if let Some(nulls) = s.nulls().as_ref() { @@ -88,12 +172,49 @@ impl InProgressArray for InProgressPrimitiveArray }; // Copy the values + let values = s.values(); + // SAFETY: copy_rows is called with ranges derived from the source array. self.current - .extend_from_slice(&s.values()[offset..offset + len]); + .extend_from_slice(unsafe { values.get_unchecked(offset..offset + len) }); Ok(()) } + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + match filter.selection() { + FilterSelection::Indices(indices) => { + self.ensure_capacity(); + let s = primitive_source::(&self.source)?; + + append_filtered_nulls(&mut self.nulls, s.nulls(), filter); + self.current.reserve(filter.count()); + Self::append_values_by_indices( + &mut self.current, + s.values(), + indices, + filter.count(), + ); + Ok(()) + } + FilterSelection::Slices(slices) => { + self.ensure_capacity(); + let s = primitive_source::(&self.source)?; + + append_filtered_nulls(&mut self.nulls, s.nulls(), filter); + self.current.reserve(filter.count()); + Self::append_values_by_slices( + &mut self.current, + s.values(), + slices, + filter.count(), + ); + Ok(()) + } + // Other selection shapes reuse the generic copy_rows path. + selection => self.copy_rows_by_selection(selection), + } + } + fn finish(&mut self) -> Result { // take and reset the current values and nulls let values = std::mem::take(&mut self.current); @@ -106,3 +227,82 @@ impl InProgressArray for InProgressPrimitiveArray Ok(Arc::new(array)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::filter::FilterBuilder; + use arrow_array::types::Int32Type; + use arrow_array::{BooleanArray, Int32Array}; + + #[test] + fn test_copy_rows_by_filter_index_iterator() { + let source = + Int32Array::from_iter((0..21).map(|idx| if idx % 5 == 0 { None } else { Some(idx) })); + let filter = BooleanArray::from_iter( + (0..21).map(|idx| Some(matches!(idx, 0 | 1 | 2 | 3 | 5 | 8 | 13))), + ); + let predicate = FilterBuilder::new(&filter).build(); + let FilterSelection::Indices(indices) = predicate.selection() else { + panic!("expected index iterator selection"); + }; + let mut selected_indices = Vec::new(); + indices.for_each(|idx| selected_indices.push(idx)); + assert_eq!(selected_indices, vec![0, 1, 2, 3, 5, 8, 13]); + + let mut in_progress = InProgressPrimitiveArray::::new(7, DataType::Int32); + in_progress.set_source(Some(Arc::new(source))); + in_progress.copy_rows_by_filter(&predicate).unwrap(); + + let result = in_progress.finish().unwrap(); + let result = result.as_primitive::(); + let expected = Int32Array::from(vec![ + None, + Some(1), + Some(2), + Some(3), + None, + Some(8), + Some(13), + ]); + assert_eq!(result, &expected); + } + + #[test] + fn test_copy_rows_by_filter_slice_iterator() { + let source = + Int32Array::from_iter((0..16).map(|idx| if idx % 5 == 0 { None } else { Some(idx) })); + let filter = BooleanArray::from_iter((0..16).map(|idx| Some(!matches!(idx, 3 | 9)))); + let predicate = FilterBuilder::new(&filter).build(); + let FilterSelection::Slices(slices) = predicate.selection() else { + panic!("expected slice iterator selection"); + }; + let mut selected_slices = Vec::new(); + slices.for_each(|slice| selected_slices.push(slice)); + assert_eq!(selected_slices, vec![(0, 3), (4, 9), (10, 16)]); + + let mut in_progress = InProgressPrimitiveArray::::new(14, DataType::Int32); + in_progress.set_source(Some(Arc::new(source))); + in_progress.copy_rows_by_filter(&predicate).unwrap(); + + let result = in_progress.finish().unwrap(); + let result = result.as_primitive::(); + let expected = Int32Array::from(vec![ + None, + Some(1), + Some(2), + Some(4), + None, + Some(6), + Some(7), + Some(8), + None, + Some(11), + Some(12), + Some(13), + Some(14), + None, + ]); + assert_eq!(result, &expected); + } +} diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index fcbce82d5d9d..b1f3a21a3ed3 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -81,13 +81,13 @@ impl Iterator for SlicesIterator<'_> { /// /// This provides the best performance on most predicates, apart from those which keep /// large runs and therefore favour [`SlicesIterator`] -struct IndexIterator<'a> { +pub(crate) struct IndexIterator<'a> { remaining: usize, iter: BitIndexIterator<'a>, } impl<'a> IndexIterator<'a> { - fn new(filter: &'a BooleanArray, remaining: usize) -> Self { + pub(crate) fn new(filter: &'a BooleanArray, remaining: usize) -> Self { assert_eq!(filter.null_count(), 0); let iter = filter.values().set_indices(); Self { remaining, iter } @@ -137,11 +137,6 @@ impl Iterator for IndexIterator<'_> { } } -/// Counts the number of set bits in `filter` -fn filter_count(filter: &BooleanArray) -> usize { - filter.values().count_set_bits() -} - /// Convert all null values in `BooleanArray` to `false` /// /// This is useful for filter-like operations which select only `true` @@ -259,12 +254,15 @@ pub struct FilterBuilder { impl FilterBuilder { /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`] pub fn new(filter: &BooleanArray) -> Self { + Self::new_with_count(filter, filter.true_count()) + } + + pub(crate) fn new_with_count(filter: &BooleanArray, count: usize) -> Self { let filter = match filter.null_count() { 0 => filter.clone(), _ => prep_null_mask_filter(filter), }; - let count = filter_count(&filter); let strategy = IterationStrategy::default_strategy(filter.len(), count); Self { @@ -366,6 +364,66 @@ impl IterationStrategy { } } +/// Borrowed description of which rows a [`FilterPredicate`] selects. +pub(crate) enum FilterSelection<'a> { + None, + All { len: usize }, + Slices(FilterSlices<'a>), + Indices(FilterIndices<'a>), +} + +pub(crate) type FilterSlices<'a> = + FilterIterator>, SlicesIterator<'a>>; + +pub(crate) type FilterIndices<'a> = + FilterIterator>, IndexIterator<'a>>; + +/// Holds either materialized rows or a lazy iterator. +/// +/// This does not implement [`Iterator`] on purpose. Callers use +/// [`Self::for_each`] or [`Self::try_for_each`] so the enum is matched once +/// before the loop, not once per row in `next`. +pub(crate) enum FilterIterator { + Materialized(M), + Lazy(I), +} + +impl FilterIterator +where + M: Iterator, + I: Iterator, +{ + pub(crate) fn for_each(self, f: F) + where + F: FnMut(M::Item), + { + match self { + Self::Materialized(iter) => iter.for_each(f), + Self::Lazy(iter) => iter.for_each(f), + } + } + + pub(crate) fn try_for_each(self, mut f: F) -> Result<(), E> + where + F: FnMut(M::Item) -> Result<(), E>, + { + match self { + Self::Materialized(iter) => { + for item in iter { + f(item)?; + } + } + Self::Lazy(iter) => { + for item in iter { + f(item)?; + } + } + } + + Ok(()) + } +} + /// A filtering predicate that can be applied to an [`Array`] #[derive(Debug)] pub struct FilterPredicate { @@ -410,6 +468,25 @@ impl FilterPredicate { self.count } + pub(crate) fn selection(&self) -> FilterSelection<'_> { + match &self.strategy { + IterationStrategy::None => FilterSelection::None, + IterationStrategy::All => FilterSelection::All { len: self.count }, + IterationStrategy::Slices(slices) => { + FilterSelection::Slices(FilterIterator::Materialized(slices.iter().copied())) + } + IterationStrategy::SlicesIterator => { + FilterSelection::Slices(FilterIterator::Lazy(SlicesIterator::new(&self.filter))) + } + IterationStrategy::Indices(indices) => { + FilterSelection::Indices(FilterIterator::Materialized(indices.iter().copied())) + } + IterationStrategy::IndexIterator => FilterSelection::Indices(FilterIterator::Lazy( + IndexIterator::new(&self.filter, self.count), + )), + } + } + /// Filters the given `nulls` buffer using this predicate. /// /// Returns `None` when there is nothing to track in the output, either @@ -575,7 +652,7 @@ where /// `Some((null_count, null_buffer))` where `null_count` is the number of nulls /// in the filtered output, and `null_buffer` is the filtered null buffer /// -fn filter_null_mask( +pub(crate) fn filter_null_mask( nulls: Option<&NullBuffer>, predicate: &FilterPredicate, ) -> Option<(usize, Buffer)> { @@ -649,7 +726,10 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA } #[inline(never)] -fn filter_native(values: &[T], predicate: &FilterPredicate) -> Buffer { +pub(crate) fn filter_native( + values: &[T], + predicate: &FilterPredicate, +) -> Buffer { assert!(values.len() >= predicate.filter.len()); match &predicate.strategy { @@ -1571,7 +1651,7 @@ mod tests { fn test_slice_iterator_bits() { let filter_values = (0..64).map(|i| i == 1).collect::>(); let filter = BooleanArray::from(filter_values); - let filter_count = filter_count(&filter); + let filter_count = filter.true_count(); let iter = SlicesIterator::new(&filter); let chunks = iter.collect::>(); @@ -1584,7 +1664,7 @@ mod tests { fn test_slice_iterator_bits1() { let filter_values = (0..64).map(|i| i != 1).collect::>(); let filter = BooleanArray::from(filter_values); - let filter_count = filter_count(&filter); + let filter_count = filter.true_count(); let iter = SlicesIterator::new(&filter); let chunks = iter.collect::>(); @@ -1597,7 +1677,7 @@ mod tests { fn test_slice_iterator_chunk_and_bits() { let filter_values = (0..130).map(|i| i % 62 != 0).collect::>(); let filter = BooleanArray::from(filter_values); - let filter_count = filter_count(&filter); + let filter_count = filter.true_count(); let iter = SlicesIterator::new(&filter); let chunks = iter.collect::>(); @@ -1606,6 +1686,43 @@ mod tests { assert_eq!(filter_count, 61 + 61 + 5); } + #[test] + fn test_filter_selection_iterators() { + let slices = [(0, 2), (4, 5)]; + let mut ranges = Vec::new(); + let selection: FilterSlices<'_> = FilterIterator::Materialized(slices.iter().copied()); + selection.for_each(|range| ranges.push(range)); + assert_eq!(ranges, slices); + + let filter = BooleanArray::from(vec![true, true, false, false, true]); + let mut ranges = Vec::new(); + let selection: FilterSlices<'_> = FilterIterator::Lazy(SlicesIterator::new(&filter)); + selection + .try_for_each(|range| { + ranges.push(range); + Ok::<(), ArrowError>(()) + }) + .unwrap(); + assert_eq!(ranges, vec![(0, 2), (4, 5)]); + + let indices = [1, 3, 5]; + let mut selected = Vec::new(); + let selection: FilterIndices<'_> = FilterIterator::Materialized(indices.iter().copied()); + selection.for_each(|idx| selected.push(idx)); + assert_eq!(selected, indices); + + let filter = BooleanArray::from(vec![false, true, false, true]); + let mut selected = Vec::new(); + let selection: FilterIndices<'_> = FilterIterator::Lazy(IndexIterator::new(&filter, 2)); + selection + .try_for_each(|idx| { + selected.push(idx); + Ok::<(), ArrowError>(()) + }) + .unwrap(); + assert_eq!(selected, vec![1, 3]); + } + #[test] fn test_null_mask() { let a = Int64Array::from(vec![Some(1), Some(2), None]); @@ -1690,7 +1807,7 @@ mod tests { .flat_map(|(start, end)| start..end) .collect(); - let count = filter_count(&filter); + let count = filter.true_count(); let index_bits: Vec<_> = IndexIterator::new(&filter, count).collect(); let expected_bits: Vec<_> = bools From 8e010ef3c7b0c9955096daa2cd0c2659f7eb2a41 Mon Sep 17 00:00:00 2001 From: cl Date: Thu, 4 Jun 2026 23:10:13 +0800 Subject: [PATCH 2/2] Tune sparse filter coalescing threshold Signed-off-by: cl --- arrow-select/src/coalesce.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 4c0a21990e61..0d5199941fc2 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -42,6 +42,17 @@ fn has_sparse_filter_copy(data_type: &DataType) -> bool { data_type.is_primitive() || matches!(data_type, DataType::Utf8View | DataType::BinaryView) } +/// Maximum selected row fraction for the fused sparse-filter copy path. +/// +/// Shared benchmark results show this path helps low-selectivity filters, but +/// can regress once the filter becomes denser. Keep this as a cheap integer +/// threshold on the hot path: `selected_count <= filter_len / 16`. +const SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR: usize = 16; + +fn should_use_sparse_filter_copy(filter_len: usize, selected_count: usize) -> bool { + selected_count <= filter_len / SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR +} + /// Concatenate multiple [`RecordBatch`]es /// /// Implements the common pattern of incrementally creating output @@ -627,9 +638,12 @@ impl BatchCoalescer { .biggest_coalesce_batch_size .is_some_and(|limit| selected_count > limit); let does_not_fit_buffer = selected_count > self.target_batch_size - self.buffered_rows; + let should_materialize_filter = exceeds_coalesce_limit + || self.has_non_specialized_filter_columns + || does_not_fit_buffer + || !should_use_sparse_filter_copy(filter_len, selected_count); - if exceeds_coalesce_limit || self.has_non_specialized_filter_columns || does_not_fit_buffer - { + if should_materialize_filter { // Use materialized filtering when sparse per-column copying is unavailable. let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count); let filtered_batch = predicate.filter_record_batch(&batch)?; @@ -781,6 +795,14 @@ mod tests { .run(); } + #[test] + fn test_sparse_filter_copy_threshold() { + assert!(should_use_sparse_filter_copy(8192, 8)); + assert!(should_use_sparse_filter_copy(8192, 81)); + assert!(!should_use_sparse_filter_copy(8192, 819)); + assert!(!should_use_sparse_filter_copy(8192, 6553)); + } + #[test] fn test_single_large_batch_greater_than_target() { // test a single large batch