diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a212122401f9..91a2726ae165 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -1461,6 +1461,39 @@ mod tests { } } + fn make_single_i64_ndv_stats( + distinct_count: Precision, + min_value: Option, + max_value: Option, + ) -> Statistics { + let to_precision = |value| Precision::Exact(ScalarValue::Int64(Some(value))); + + Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(distinct_count) + .with_min_value( + min_value.map(to_precision).unwrap_or(Precision::Absent), + ) + .with_max_value( + max_value.map(to_precision).unwrap_or(Precision::Absent), + ), + ) + } + + fn merge_single_i64_ndv_distinct_count( + left: Statistics, + right: Statistics, + ) -> Precision { + let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); + + Statistics::try_merge_iter([&left, &right], &schema) + .unwrap() + .column_statistics[0] + .distinct_count + } + #[test] fn test_try_merge() { // Create a schema with two columns @@ -1906,6 +1939,185 @@ mod tests { ); } + #[test] + fn test_try_merge_ndv_original_union_edge_cases() { + struct NdvTestCase { + name: &'static str, + left_ndv: Precision, + left_min: Option, + left_max: Option, + right_ndv: Precision, + right_min: Option, + right_max: Option, + expected: Precision, + } + + let cases = vec![ + NdvTestCase { + name: "disjoint ranges", + left_ndv: Precision::Exact(5), + left_min: Some(0), + left_max: Some(10), + right_ndv: Precision::Exact(3), + right_min: Some(20), + right_max: Some(30), + expected: Precision::Inexact(8), + }, + NdvTestCase { + name: "identical ranges", + left_ndv: Precision::Exact(10), + left_min: Some(0), + left_max: Some(100), + right_ndv: Precision::Exact(8), + right_min: Some(0), + right_max: Some(100), + expected: Precision::Inexact(10), + }, + NdvTestCase { + name: "partial overlap", + left_ndv: Precision::Exact(100), + left_min: Some(0), + left_max: Some(100), + right_ndv: Precision::Exact(50), + right_min: Some(50), + right_max: Some(150), + expected: Precision::Inexact(125), + }, + NdvTestCase { + name: "right contained in left", + left_ndv: Precision::Exact(100), + left_min: Some(0), + left_max: Some(100), + right_ndv: Precision::Exact(50), + right_min: Some(25), + right_max: Some(75), + expected: Precision::Inexact(100), + }, + NdvTestCase { + name: "same constant value", + left_ndv: Precision::Exact(1), + left_min: Some(5), + left_max: Some(5), + right_ndv: Precision::Exact(1), + right_min: Some(5), + right_max: Some(5), + expected: Precision::Inexact(1), + }, + NdvTestCase { + name: "different constant values", + left_ndv: Precision::Exact(1), + left_min: Some(5), + left_max: Some(5), + right_ndv: Precision::Exact(1), + right_min: Some(10), + right_max: Some(10), + expected: Precision::Inexact(2), + }, + NdvTestCase { + name: "left constant within right range", + left_ndv: Precision::Exact(1), + left_min: Some(5), + left_max: Some(5), + right_ndv: Precision::Exact(10), + right_min: Some(0), + right_max: Some(10), + expected: Precision::Inexact(10), + }, + NdvTestCase { + name: "left constant outside right range", + left_ndv: Precision::Exact(1), + left_min: Some(20), + left_max: Some(20), + right_ndv: Precision::Exact(10), + right_min: Some(0), + right_max: Some(10), + expected: Precision::Inexact(11), + }, + NdvTestCase { + name: "right constant within left range", + left_ndv: Precision::Exact(10), + left_min: Some(0), + left_max: Some(10), + right_ndv: Precision::Exact(1), + right_min: Some(5), + right_max: Some(5), + expected: Precision::Inexact(10), + }, + NdvTestCase { + name: "right constant outside left range", + left_ndv: Precision::Exact(10), + left_min: Some(0), + left_max: Some(10), + right_ndv: Precision::Exact(1), + right_min: Some(20), + right_max: Some(20), + expected: Precision::Inexact(11), + }, + NdvTestCase { + name: "missing bounds exact plus exact", + left_ndv: Precision::Exact(10), + left_min: None, + left_max: None, + right_ndv: Precision::Exact(5), + right_min: None, + right_max: None, + // Shared merge falls back to max without bounds. + expected: Precision::Inexact(10), + }, + NdvTestCase { + name: "missing bounds exact plus inexact", + left_ndv: Precision::Exact(10), + left_min: None, + left_max: None, + right_ndv: Precision::Inexact(5), + right_min: None, + right_max: None, + // Shared merge falls back to max without bounds. + expected: Precision::Inexact(10), + }, + NdvTestCase { + name: "missing bounds inexact plus inexact", + left_ndv: Precision::Inexact(7), + left_min: None, + left_max: None, + right_ndv: Precision::Inexact(3), + right_min: None, + right_max: None, + // Shared merge falls back to max without bounds. + expected: Precision::Inexact(7), + }, + NdvTestCase { + name: "exact plus absent", + left_ndv: Precision::Exact(10), + left_min: None, + left_max: None, + right_ndv: Precision::Absent, + right_min: None, + right_max: None, + expected: Precision::Absent, + }, + NdvTestCase { + name: "inexact plus absent", + left_ndv: Precision::Inexact(4), + left_min: None, + left_max: None, + right_ndv: Precision::Absent, + right_min: None, + right_max: None, + expected: Precision::Absent, + }, + ]; + + for case in cases { + let actual = merge_single_i64_ndv_distinct_count( + make_single_i64_ndv_stats(case.left_ndv, case.left_min, case.left_max), + make_single_i64_ndv_stats(case.right_ndv, case.right_min, case.right_max), + ); + + assert_eq!(actual, case.expected, "case {} failed", case.name); + } + } + #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 20295b7e6fac..1306eef119ad 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -27,9 +27,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{ - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; use crate::check_if_same_properties; @@ -49,7 +48,6 @@ use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::stats::{Precision, estimate_ndv_with_overlap}; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, @@ -342,23 +340,12 @@ impl ExecutionPlan for UnionExec { // If we get here, the partition index is out of bounds Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } else { - // Collect statistics from all inputs - let stats = self - .inputs - .iter() - .map(|input_exec| { - input_exec - .partition_statistics(None) - .map(Arc::unwrap_or_clone) - }) - .collect::>>()?; - - Ok(Arc::new( - stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), - )) + let schema = self.schema(); + Ok(Arc::new(merge_input_statistics( + &self.inputs, + None, + schema.as_ref(), + )?)) } } @@ -669,21 +656,12 @@ impl ExecutionPlan for InterleaveExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let stats = self - .inputs - .iter() - .map(|stat| { - stat.partition_statistics(partition) - .map(Arc::unwrap_or_clone) - }) - .collect::>>()?; - - Ok(Arc::new( - stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), - )) + let schema = self.schema(); + Ok(Arc::new(merge_input_statistics( + &self.inputs, + partition, + schema.as_ref(), + )?)) } fn benefits_from_input_partitioning(&self) -> Vec { @@ -842,69 +820,35 @@ impl Stream for CombinedRecordBatchStream { } } -fn col_stats_union( - mut left: ColumnStatistics, - right: &ColumnStatistics, -) -> ColumnStatistics { - left.distinct_count = union_distinct_count(&left, right); - left.min_value = left.min_value.min(&right.min_value); - left.max_value = left.max_value.max(&right.max_value); - left.sum_value = left.sum_value.add_for_sum(&right.sum_value); - left.null_count = left.null_count.add(&right.null_count); - - left -} - -fn union_distinct_count( - left: &ColumnStatistics, - right: &ColumnStatistics, -) -> Precision { - let (ndv_left, ndv_right) = match ( - left.distinct_count.get_value(), - right.distinct_count.get_value(), - ) { - (Some(&l), Some(&r)) => (l, r), - _ => return Precision::Absent, - }; - - // Even with exact inputs, the union NDV depends on how - // many distinct values are shared between the left and right. - // We can only estimate this via range overlap. Thus both paths - // below return `Inexact`. - if let Some(ndv) = estimate_ndv_with_overlap(left, right, ndv_left, ndv_right) { - return Precision::Inexact(ndv); - } - - Precision::Inexact(ndv_left + ndv_right) -} +fn merge_input_statistics( + inputs: &[Arc], + partition: Option, + schema: &Schema, +) -> Result { + let stats = inputs + .iter() + .map(|input| { + input + .partition_statistics(partition) + .map(Arc::unwrap_or_clone) + }) + .collect::>>()?; -fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { - let Statistics { - num_rows: right_num_rows, - total_byte_size: right_total_bytes, - column_statistics: right_column_statistics, - .. - } = right; - left.num_rows = left.num_rows.add(&right_num_rows); - left.total_byte_size = left.total_byte_size.add(&right_total_bytes); - left.column_statistics = left - .column_statistics - .into_iter() - .zip(right_column_statistics.iter()) - .map(|(a, b)| col_stats_union(a, b)) - .collect::>(); - left + Statistics::try_merge_iter(stats.iter(), schema) } #[cfg(test)] mod tests { use super::*; use crate::collect; + use crate::repartition::RepartitionExec; + use crate::test::exec::StatisticsExec; use crate::test::{self, TestMemoryExec}; use arrow::compute::SortOptions; use arrow::datatypes::DataType; - use datafusion_common::ScalarValue; + use datafusion_common::stats::Precision; + use datafusion_common::{ColumnStatistics, ScalarValue}; use datafusion_physical_expr::equivalence::convert_to_orderings; use datafusion_physical_expr::expressions::col; @@ -959,294 +903,113 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_stats_union() { - let left = Statistics { - num_rows: Precision::Exact(5), - total_byte_size: Precision::Exact(23), - column_statistics: vec![ - ColumnStatistics { - distinct_count: Precision::Exact(5), - max_value: Precision::Exact(ScalarValue::Int64(Some(21))), - min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), - null_count: Precision::Exact(0), - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Exact(1), - max_value: Precision::Exact(ScalarValue::from("x")), - min_value: Precision::Exact(ScalarValue::from("a")), - sum_value: Precision::Absent, - null_count: Precision::Exact(3), - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Absent, - max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), - min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), - sum_value: Precision::Exact(ScalarValue::Float32(Some(42.0))), - null_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ], - }; + fn stats_merge_inputs() -> (SchemaRef, Statistics, Statistics, Statistics) { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])); + + let left = Statistics::default() + .with_num_rows(Precision::Exact(5)) + .with_total_byte_size(Precision::Exact(23)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(Precision::Exact(5)) + .with_min_value(Precision::Exact(ScalarValue::UInt32(Some(1)))) + .with_max_value(Precision::Exact(ScalarValue::UInt32(Some(21)))) + .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(42)))) + .with_null_count(Precision::Exact(0)) + .with_byte_size(Precision::Exact(40)), + ); - let right = Statistics { - num_rows: Precision::Exact(7), - total_byte_size: Precision::Exact(29), - column_statistics: vec![ - ColumnStatistics { - distinct_count: Precision::Exact(3), - max_value: Precision::Exact(ScalarValue::Int64(Some(34))), - min_value: Precision::Exact(ScalarValue::Int64(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), - null_count: Precision::Exact(1), - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Absent, - max_value: Precision::Exact(ScalarValue::from("c")), - min_value: Precision::Exact(ScalarValue::from("b")), - sum_value: Precision::Absent, - null_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Absent, - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - null_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ], - }; + let right = Statistics::default() + .with_num_rows(Precision::Exact(7)) + .with_total_byte_size(Precision::Exact(29)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(Precision::Exact(3)) + .with_min_value(Precision::Exact(ScalarValue::UInt32(Some(22)))) + .with_max_value(Precision::Exact(ScalarValue::UInt32(Some(34)))) + .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(8)))) + .with_null_count(Precision::Exact(1)) + .with_byte_size(Precision::Exact(60)), + ); - let result = stats_union(left, right); - let expected = Statistics { - num_rows: Precision::Exact(12), - total_byte_size: Precision::Exact(52), - column_statistics: vec![ - ColumnStatistics { - distinct_count: Precision::Inexact(6), - max_value: Precision::Exact(ScalarValue::Int64(Some(34))), - min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(84))), - null_count: Precision::Exact(1), - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Absent, - max_value: Precision::Exact(ScalarValue::from("x")), - min_value: Precision::Exact(ScalarValue::from("a")), - sum_value: Precision::Absent, - null_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ColumnStatistics { - distinct_count: Precision::Absent, - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - null_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ], - }; + let expected = Statistics::default() + .with_num_rows(Precision::Exact(12)) + .with_total_byte_size(Precision::Exact(52)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_distinct_count(Precision::Inexact(8)) + .with_min_value(Precision::Exact(ScalarValue::UInt32(Some(1)))) + .with_max_value(Precision::Exact(ScalarValue::UInt32(Some(34)))) + .with_sum_value(Precision::Exact(ScalarValue::UInt64(Some(50)))) + .with_null_count(Precision::Exact(1)) + .with_byte_size(Precision::Exact(100)), + ); - assert_eq!(result, expected); + (schema, left, right, expected) } #[test] - fn test_union_distinct_count() { - // (left_ndv, left_min, left_max, right_ndv, right_min, right_max, expected) - type NdvTestCase = ( - Precision, - Option, - Option, - Precision, - Option, - Option, - Precision, - ); - let cases: Vec = vec![ - // disjoint ranges: NDV = 5 + 3 - ( - Precision::Exact(5), - Some(0), - Some(10), - Precision::Exact(3), - Some(20), - Some(30), - Precision::Inexact(8), - ), - // identical ranges: intersection = max(10, 8) = 10 - ( - Precision::Exact(10), - Some(0), - Some(100), - Precision::Exact(8), - Some(0), - Some(100), - Precision::Inexact(10), - ), - // partial overlap: 50 + 50 + 25 = 125 - ( - Precision::Exact(100), - Some(0), - Some(100), - Precision::Exact(50), - Some(50), - Some(150), - Precision::Inexact(125), - ), - // right contained in left: 50 + 50 + 0 = 100 - ( - Precision::Exact(100), - Some(0), - Some(100), - Precision::Exact(50), - Some(25), - Some(75), - Precision::Inexact(100), - ), - // both constant, same value - ( - Precision::Exact(1), - Some(5), - Some(5), - Precision::Exact(1), - Some(5), - Some(5), - Precision::Inexact(1), - ), - // both constant, different values - ( - Precision::Exact(1), - Some(5), - Some(5), - Precision::Exact(1), - Some(10), - Some(10), - Precision::Inexact(2), - ), - // left constant within right range - ( - Precision::Exact(1), - Some(5), - Some(5), - Precision::Exact(10), - Some(0), - Some(10), - Precision::Inexact(10), - ), - // left constant outside right range - ( - Precision::Exact(1), - Some(20), - Some(20), - Precision::Exact(10), - Some(0), - Some(10), - Precision::Inexact(11), - ), - // right constant within left range - ( - Precision::Exact(10), - Some(0), - Some(10), - Precision::Exact(1), - Some(5), - Some(5), - Precision::Inexact(10), - ), - // right constant outside left range - ( - Precision::Exact(10), - Some(0), - Some(10), - Precision::Exact(1), - Some(20), - Some(20), - Precision::Inexact(11), - ), - // missing min/max falls back to sum (exact + exact) - ( - Precision::Exact(10), - None, - None, - Precision::Exact(5), - None, - None, - Precision::Inexact(15), - ), - // missing min/max falls back to sum (exact + inexact) - ( - Precision::Exact(10), - None, - None, - Precision::Inexact(5), - None, - None, - Precision::Inexact(15), - ), - // missing min/max falls back to sum (inexact + inexact) - ( - Precision::Inexact(7), - None, - None, - Precision::Inexact(3), - None, - None, - Precision::Inexact(10), - ), - // one side absent - ( - Precision::Exact(10), - None, - None, - Precision::Absent, - None, - None, - Precision::Absent, - ), - // one side absent (inexact + absent) - ( - Precision::Inexact(4), - None, - None, - Precision::Absent, - None, - None, - Precision::Absent, - ), - ]; + fn test_union_partition_statistics_uses_shared_statistics_merge() -> Result<()> { + let (schema, left, right, expected) = stats_merge_inputs(); - for ( - i, - (left_ndv, left_min, left_max, right_ndv, right_min, right_max, expected), - ) in cases.into_iter().enumerate() - { - let to_sv = |v| Precision::Exact(ScalarValue::Int64(Some(v))); - let left = ColumnStatistics { - distinct_count: left_ndv, - min_value: left_min.map(to_sv).unwrap_or(Precision::Absent), - max_value: left_max.map(to_sv).unwrap_or(Precision::Absent), - ..Default::default() - }; - let right = ColumnStatistics { - distinct_count: right_ndv, - min_value: right_min.map(to_sv).unwrap_or(Precision::Absent), - max_value: right_max.map(to_sv).unwrap_or(Precision::Absent), - ..Default::default() - }; - assert_eq!( - union_distinct_count(&left, &right), - expected, - "case {i} failed" - ); - } + let left: Arc = + Arc::new(StatisticsExec::new(left, schema.as_ref().clone())); + let right: Arc = + Arc::new(StatisticsExec::new(right, schema.as_ref().clone())); + + let union = UnionExec::try_new(vec![left, right])?; + let stats = union.partition_statistics(None)?; + + assert_eq!(stats.as_ref(), &expected); + Ok(()) + } + + #[test] + fn test_interleave_partition_statistics_uses_shared_statistics_merge() -> Result<()> { + let (schema, left, right, expected) = stats_merge_inputs(); + let hash_expr = vec![col("a", schema.as_ref())?]; + + let left: Arc = Arc::new(RepartitionExec::try_new( + Arc::new(StatisticsExec::new(left, schema.as_ref().clone())), + Partitioning::Hash(hash_expr.clone(), 2), + )?); + let right: Arc = Arc::new(RepartitionExec::try_new( + Arc::new(StatisticsExec::new(right, schema.as_ref().clone())), + Partitioning::Hash(hash_expr, 2), + )?); + + let interleave = InterleaveExec::try_new(vec![left, right])?; + let stats = interleave.partition_statistics(None)?; + + assert_eq!(stats.as_ref(), &expected); + Ok(()) + } + + #[test] + fn test_interleave_partition_statistics_for_partition_uses_shared_statistics_merge() + -> Result<()> { + let (schema, left, right, _) = stats_merge_inputs(); + let hash_expr = vec![col("a", schema.as_ref())?]; + + let left: Arc = Arc::new(RepartitionExec::try_new( + Arc::new(StatisticsExec::new(left, schema.as_ref().clone())), + Partitioning::Hash(hash_expr.clone(), 2), + )?); + let right: Arc = Arc::new(RepartitionExec::try_new( + Arc::new(StatisticsExec::new(right, schema.as_ref().clone())), + Partitioning::Hash(hash_expr, 2), + )?); + + let interleave = InterleaveExec::try_new(vec![left, right])?; + let stats = interleave.partition_statistics(Some(0))?; + + let expected = Statistics::default() + .with_num_rows(Precision::Inexact(5)) + .with_total_byte_size(Precision::Inexact(25)) + .add_column_statistics(ColumnStatistics::new_unknown()); + + assert_eq!(stats.as_ref(), &expected); + Ok(()) } #[tokio::test]