diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 8fe88fb8c377..fff5b2086a70 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -1197,6 +1197,245 @@ mod tests { .run(); } + #[test] + fn test_binary_view_filtered() { + let values: Vec> = vec![ + Some(b"foo"), + None, + Some(b"A longer string that is more than 12 bytes"), + ]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(256) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_binary_view_filtered_inline() { + let values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_string_view_filtered_inline() { + let values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + + let string_view = + StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_string_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_binary_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let binary_values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + let binary_view = BinaryViewArray::from_iter( + std::iter::repeat(binary_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("b", Arc::new(binary_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_string_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_string_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_inline_binary_view_sparse() { + // All-inline binary views (<=12 bytes) + 5% selectivity + let values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = very_sparse_filter(1000); + + Test::new("inline_binary_view_sparse") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(1024) + .with_expected_output_sizes(vec![100]) + .run(); + } + + #[test] + fn test_inline_string_view_sparse() { + let values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = + StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap(); + let filter = very_sparse_filter(1000); + + Test::new("inline_string_view_sparse") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(1024) + .with_expected_output_sizes(vec![100]) + .run(); + } + + #[test] + fn test_inline_mixed_sparse() { + // Mixing primitives with inline views exercises materialized `Indices` + // selection for both the primitive and the byte-view. + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + let binary_values: Vec> = vec![Some(b"x"), None, Some(b"abcdef")]; + let binary_view = BinaryViewArray::from_iter( + std::iter::repeat(binary_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ("b", Arc::new(binary_view) as ArrayRef), + ]) + .unwrap(); + let filter = very_sparse_filter(1000); + + Test::new("inline_mixed_sparse") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(1024) + .with_expected_output_sizes(vec![100]) + .run(); + } + + #[test] + fn test_inline_crosses_target_batch_size() { + // Each 1000-row batch selects 50 rows; target_batch_size 100 + // exercises intermediate batch flushing batch (not just the final + // flush). + let values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + let make_batch = || { + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap() + }; + let filter = very_sparse_filter(1000); + + Test::new("inline_crosses_target_batch_size") + .with_batch(make_batch()) + .with_filter(filter.clone()) + .with_batch(make_batch()) + .with_filter(filter.clone()) + .with_batch(make_batch()) + .with_filter(filter) + .with_batch_size(100) + .with_expected_output_sizes(vec![100, 50]) + .run(); + } + + #[test] + fn test_inline_filter_rejects_filter_longer_than_batch() { + let values: Vec> = vec![Some(b"foo"), Some(b"bar")]; + let binary_view = BinaryViewArray::from_iter(values); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = BooleanArray::from(vec![true, false, true]); + + let mut coalescer = BatchCoalescer::new(batch.schema(), 100); + let result = coalescer.push_batch_with_filter(batch, &filter); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Filter predicate of length 3 is larger than target array of length 2"), + "unexpected error: {err}" + ); + } + #[derive(Debug, Clone, PartialEq)] struct ExpectedLayout { len: usize, @@ -1694,6 +1933,18 @@ mod tests { .expect("column is not a string view") } + /// Filter that selects 12.5% of the rows (`idx % 8 == 0`) + fn sparse_filter(len: usize) -> BooleanArray { + BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0))) + } + + /// Filter that selects 5% of the rows (`idx % 20 == 0`) + /// + /// Different code paths are taken for very sparse filters + fn very_sparse_filter(len: usize) -> BooleanArray { + BooleanArray::from_iter((0..len).map(|idx| Some(idx % 20 == 0))) + } + /// Normalize the `RecordBatch` so that the memory layout is consistent /// (e.g. StringArray is compacted). fn normalize_batch(batch: RecordBatch) -> RecordBatch { diff --git a/parquet-testing b/parquet-testing index 5a6cf84678df..a3d96a65e11e 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 5a6cf84678df3af65c231109b78b86ba9bf495df +Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3