From 8cd86f84210746afdf3bd77d56fdce5b3ea9cc83 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Apr 2026 16:21:31 -0400 Subject: [PATCH 1/4] Rewrite FileStream in terms of Morselize API --- Cargo.lock | 1 + datafusion/datasource-parquet/src/opener.rs | 257 +++----- datafusion/datasource-parquet/src/source.rs | 83 +-- datafusion/datasource/Cargo.toml | 2 + datafusion/datasource/src/file.rs | 23 +- datafusion/datasource/src/file_scan_config.rs | 4 +- .../datasource/src/file_stream/builder.rs | 40 +- .../datasource/src/file_stream/metrics.rs | 9 +- datafusion/datasource/src/file_stream/mod.rs | 488 ++++++++++---- .../datasource/src/file_stream/scan_state.rs | 258 ++++++++ datafusion/datasource/src/morsel/adapters.rs | 122 ++++ datafusion/datasource/src/morsel/mocks.rs | 612 ++++++++++++++++++ datafusion/datasource/src/morsel/mod.rs | 5 + 13 files changed, 1562 insertions(+), 342 deletions(-) create mode 100644 datafusion/datasource/src/file_stream/scan_state.rs create mode 100644 datafusion/datasource/src/morsel/adapters.rs create mode 100644 datafusion/datasource/src/morsel/mocks.rs diff --git a/Cargo.lock b/Cargo.lock index 87c18826096c..3f7ab2ebaa7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1963,6 +1963,7 @@ dependencies = [ "flate2", "futures", "glob", + "insta", "itertools 0.14.0", "liblzma", "log", diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 35900e16c18e..21a383c6dabb 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] and [`ParquetMorselizer`] state machines for opening Parquet files +//! 
[`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; @@ -26,15 +26,11 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; -use datafusion_common::internal_err; -use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::morsel::{ - Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, -}; +use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt; use std::future::Future; use std::mem; @@ -82,19 +78,6 @@ use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; -/// Implements [`FileOpener`] for Parquet -#[derive(Clone)] -pub(super) struct ParquetOpener { - pub(super) morselizer: ParquetMorselizer, -} - -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let future = ParquetOpenFuture::new(&self.morselizer, partitioned_file)?; - Ok(Box::pin(future)) - } -} - /// Stateless Parquet morselizer implementation. /// /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive @@ -173,7 +156,7 @@ impl Morselizer for ParquetMorselizer { } } -/// States for [`ParquetOpenFuture`] +/// States for [`ParquetMorselPlanner`] /// /// These states correspond to the steps required to read and apply various /// filter operations. 
@@ -425,85 +408,6 @@ impl ParquetOpenState { } } -/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API -/// -/// Compatibility adapter that drives a morsel planner through the -/// [`FileOpener`] API. -struct ParquetOpenFuture { - planner: Option>, - pending_io: Option, - ready_morsels: VecDeque>, -} - -impl ParquetOpenFuture { - fn new( - morselizer: &ParquetMorselizer, - partitioned_file: PartitionedFile, - ) -> Result { - Ok(Self { - planner: Some(morselizer.plan_file(partitioned_file)?), - pending_io: None, - ready_morsels: VecDeque::new(), - }) - } -} - -impl Future for ParquetOpenFuture { - type Output = Result>>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - // If planner I/O completed, resume with the returned planner. - if let Some(io_future) = self.pending_io.as_mut() { - let maybe_planner = ready!(io_future.poll_unpin(cx)); - // Clear `pending_io` before handling the result so an error - // cannot leave both continuation paths populated. - self.pending_io = None; - if self.planner.is_some() { - return Poll::Ready(internal_err!( - "ParquetOpenFuture does not support concurrent planners" - )); - } - self.planner = Some(maybe_planner?); - } - - // If a stream morsel is ready, return it. - if let Some(morsel) = self.ready_morsels.pop_front() { - return Poll::Ready(Ok(morsel.into_stream())); - } - - // This shim must always own either a planner, a pending planner - // future, or a ready morsel. Reaching this branch means the - // continuation was lost. - let Some(planner) = self.planner.take() else { - return Poll::Ready(internal_err!( - "ParquetOpenFuture polled after completion" - )); - }; - - // Planner completed without producing a stream morsel. - // (e.g. all row groups were pruned) - let Some(mut plan) = planner.plan()? 
else { - return Poll::Ready(Ok(futures::stream::empty().boxed())); - }; - - let mut child_planners = plan.take_ready_planners(); - if child_planners.len() > 1 { - return Poll::Ready(internal_err!( - "Parquet FileOpener adapter does not support child morsel planners" - )); - } - self.planner = child_planners.pop(); - - self.ready_morsels = plan.take_morsels().into(); - - if let Some(io_future) = plan.take_pending_planner() { - self.pending_io = Some(io_future); - } - } - } -} - /// Implements the Morsel API struct ParquetStreamMorsel { stream: BoxStream<'static, Result>, @@ -1722,17 +1626,18 @@ fn should_enable_page_index( #[cfg(test)] mod test { - use std::sync::Arc; - + use super::*; use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; + use crate::{DefaultParquetFileReaderFactory, RowGroupAccess}; + use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch, + ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, stats::Precision, }; - use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; + use datafusion_datasource::morsel::{Morsel, Morselizer}; + use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, @@ -1744,14 +1649,17 @@ mod test { DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; - use futures::{Stream, StreamExt}; + use futures::StreamExt; + use futures::stream::BoxStream; use object_store::{ObjectStore, ObjectStoreExt, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; + use std::collections::VecDeque; + use 
std::sync::Arc; - /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. + /// Builder for creating [`ParquetMorselizer`] instances with sensible defaults for tests. /// This helps reduce code duplication and makes it clear what differs between test cases. - struct ParquetOpenerBuilder { + struct ParquetMorselizerBuilder { store: Option>, table_schema: Option, partition_index: usize, @@ -1768,13 +1676,13 @@ mod test { enable_page_index: bool, enable_bloom_filter: bool, enable_row_group_stats_pruning: bool, - coerce_int96: Option, + coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, } - impl ParquetOpenerBuilder { + impl ParquetMorselizerBuilder { /// Create a new builder with sensible defaults for tests. fn new() -> Self { Self { @@ -1855,17 +1763,17 @@ mod test { self } - /// Build the ParquetOpener instance. + /// Build the ParquetMorselizer instance. /// /// # Panics /// /// Panics if required fields (store, schema/table_schema) are not set. 
- fn build(self) -> ParquetOpener { + fn build(self) -> ParquetMorselizer { let store = self .store - .expect("ParquetOpenerBuilder: store must be set via with_store()"); + .expect("ParquetMorselizerBuilder: store must be set via with_store()"); let table_schema = self.table_schema.expect( - "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", + "ParquetMorselizerBuilder: table_schema must be set via with_schema() or with_table_schema()", ); let file_schema = Arc::clone(table_schema.file_schema()); @@ -1879,7 +1787,7 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - let morselizer = ParquetMorselizer { + ParquetMorselizer { partition_index: self.partition_index, projection, batch_size: self.batch_size, @@ -1906,8 +1814,45 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + } + } + } + + /// Test helper that drives a [`ParquetMorselizer`] to completion and returns + /// the first stream morsel it produces. + /// + /// This mirrors how `FileStream` consumes the morsel APIs: it repeatedly + /// plans CPU work, awaits any discovered I/O futures, and feeds the planner + /// back into the ready queue until a stream morsel is ready. + async fn open_file( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> Result>> { + let mut planners = VecDeque::from([morselizer.plan_file(file)?]); + let mut morsels: VecDeque> = VecDeque::new(); + + loop { + if let Some(morsel) = morsels.pop_front() { + return Ok(Box::pin(morsel.into_stream())); + } + + let Some(planner) = planners.pop_front() else { + return Ok(Box::pin(futures::stream::empty())); }; - ParquetOpener { morselizer } + + if let Some(mut plan) = planner.plan()? 
{ + morsels.extend(plan.take_morsels()); + planners.extend(plan.take_ready_planners()); + + if let Some(pending_planner) = plan.take_pending_planner() { + planners.push_front(pending_planner.await?); + continue; + } + + if morsels.is_empty() && planners.is_empty() { + return internal_err!("planner returned an empty morsel plan"); + } + } } } @@ -1995,12 +1940,7 @@ mod test { } async fn count_batches_and_rows( - mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, - >, + mut stream: BoxStream<'static, Result>, ) -> (usize, usize) { let mut num_batches = 0; let mut num_rows = 0; @@ -2013,12 +1953,7 @@ mod test { /// Helper to collect all int32 values from the first column of batches async fn collect_int32_values( - mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, - >, + mut stream: BoxStream<'static, Result>, ) -> Vec { use arrow::array::Array; let mut values = vec![]; @@ -2104,7 +2039,7 @@ mod test { )); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0, 1]) @@ -2117,7 +2052,7 @@ mod test { let expr = col("a").eq(lit(1)); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2126,7 +2061,7 @@ mod test { let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2157,7 +2092,7 @@ mod test { 
vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2172,7 +2107,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2183,7 +2118,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2226,7 +2161,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2239,7 +2174,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2248,7 
+2183,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2257,7 +2192,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2266,7 +2201,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2298,7 +2233,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2312,7 +2247,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; 
assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2321,7 +2256,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2330,7 +2265,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); @@ -2339,7 +2274,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2379,7 +2314,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2392,7 +2327,7 @@ mod test { let expr = col("a").eq(lit(42)); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = 
count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2401,7 +2336,7 @@ mod test { // This allows dynamic filters to prune partitions/files even if they are populated late into execution. let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2411,7 +2346,7 @@ mod test { let expr = col("part").eq(lit(2)); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2420,7 +2355,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42))); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2461,7 +2396,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2471,12 +2406,12 @@ mod test { // Test normal scan (forward) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = 
collect_int32_values(stream).await; // Test reverse scan let opener = make_opener(true); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let reverse_values = collect_int32_values(stream).await; // The forward scan should return data in the order written @@ -2503,7 +2438,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2514,11 +2449,11 @@ mod test { // With a single row group, forward and reverse should be the same // (only the row group order is reversed, not the rows within) let opener_forward = make_opener(false); - let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap(); + let stream_forward = open_file(&opener_forward, file.clone()).await.unwrap(); let (batches_forward, _) = count_batches_and_rows(stream_forward).await; let opener_reverse = make_opener(true); - let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap(); + let stream_reverse = open_file(&opener_reverse, file).await.unwrap(); let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await; // Both should have the same number of batches since there's only one row group @@ -2579,7 +2514,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2589,7 +2524,7 @@ mod test { // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = collect_int32_values(stream).await; // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10) 
@@ -2605,7 +2540,7 @@ mod test { // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8 // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4 let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let reverse_values = collect_int32_values(stream).await; // Correct expected result: row groups reversed but each keeps its own selection @@ -2680,7 +2615,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2691,7 +2626,7 @@ mod test { // Forward scan: RG0(1), RG2(5), RG3(7) // Note: RG1 is completely skipped let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = collect_int32_values(stream).await; assert_eq!( @@ -2704,7 +2639,7 @@ mod test { // WITHOUT the bug fix, this would return WRONG values // because the RowSelection would be incorrectly mapped let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let reverse_values = collect_int32_values(stream).await; assert_eq!( diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1e54e98dfd04..8e96751df03b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,14 +23,15 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; +use crate::opener::ParquetMorselizer; use crate::opener::build_pruning_predicates; -use crate::opener::{ParquetMorselizer, ParquetOpener}; use 
crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::morsel::Morselizer; use arrow::datatypes::TimeUnit; use datafusion_common::DataFusionError; @@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// # Execution Overview /// /// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream` -/// configured to open parquet files with a `ParquetOpener`. +/// configured to morselize parquet files with a `ParquetMorselizer`. /// -/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open -/// the file. +/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to +/// plan the file. /// -/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata) +/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata) /// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by /// applying predicates to metadata. The plan and projections are used to /// determine what pages must be read. 
@@ -511,11 +512,22 @@ impl From for Arc { impl FileSource for ParquetSource { fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> datafusion_common::Result> { + datafusion_common::internal_err!( + "ParquetSource::create_file_opener called but it supports the Morsel API" + ) + } + + fn create_morselizer( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> datafusion_common::Result> { + ) -> datafusion_common::Result> { let expr_adapter_factory = base_config .expr_adapter_factory .clone() @@ -542,37 +554,34 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); - let opener = Arc::new(ParquetOpener { - morselizer: ParquetMorselizer { - partition_index: partition, - projection: self.projection.clone(), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), - limit: base_config.limit, - preserve_order: base_config.preserve_order, - predicate: self.predicate.clone(), - table_schema: self.table_schema.clone(), - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics().clone(), - parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - force_filter_selections: self.force_filter_selections(), - enable_page_index: self.enable_page_index(), - enable_bloom_filter: self.bloom_filter_on_read(), - enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, - expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), - max_predicate_cache_size: self.max_predicate_cache_size(), - reverse_row_groups: self.reverse_row_groups, - }, - }); - Ok(opener) + Ok(Box::new(ParquetMorselizer { + partition_index: partition, + projection: 
self.projection.clone(), + batch_size: self + .batch_size + .expect("Batch size must set before creating ParquetMorselizer"), + limit: base_config.limit, + preserve_order: base_config.preserve_order, + predicate: self.predicate.clone(), + table_schema: self.table_schema.clone(), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics().clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties, + expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, + })) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 1315f871a68f..402752165897 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -31,6 +31,7 @@ version.workspace = true all-features = true [features] +backtrace = ["datafusion-common/backtrace"] compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"] default = ["compression"] @@ -72,6 +73,7 @@ zstd = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } +insta = { workspace = true } tempfile = { workspace = true } # Note: add additional linter rules in lib.rs. 
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index b5a6760cae02..9c43eeca2bc9 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; +use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; @@ -63,13 +64,33 @@ pub fn as_file_source(source: T) -> Arc /// /// [`DataSource`]: crate::source::DataSource pub trait FileSource: Send + Sync { - /// Creates a `dyn FileOpener` based on given parameters + /// Creates a `dyn FileOpener` based on given parameters. + /// + /// Note: File sources with a native morsel implementation should return an + /// error from this method and implementing [`Self::create_morselizer`] instead. fn create_file_opener( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, ) -> Result>; + + /// Creates a `dyn Morselizer` based on given parameters. + /// + /// The default implementation preserves existing behavior by adapting the + /// legacy [`FileOpener`] API into a [`Morselizer`]. + /// + /// It is preferred to implement the [`Morselizer`] API directly by + /// implementing this method. 
+ fn create_morselizer( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> Result> { + let opener = self.create_file_opener(object_store, base_config, partition)?; + Ok(Box::new(FileOpenerMorselizer::new(opener))) + } /// Any fn as_any(&self) -> &dyn Any; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 976e1158f5eb..2aa5b6a88860 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -587,11 +587,11 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); - let opener = source.create_file_opener(object_store, self, partition)?; + let morselizer = source.create_morselizer(object_store, self, partition)?; let stream = FileStreamBuilder::new(self) .with_partition(partition) - .with_file_opener(opener) + .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; Ok(Box::pin(cooperative(stream))) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 6d99f4b56a8e..efe9c39ce3b3 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; +use crate::file_stream::scan_state::ScanState; +use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError}; pub struct FileStreamBuilder<'a> { config: &'a FileScanConfig, partition: Option, - file_opener: Option>, + morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, } @@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> { Self { config, partition: None, - file_opener: None, + morselizer: None, 
metrics: None, on_error: OnError::Fail, } @@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> { } /// Configure the [`FileOpener`] used to open files. + /// + /// This will overwrite any setting from [`Self::with_morselizer`] pub fn with_file_opener(mut self, file_opener: Arc) -> Self { - self.file_opener = Some(file_opener); + self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener))); + self + } + + /// Configure the [`Morselizer`] used to open files. + /// + /// This will overwrite any setting from [`Self::with_file_opener`] + pub fn with_morselizer(mut self, morselizer: Box) -> Self { + self.morselizer = Some(morselizer); self } @@ -74,7 +86,7 @@ impl<'a> FileStreamBuilder<'a> { let Self { config, partition, - file_opener, + morselizer, metrics, on_error, } = self; @@ -82,8 +94,8 @@ impl<'a> FileStreamBuilder<'a> { let Some(partition) = partition else { return internal_err!("FileStreamBuilder missing required partition"); }; - let Some(file_opener) = file_opener else { - return internal_err!("FileStreamBuilder missing required file_opener"); + let Some(morselizer) = morselizer else { + return internal_err!("FileStreamBuilder missing required morselizer"); }; let Some(metrics) = metrics else { return internal_err!("FileStreamBuilder missing required metrics"); @@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> { ); }; + let file_stream_metrics = FileStreamMetrics::new(metrics, partition); + let scan_state = Box::new(ScanState::new( + file_group.into_inner(), + config.limit, + morselizer, + on_error, + file_stream_metrics, + )); + Ok(FileStream { - file_iter: file_group.into_inner().into_iter().collect(), projected_schema, - remain: config.limit, - file_opener, - state: FileStreamState::Idle, - file_stream_metrics: FileStreamMetrics::new(metrics, partition), + state: FileStreamState::Scan { scan_state }, baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error, }) } } diff --git a/datafusion/datasource/src/file_stream/metrics.rs 
b/datafusion/datasource/src/file_stream/metrics.rs index f4dddeaee8d0..5f3894404f40 100644 --- a/datafusion/datasource/src/file_stream/metrics.rs +++ b/datafusion/datasource/src/file_stream/metrics.rs @@ -77,7 +77,7 @@ pub struct FileStreamMetrics { /// Wall clock time elapsed for data decompression + decoding /// /// Time spent waiting for the FileStream's input. - pub time_processing: StartableTime, + pub time_processing: Time, /// Count of errors opening file. /// /// If using `OnError::Skip` this will provide a count of the number of files @@ -126,11 +126,8 @@ impl FileStreamMetrics { start: None, }; - let time_processing = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_processing", partition), - start: None, - }; + let time_processing = + MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition); let file_open_errors = MetricBuilder::new(metrics) .with_category(MetricCategory::Rows) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 33e5065cb5a3..ceb4686ea62f 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -23,8 +23,8 @@ mod builder; mod metrics; +mod scan_state; -use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -38,33 +38,24 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet use arrow::record_batch::RecordBatch; +use futures::Stream; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::{FutureExt as _, Stream, StreamExt as _, ready}; + +use self::scan_state::{ScanAndReturn, ScanState}; pub use builder::FileStreamBuilder; pub use metrics::{FileStreamMetrics, StartableTime}; /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { - /// An iterator over input files. 
- file_iter: VecDeque, /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, - /// The remaining number of records to parse, None if no limit - remain: Option, - /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], - /// which can be resolved to a stream of `RecordBatch`. - file_opener: Arc, /// The stream state state: FileStreamState, - /// File stream specific metrics - file_stream_metrics: FileStreamMetrics, /// runtime baseline metrics baseline_metrics: BaselineMetrics, - /// Describes the behavior of the `FileStream` if file opening or scanning fails - on_error: OnError, } impl FileStream { @@ -88,105 +79,34 @@ impl FileStream { /// If `OnError::Skip` the stream will skip files which encounter an error and continue /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs pub fn with_on_error(mut self, on_error: OnError) -> Self { - self.on_error = on_error; + match &mut self.state { + FileStreamState::Scan { scan_state } => scan_state.set_on_error(on_error), + FileStreamState::Error | FileStreamState::Done => { + // no effect as there are no more files to process + } + }; self } - fn start_next_file(&mut self) -> Option> { - let part_file = self.file_iter.pop_front()?; - Some(self.file_opener.open(part_file)) - } - fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { - FileStreamState::Idle => match self.start_next_file().transpose() { - Ok(Some(future)) => { - self.file_stream_metrics.time_opening.start(); - self.state = FileStreamState::Open { future }; - } - Ok(None) => return Poll::Ready(None), - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - }, - FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { - Ok(reader) => { - self.file_stream_metrics.files_opened.add(1); - self.file_stream_metrics.time_opening.stop(); - 
self.file_stream_metrics.time_scanning_until_data.start(); - self.file_stream_metrics.time_scanning_total.start(); - self.state = FileStreamState::Scan { reader }; - } - Err(e) => { - self.file_stream_metrics.file_open_errors.add(1); - match self.on_error { - OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_opening.stop(); - self.state = FileStreamState::Idle - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - } - } - }, - FileStreamState::Scan { reader } => { - match ready!(reader.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - let batch = match &mut self.remain { - Some(remain) => { - if *remain > batch.num_rows() { - *remain -= batch.num_rows(); - batch - } else { - let batch = batch.slice(0, *remain); - // Count this file and all remaining files - // we will never open. 
- let done = 1 + self.file_iter.len(); - self.file_stream_metrics - .files_processed - .add(done); - self.state = FileStreamState::Limit; - *remain = 0; - batch - } - } - None => batch, - }; - self.file_stream_metrics.time_scanning_total.start(); - return Poll::Ready(Some(Ok(batch))); - } - Some(Err(err)) => { - self.file_stream_metrics.file_scan_errors.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - - match self.on_error { - OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); - self.state = FileStreamState::Idle; - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(err))); - } - } + FileStreamState::Scan { scan_state: queue } => { + let action = queue.poll_scan(cx); + match action { + ScanAndReturn::Continue => continue, + ScanAndReturn::Done(result) => { + self.state = FileStreamState::Done; + return Poll::Ready(result); } - None => { - self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - self.state = FileStreamState::Idle; + ScanAndReturn::Error(err) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(err))); } + ScanAndReturn::Return(result) => return result, } } - FileStreamState::Error | FileStreamState::Limit => { + FileStreamState::Error | FileStreamState::Done => { return Poll::Ready(None); } } @@ -201,9 +121,7 @@ impl Stream for FileStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.file_stream_metrics.time_processing.start(); let result = self.poll_inner(cx); - self.file_stream_metrics.time_processing.stop(); self.baseline_metrics.record_poll(result) } } @@ -238,33 +156,29 @@ pub trait FileOpener: Unpin + Send + Sync { fn open(&self, partitioned_file: PartitionedFile) -> Result; } -pub enum FileStreamState { - /// The idle state, no file is currently being read - 
Idle, - /// Currently performing asynchronous IO to obtain a stream of RecordBatch - /// for a given file - Open { - /// A [`FileOpenFuture`] returned by [`FileOpener::open`] - future: FileOpenFuture, - }, - /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] - /// returned by [`FileOpener::open`] +enum FileStreamState { + /// Actively processing readers, ready morsels, and planner work. Scan { - /// The reader instance - reader: BoxStream<'static, Result>, + /// The ready queues and active reader for the current file. + scan_state: Box, }, /// Encountered an error Error, - /// Reached the row limit - Limit, + /// Finished scanning all requested data + Done, } -/// A timer that can be started and stopped. #[cfg(test)] mod tests { use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; + use crate::morsel::mocks::{ + IoFutureId, MockMorselizer, MockPlanner, MorselId, PollsToResolve, + }; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; + use arrow::array::{AsArray, RecordBatch}; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use datafusion_common::DataFusionError; use datafusion_common::error::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -274,8 +188,6 @@ mod tests { use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; use crate::test_util::MockSource; - use arrow::array::RecordBatch; - use arrow::datatypes::Schema; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; @@ -747,7 +659,7 @@ mod tests { assert!(err.contains("FileStreamBuilder missing required partition")); let err = builder_error(FileStreamBuilder::new(&config).with_partition(0)); - assert!(err.contains("FileStreamBuilder missing required file_opener")); + assert!(err.contains("FileStreamBuilder missing required morselizer")); let err = builder_error( FileStreamBuilder::new(&config) @@ 
-770,4 +682,334 @@ mod tests { ); assert!(err.contains("FileStreamBuilder invalid partition index: 1")); } + + /// Verifies the simplest morsel-driven flow: one planner produces one + /// morsel immediately, and that morsel is then scanned to completion. + #[tokio::test] + async fn morsel_no_io() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(42) + morsel_stream_finished: MorselId(10) + "); + + Ok(()) + } + + /// Verifies that a planner can block on one I/O phase and then produce a + /// morsel containing two batches. 
+    #[tokio::test]
+    async fn morsel_single_io_two_batches() -> Result<()> {
+        let test = FileStreamMorselTest::new().with_file(
+            MockPlanner::builder("file1.parquet")
+                .return_io(IoFutureId(1), PollsToResolve(1))
+                .return_morsel_batches(MorselId(10), vec![42, 43])
+                .return_none()
+                .build(),
+        );
+
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Output Stream -----
+        Batch: 42
+        Batch: 43
+        Done
+        ----- File Stream Events -----
+        morselize_file: file1.parquet
+        planner_created: file1.parquet
+        planner_called: file1.parquet
+        io_future_created: file1.parquet, IoFutureId(1)
+        io_future_polled: file1.parquet, IoFutureId(1)
+        io_future_polled: file1.parquet, IoFutureId(1)
+        io_future_resolved: file1.parquet, IoFutureId(1)
+        planner_called: file1.parquet
+        morsel_produced: file1.parquet, MorselId(10)
+        morsel_stream_started: MorselId(10)
+        morsel_stream_batch_produced: MorselId(10), BatchId(42)
+        morsel_stream_batch_produced: MorselId(10), BatchId(43)
+        morsel_stream_finished: MorselId(10)
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that a planner can traverse two sequential I/O phases before
+    /// producing one batch (similar to Parquet, which does this).
+ #[tokio::test] + async fn morsel_two_ios_one_batch() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io(IoFutureId(1), PollsToResolve(0)) + .return_io(IoFutureId(2), PollsToResolve(0)) + .return_morsel(MorselId(10), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_resolved: file1.parquet, IoFutureId(1) + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(2) + io_future_polled: file1.parquet, IoFutureId(2) + io_future_resolved: file1.parquet, IoFutureId(2) + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(42) + morsel_stream_finished: MorselId(10) + "); + + Ok(()) + } + + /// Verifies that a planner I/O future can fail and terminate the stream. 
+ #[tokio::test] + async fn morsel_io_error() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io_error( + IoFutureId(1), + PollsToResolve(0), + "io failed while opening file", + ) + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Error: io failed while opening file + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_errored: file1.parquet, IoFutureId(1), io failed while opening file + "); + + Ok(()) + } + + /// Verifies that planning can fail after a successful I/O phase. + #[tokio::test] + async fn morsel_plan_error_after_io() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io(IoFutureId(1), PollsToResolve(0)) + .return_error("planner failed after io") + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Error: planner failed after io + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_resolved: file1.parquet, IoFutureId(1) + planner_called: file1.parquet + "); + + Ok(()) + } + + /// Verifies that `FileStream` scans multiple files in order. 
+ #[tokio::test] + async fn morsel_multiple_files() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file( + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 41) + .return_none() + .build(), + ) + .with_file( + MockPlanner::builder("file2.parquet") + .return_morsel(MorselId(11), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 41 + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(41) + morsel_stream_finished: MorselId(10) + morselize_file: file2.parquet + planner_created: file2.parquet + planner_called: file2.parquet + morsel_produced: file2.parquet, MorselId(11) + morsel_stream_started: MorselId(11) + morsel_stream_batch_produced: MorselId(11), BatchId(42) + morsel_stream_finished: MorselId(11) + "); + + Ok(()) + } + + /// Verifies that a global limit can stop the stream before a second file is opened. 
+    #[tokio::test]
+    async fn morsel_limit_prevents_second_file() -> Result<()> {
+        let test = FileStreamMorselTest::new()
+            .with_file(
+                MockPlanner::builder("file1.parquet")
+                    .return_morsel_batches(MorselId(10), vec![41, 42])
+                    .return_none()
+                    .build(),
+            )
+            .with_file(
+                MockPlanner::builder("file2.parquet")
+                    .return_morsel(MorselId(11), 43)
+                    .return_none()
+                    .build(),
+            )
+            .with_limit(1);
+
+        // Note the snapshot should not ever see planner id2
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Output Stream -----
+        Batch: 41
+        Done
+        ----- File Stream Events -----
+        morselize_file: file1.parquet
+        planner_created: file1.parquet
+        planner_called: file1.parquet
+        morsel_produced: file1.parquet, MorselId(10)
+        morsel_stream_started: MorselId(10)
+        morsel_stream_batch_produced: MorselId(10), BatchId(41)
+        ");
+
+        Ok(())
+    }
+
+    /// Tests how FileStream opens and processes files.
+    #[derive(Clone)]
+    struct FileStreamMorselTest {
+        morselizer: MockMorselizer,
+        file_names: Vec<String>,
+        limit: Option<usize>,
+    }
+
+    impl FileStreamMorselTest {
+        /// Creates an empty test harness.
+        fn new() -> Self {
+            Self {
+                morselizer: MockMorselizer::new(),
+                file_names: vec![],
+                limit: None,
+            }
+        }
+
+        /// Adds one file and its root planner to the test input.
+        fn with_file(mut self, planner: MockPlanner) -> Self {
+            self.file_names.push(planner.file_path().to_string());
+            self.morselizer = self.morselizer.with_file(planner);
+            self
+        }
+
+        /// Sets a global output limit for the stream.
+        fn with_limit(mut self, limit: usize) -> Self {
+            self.limit = Some(limit);
+            self
+        }
+
+        /// Runs the test and returns the combined output and scheduler trace text as a String.
+ async fn run(self) -> Result { + let observer = self.morselizer.observer().clone(); + observer.clear(); + + let config = self.test_config(); + let metrics_set = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&config) + .with_partition(0) + .with_morselizer(Box::new(self.morselizer)) + .with_metrics(&metrics_set) + .build()?; + + let mut stream_contents = Vec::new(); + while let Some(result) = stream.next().await { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + stream_contents.push(format!("Batch: {batch_id}")); + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes + // if backtraces are enabled, etc + let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + stream_contents.push(format!("Error: {message}")); + } + } + } + stream_contents.push("Done".to_string()); + + Ok(format!( + "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", + stream_contents.join("\n"), + observer.format_events() + )) + } + + /// Builds the `FileScanConfig` for the configured mock file set. 
+ fn test_config(&self) -> FileScanConfig { + let file_group = self + .file_names + .iter() + .map(|name| PartitionedFile::new(name, 10)) + .collect(); + let table_schema = TableSchema::new( + Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), + vec![], + ); + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file_group(file_group) + .with_limit(self.limit) + .build() + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs new file mode 100644 index 000000000000..5215c2ae7074 --- /dev/null +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -0,0 +1,258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +use crate::PartitionedFile; +use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{DataFusionError, Result}; +use datafusion_physical_plan::metrics::ScopedTimerGuard; +use futures::stream::BoxStream; +use futures::{FutureExt as _, StreamExt as _}; + +use super::{FileStreamMetrics, OnError}; + +/// State [`FileStreamState::Scan`] +/// +/// Groups together ready planners, ready morsels, the active reader, +/// pending planner I/O, the remaining files and limit, and the metrics +/// associated with processing that work. +/// +/// [`FileStreamState::Scan`]: super::FileStreamState::Scan +pub(super) struct ScanState { + /// Files that still need to be planned. + file_iter: VecDeque, + /// Remaining record limit, if any. + remain: Option, + /// The morselizer used to plan files + morselizer: Box, + /// Behavior if opening or scanning a file fails. + on_error: OnError, + /// CPU-ready planners for the current file. + ready_planners: VecDeque>, + /// Ready morsels for the current file. + ready_morsels: VecDeque>, + /// The active reader, if any. + reader: Option>>, + /// Planner currently doing I/O + pending_planner: Option, + /// Metrics for the active scan queues. + metrics: FileStreamMetrics, +} + +impl ScanState { + pub(super) fn new( + file_iter: impl Into>, + remain: Option, + morselizer: Box, + on_error: OnError, + metrics: FileStreamMetrics, + ) -> Self { + let file_iter = file_iter.into(); + Self { + file_iter, + remain, + morselizer, + on_error, + ready_planners: Default::default(), + ready_morsels: Default::default(), + reader: None, + pending_planner: None, + metrics, + } + } + + /// Updates how scan errors are handled while the stream is still active. 
+    pub(super) fn set_on_error(&mut self, on_error: OnError) {
+        self.on_error = on_error;
+    }
+
+    /// Drives one iteration of the active scan state, reading from morsels,
+    /// planners, pending planner I/O, or unopened files from `self`.
+    ///
+    /// The returned [`ScanAndReturn`] tells `poll_inner` how to update the
+    /// outer `FileStreamState`.
+    pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
+        let _processing_timer: ScopedTimerGuard<'_> =
+            self.metrics.time_processing.timer();
+
+        // Try and resolve outstanding IO first
+        if let Some(mut pending_planner) = self.pending_planner.take() {
+            match pending_planner.poll_unpin(cx) {
+                // IO is still pending
+                Poll::Pending => {
+                    self.pending_planner = Some(pending_planner);
+                    return ScanAndReturn::Return(Poll::Pending);
+                }
+                // IO resolved, and the planner is ready for CPU work
+                Poll::Ready(Ok(planner)) => {
+                    self.ready_planners.push_back(planner);
+                    return ScanAndReturn::Continue;
+                }
+                // IO Error
+                Poll::Ready(Err(err)) => {
+                    self.metrics.file_open_errors.add(1);
+                    self.metrics.time_opening.stop();
+                    return match self.on_error {
+                        OnError::Skip => {
+                            self.metrics.files_processed.add(1);
+                            ScanAndReturn::Continue
+                        }
+                        OnError::Fail => ScanAndReturn::Error(err),
+                    };
+                }
+            }
+        }
+
+        // Next try and get the next batch from the active reader, if any
+        if let Some(reader) = self.reader.as_mut() {
+            match reader.poll_next_unpin(cx) {
+                // According to the API contract, readers should always be ready
+                // but in practice they may actually be waiting on IO, and if
+                // that happens wait for it here.
+ Poll::Pending => return ScanAndReturn::Return(Poll::Pending), + Poll::Ready(Some(Ok(batch))) => { + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + // check limit + let (batch, finished) = match &mut self.remain { + Some(remain) => { + if *remain > batch.num_rows() { + *remain -= batch.num_rows(); + self.metrics.time_scanning_total.start(); + (batch, false) + } else { + let batch = batch.slice(0, *remain); + let done = 1 + self.file_iter.len(); + self.metrics.files_processed.add(done); + *remain = 0; + (batch, true) + } + } + None => { + self.metrics.time_scanning_total.start(); + (batch, false) + } + }; + return if finished { + ScanAndReturn::Done(Some(Ok(batch))) + } else { + ScanAndReturn::Return(Poll::Ready(Some(Ok(batch)))) + }; + } + Poll::Ready(Some(Err(err))) => { + self.reader = None; + self.metrics.file_scan_errors.add(1); + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + return match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + }; + } + Poll::Ready(None) => { + self.reader = None; + self.metrics.files_processed.add(1); + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + return ScanAndReturn::Continue; + } + } + } + + // Don't have a reader but have morsels ready to turn into a reader, so do that. 
+ if let Some(morsel) = self.ready_morsels.pop_front() { + self.metrics.files_opened.add(1); + self.metrics.time_opening.stop(); + self.metrics.time_scanning_until_data.start(); + self.metrics.time_scanning_total.start(); + self.reader = Some(morsel.into_stream()); + return ScanAndReturn::Continue; + } + + // Don't have a morsel or stream, so try and plan some more morsels + if let Some(planner) = self.ready_planners.pop_front() { + return match planner.plan() { + Ok(Some(mut plan)) => { + // Get all morsels and planners and try again + self.ready_morsels.extend(plan.take_morsels()); + self.ready_planners.extend(plan.take_ready_planners()); + if let Some(pending_planner) = plan.take_pending_planner() { + self.pending_planner = Some(pending_planner); + } + ScanAndReturn::Continue + } + Ok(None) => { + self.metrics.files_processed.add(1); + self.metrics.time_opening.stop(); + ScanAndReturn::Continue + } + Err(err) => { + self.metrics.file_open_errors.add(1); + self.metrics.time_opening.stop(); + match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + } + } + }; + } + + // No planners, morsels, or active reader, so try and open the next file and plan it. + let part_file = match self.file_iter.pop_front() { + Some(part_file) => part_file, + None => return ScanAndReturn::Done(None), + }; + + self.metrics.time_opening.start(); + match self.morselizer.plan_file(part_file) { + Ok(planner) => { + self.ready_planners.push_back(planner); + ScanAndReturn::Continue + } + Err(err) => match self.on_error { + OnError::Skip => { + self.metrics.file_open_errors.add(1); + self.metrics.time_opening.stop(); + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + }, + } + } +} + +/// What should be done on the next iteration of [`ScanState::poll_scan`]? 
+pub(super) enum ScanAndReturn { + /// Poll again + Continue, + /// Return the provided result without changing the outer state. + Return(Poll>>), + /// Update the outer `FileStreamState` to `Done` and return the provided result. + Done(Option>), + /// Update the outer `FileStreamState` to `Error` and return the provided error. + Error(DataFusionError), +} diff --git a/datafusion/datasource/src/morsel/adapters.rs b/datafusion/datasource/src/morsel/adapters.rs new file mode 100644 index 000000000000..6fa6d4916771 --- /dev/null +++ b/datafusion/datasource/src/morsel/adapters.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionedFile; +use crate::file_stream::FileOpener; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::FutureExt; +use futures::stream::BoxStream; +use std::fmt::Debug; +use std::sync::Arc; + +/// Adapt a legacy [`FileOpener`] to the morsel API. +/// +/// This preserves backwards compatibility for file formats that have not yet +/// implemented a native [`Morselizer`]. 
+pub struct FileOpenerMorselizer { + file_opener: Arc, +} + +impl Debug for FileOpenerMorselizer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileOpenerMorselizer") + .field("file_opener", &"...") + .finish() + } +} + +impl FileOpenerMorselizer { + pub fn new(file_opener: Arc) -> Self { + Self { file_opener } + } +} + +impl Morselizer for FileOpenerMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + Ok(Box::new(FileOpenFutureMorselPlanner::new( + Arc::clone(&self.file_opener), + file, + ))) + } +} + +enum FileOpenFutureMorselPlanner { + Unopened { + file_opener: Arc, + file: Box, + }, + ReadyStream(BoxStream<'static, Result>), +} + +impl Debug for FileOpenFutureMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unopened { .. } => f + .debug_tuple("FileOpenFutureMorselPlanner::Unopened") + .finish(), + Self::ReadyStream(_) => f + .debug_tuple("FileOpenFutureMorselPlanner::ReadyStream") + .finish(), + } + } +} + +impl FileOpenFutureMorselPlanner { + fn new(file_opener: Arc, file: PartitionedFile) -> Self { + Self::Unopened { + file_opener, + file: Box::new(file), + } + } +} + +impl MorselPlanner for FileOpenFutureMorselPlanner { + fn plan(self: Box) -> Result> { + match *self { + Self::Unopened { file_opener, file } => { + let io_future = async move { + let stream = file_opener.open(*file)?.await?; + Ok(Box::new(Self::ReadyStream(stream)) as Box) + } + .boxed(); + Ok(Some(MorselPlan::new().with_pending_planner(io_future))) + } + Self::ReadyStream(stream) => Ok(Some( + MorselPlan::new() + .with_morsels(vec![Box::new(FileStreamMorsel { stream })]), + )), + } + } +} + +struct FileStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl Debug for FileStreamMorsel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileStreamMorsel").finish_non_exhaustive() + } +} + +impl Morsel for FileStreamMorsel { + 
fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } +} diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs new file mode 100644 index 000000000000..e23171ae2981 --- /dev/null +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -0,0 +1,612 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test-only mocks for exercising the morsel-driven `FileStream` scheduler. + +use std::collections::{HashMap, VecDeque}; +use std::fmt::{Display, Formatter}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use crate::PartitionedFile; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use arrow::array::{Int32Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion_common::{DataFusionError, Result, internal_datafusion_err}; +use futures::stream::BoxStream; +use futures::{Future, FutureExt}; + +// Use thin wrappers around usize so the test setups are more explicit + +/// Identifier for a mock morsel in scheduler snapshots. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct MorselId(pub usize); + +/// Identifier for a produced batch in scheduler snapshots. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct BatchId(pub usize); + +/// Identifier for a mock I/O future in scheduler snapshots. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct IoFutureId(pub usize); + +/// Number of pending polls before a mock I/O future resolves. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct PollsToResolve(pub usize); + +/// Error message returned by a mock planner or I/O future. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct MockError(pub String); + +impl Display for MockError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for MockError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } +} + +/// Scheduler-visible event captured by the mock morsel test harness. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum MorselEvent { + MorselizeFile { + path: String, + }, + PlannerCreated { + planner_name: String, + }, + PlannerCalled { + planner_name: String, + }, + IoFutureCreated { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFuturePolled { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFutureResolved { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFutureErrored { + planner_name: String, + io_future_id: IoFutureId, + message: String, + }, + MorselProduced { + planner_name: String, + morsel_id: MorselId, + }, + MorselStreamStarted { + morsel_id: MorselId, + }, + MorselStreamBatchProduced { + morsel_id: MorselId, + batch_id: BatchId, + }, + MorselStreamFinished { + morsel_id: MorselId, + }, +} + +impl Display for MorselEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MorselEvent::MorselizeFile { path } => { + write!(f, "morselize_file: {path}") + } + MorselEvent::PlannerCreated { planner_name } => { + write!(f, "planner_created: {planner_name}") + } + MorselEvent::PlannerCalled { 
planner_name } => { + write!(f, "planner_called: {planner_name}") + } + MorselEvent::IoFutureCreated { + planner_name, + io_future_id, + } => write!(f, "io_future_created: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFuturePolled { + planner_name, + io_future_id, + } => write!(f, "io_future_polled: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFutureResolved { + planner_name, + io_future_id, + } => write!(f, "io_future_resolved: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFutureErrored { + planner_name, + io_future_id, + message, + } => write!( + f, + "io_future_errored: {planner_name}, {io_future_id:?}, {message}" + ), + MorselEvent::MorselProduced { + planner_name, + morsel_id, + } => write!(f, "morsel_produced: {planner_name}, {morsel_id:?}"), + MorselEvent::MorselStreamStarted { morsel_id } => { + write!(f, "morsel_stream_started: {morsel_id:?}") + } + MorselEvent::MorselStreamBatchProduced { + morsel_id, + batch_id, + } => write!( + f, + "morsel_stream_batch_produced: {morsel_id:?}, {batch_id:?}" + ), + MorselEvent::MorselStreamFinished { morsel_id } => { + write!(f, "morsel_stream_finished: {morsel_id:?}") + } + } + } +} + +/// Shared observer that records scheduler events for snapshot tests. +#[derive(Debug, Default, Clone)] +pub(crate) struct MorselObserver { + events: Arc>>, +} + +impl MorselObserver { + /// Clears any previously recorded events. + pub(crate) fn clear(&self) { + self.events.lock().unwrap().clear(); + } + + /// Records one new scheduler event. + pub(crate) fn push(&self, event: MorselEvent) { + self.events.lock().unwrap().push(event); + } + + /// Formats all recorded events into a stable, snapshot-friendly trace. + pub(crate) fn format_events(&self) -> String { + self.events + .lock() + .unwrap() + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n") + } +} + +/// Declarative planner spec used by the mock morselizer. 
+#[derive(Debug, Clone)] +pub(crate) struct MockPlanner { + file_path: String, + steps: VecDeque, +} + +impl MockPlanner { + /// Creates a fluent builder for one mock planner. + pub(crate) fn builder(file_path: impl Into) -> MockPlannerBuilder { + MockPlannerBuilder { + file_path: file_path.into(), + ..Default::default() + } + } + + /// Returns the file path associated with this planner. + pub(crate) fn file_path(&self) -> &str { + &self.file_path + } +} + +/// One scheduler-visible step in a mock planner's lifecycle. +#[derive(Debug, Clone)] +enum PlannerStep { + Morsel { + morsel_id: MorselId, + batch_ids: Vec, + }, + Io { + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + result: std::result::Result<(), MockError>, + }, + Error { + error: MockError, + }, + None, +} + +/// Fluent builder for [`MockPlanner`] test specs. +#[derive(Debug, Default)] +pub(crate) struct MockPlannerBuilder { + file_path: String, + steps: Vec, +} + +impl MockPlannerBuilder { + /// Adds one planning step that returns a single ready morsel. + pub(crate) fn return_morsel(mut self, morsel_id: MorselId, batch_id: i32) -> Self { + self.steps.push(PlannerStep::Morsel { + morsel_id, + batch_ids: vec![batch_id], + }); + self + } + + /// Adds one planning step that returns a morsel with multiple ready batches. + pub(crate) fn return_morsel_batches( + mut self, + morsel_id: MorselId, + batch_ids: Vec, + ) -> Self { + self.steps.push(PlannerStep::Morsel { + morsel_id, + batch_ids, + }); + self + } + + /// Adds one planning step that returns a single outstanding I/O future. + pub(crate) fn return_io( + mut self, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + ) -> Self { + self.steps.push(PlannerStep::Io { + io_future_id, + polls_to_resolve, + result: Ok(()), + }); + self + } + + /// Adds one planning step that returns a failing I/O future. 
+ pub(crate) fn return_io_error( + mut self, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + message: impl Into, + ) -> Self { + self.steps.push(PlannerStep::Io { + io_future_id, + polls_to_resolve, + result: Err(MockError(message.into())), + }); + self + } + + /// Adds one planning step that reports the planner is exhausted. + pub(crate) fn return_none(mut self) -> Self { + self.steps.push(PlannerStep::None); + self + } + + /// Adds one planning step that fails during CPU planning. + pub(crate) fn return_error(mut self, message: impl Into) -> Self { + self.steps.push(PlannerStep::Error { + error: MockError(message.into()), + }); + self + } + + /// Finalizes the configured mock planner. + pub(crate) fn build(self) -> MockPlanner { + let Self { file_path, steps } = self; + + MockPlanner { + file_path, + steps: VecDeque::from(steps), + } + } +} + +/// Mock [`Morselizer`] that maps file paths to fixed planner specs. +#[derive(Debug, Clone, Default)] +pub(crate) struct MockMorselizer { + observer: MorselObserver, + files: HashMap, +} + +impl MockMorselizer { + /// Creates an empty mock morselizer. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Returns the shared event observer for this test harness. + pub(crate) fn observer(&self) -> &MorselObserver { + &self.observer + } + + /// Associates a file path with the planner spec used to open it. 
+ pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + self.files.insert(planner.file_path.clone(), planner); + self + } +} + +impl Morselizer for MockMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + let path = file.object_meta.location.to_string(); + self.observer + .push(MorselEvent::MorselizeFile { path: path.clone() }); + + let planner = self.files.get(&path).cloned().ok_or_else(|| { + internal_datafusion_err!("No mock planner configured for file: {path}") + })?; + + self.observer.push(MorselEvent::PlannerCreated { + planner_name: planner.file_path.clone(), + }); + + Ok(Box::new(MockMorselPlanner::new( + self.observer.clone(), + planner, + ))) + } +} + +/// Concrete mock planner that executes one predefined step per `plan()` call. +#[derive(Debug)] +struct MockMorselPlanner { + observer: MorselObserver, + planner_name: String, + steps: VecDeque, +} + +impl MockMorselPlanner { + /// Creates a concrete planner from its declarative test spec. + fn new(observer: MorselObserver, planner: MockPlanner) -> Self { + Self { + observer, + planner_name: planner.file_path, + steps: planner.steps, + } + } +} + +/// Rebuilds the mock planner continuation after one step completes. 
+fn remaining_planners( + observer: MorselObserver, + planner_name: String, + steps: VecDeque, +) -> Vec> { + let only_none_remaining = + matches!(steps.front(), Some(PlannerStep::None)) && steps.len() == 1; + + if steps.is_empty() || only_none_remaining { + Vec::new() + } else { + vec![Box::new(MockMorselPlanner { + observer, + planner_name, + steps, + }) as Box] + } +} + +impl MorselPlanner for MockMorselPlanner { + fn plan(self: Box) -> Result> { + let Self { + observer, + planner_name, + mut steps, + } = *self; + + observer.push(MorselEvent::PlannerCalled { + planner_name: planner_name.clone(), + }); + + let Some(step) = steps.pop_front() else { + return Ok(None); + }; + + match step { + PlannerStep::Morsel { + morsel_id, + batch_ids, + } => { + observer.push(MorselEvent::MorselProduced { + planner_name: planner_name.clone(), + morsel_id, + }); + Ok(Some( + MorselPlan::new() + .with_morsels(vec![Box::new(MockMorsel::new( + observer.clone(), + morsel_id, + batch_ids, + ))]) + .with_planners(remaining_planners( + observer.clone(), + planner_name.clone(), + steps, + )), + )) + } + PlannerStep::Io { + io_future_id, + polls_to_resolve, + result, + } => { + observer.push(MorselEvent::IoFutureCreated { + planner_name: planner_name.clone(), + io_future_id, + }); + let io_future = MockIoFuture::new( + observer.clone(), + planner_name.clone(), + io_future_id, + polls_to_resolve, + result, + ) + .map(move |result| { + result?; + Ok(Box::new(MockMorselPlanner { + observer, + planner_name, + steps, + }) as Box) + }) + .boxed(); + Ok(Some(MorselPlan::new().with_pending_planner(io_future))) + } + PlannerStep::Error { error } => { + Err(DataFusionError::External(Box::new(error))) + } + PlannerStep::None => Ok(None), + } + } +} + +/// Concrete morsel used by the mock scheduler tests. 
+#[derive(Debug)]
+pub(crate) struct MockMorsel {
+    observer: MorselObserver,
+    morsel_id: MorselId,
+    batch_ids: Vec<i32>,
+}
+
+impl MockMorsel {
+    /// Creates a mock morsel with a deterministic sequence of batches.
+    fn new(observer: MorselObserver, morsel_id: MorselId, batch_ids: Vec<i32>) -> Self {
+        Self {
+            observer,
+            morsel_id,
+            batch_ids,
+        }
+    }
+}
+
+impl Morsel for MockMorsel {
+    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>> {
+        self.observer.push(MorselEvent::MorselStreamStarted {
+            morsel_id: self.morsel_id,
+        });
+        Box::pin(MockMorselStream {
+            observer: self.observer.clone(),
+            morsel_id: self.morsel_id,
+            batch_ids: self.batch_ids.into(),
+            finished: false,
+        })
+    }
+}
+
+/// Stream returned by [`MockMorsel::into_stream`].
+struct MockMorselStream {
+    observer: MorselObserver,
+    morsel_id: MorselId,
+    batch_ids: VecDeque<i32>,
+    finished: bool,
+}
+
+impl futures::Stream for MockMorselStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if let Some(batch_id) = self.batch_ids.pop_front() {
+            self.observer.push(MorselEvent::MorselStreamBatchProduced {
+                morsel_id: self.morsel_id,
+                batch_id: BatchId(batch_id as usize),
+            });
+            return Poll::Ready(Some(Ok(single_value_batch(batch_id))));
+        }
+
+        if !self.finished {
+            self.finished = true;
+            self.observer.push(MorselEvent::MorselStreamFinished {
+                morsel_id: self.morsel_id,
+            });
+        }
+
+        Poll::Ready(None)
+    }
+}
+
+/// Deterministic future used to simulate planner I/O in tests.
+struct MockIoFuture {
+    observer: MorselObserver,
+    planner_name: String,
+    io_future_id: IoFutureId,
+    pending_polls_remaining: usize,
+    result: std::result::Result<(), MockError>,
+}
+
+impl MockIoFuture {
+    /// Creates a future that resolves after `polls_to_resolve` pending polls.
+ fn new( + observer: MorselObserver, + planner_name: String, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + result: std::result::Result<(), MockError>, + ) -> Self { + Self { + observer, + planner_name, + io_future_id, + pending_polls_remaining: polls_to_resolve.0, + result, + } + } +} + +impl Future for MockIoFuture { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.observer.push(MorselEvent::IoFuturePolled { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + }); + + if self.pending_polls_remaining > 0 { + self.pending_polls_remaining -= 1; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + match &self.result { + Ok(()) => { + self.observer.push(MorselEvent::IoFutureResolved { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + }); + Poll::Ready(Ok(())) + } + Err(e) => { + self.observer.push(MorselEvent::IoFutureErrored { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + message: e.0.clone(), + }); + Poll::Ready(Err(DataFusionError::External(Box::new(e.clone())))) + } + } + } +} + +/// Creates a one-row batch so snapshot output stays compact and readable. +fn single_value_batch(value: i32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![value]))]).unwrap() +} diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 5f200d779469..7b5066ca07a2 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -26,7 +26,12 @@ //! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query //! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). 
+mod adapters; +#[cfg(test)] +pub(crate) mod mocks; + use crate::PartitionedFile; +pub(crate) use adapters::FileOpenerMorselizer; use arrow::array::RecordBatch; use datafusion_common::Result; use futures::FutureExt; From 393c03fc9220324d0bde2a1624dfa8413d4769a6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Apr 2026 13:16:15 -0400 Subject: [PATCH 2/4] Address feedback from @adriangb --- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/file_stream/mod.rs | 2 +- .../datasource/src/file_stream/scan_state.rs | 65 +++++++++++++------ 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8e96751df03b..d4dc4f1400a9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -518,7 +518,7 @@ impl FileSource for ParquetSource { _partition: usize, ) -> datafusion_common::Result> { datafusion_common::internal_err!( - "ParquetSource::create_file_opener called but it supports the Morsel API" + "ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead" ) } diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index ceb4686ea62f..4be2d765da3c 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -748,7 +748,7 @@ mod tests { } /// Verifies that a planner can traverse two sequential I/O phases before - /// producing one batch (similar to Parquet which does this0. + /// producing one batch, similar to Parquet. 
#[tokio::test] async fn morsel_two_ios_one_batch() -> Result<()> { let test = FileStreamMorselTest::new().with_file( diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 5215c2ae7074..2d6f4756ee27 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -28,19 +28,39 @@ use futures::{FutureExt as _, StreamExt as _}; use super::{FileStreamMetrics, OnError}; -/// State [`FileStreamState::Scan`] +/// State [`FileStreamState::Scan`]. /// -/// Groups together ready planners, ready morsels, the active reader, -/// pending planner I/O, the remaining files and limit, and the metrics -/// associated with processing that work. +/// There is one `ScanState` per `FileStream`, and thus per output partition. +/// +/// It groups together the lifecycle of scanning that partition's files: +/// unopened files, CPU-ready planners, pending planner I/O, ready morsels, +/// the active reader, and the metrics associated with processing that work. +/// +/// # State Transitions +/// +/// ```text +/// file_iter +/// | +/// v +/// morselizer.plan_file(file) +/// | +/// v +/// ready_planners ---> plan() ---> ready_morsels ---> into_stream() ---> reader ---> RecordBatches +/// ^ | +/// | v +/// | pending_planner +/// | | +/// | v +/// +-------- poll until ready +/// ``` /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { /// Files that still need to be planned. file_iter: VecDeque, - /// Remaining record limit, if any. + /// Remaining row limit, if any. remain: Option, - /// The morselizer used to plan files + /// The morselizer used to plan files. morselizer: Box, /// Behavior if opening or scanning a file fails. on_error: OnError, @@ -50,7 +70,7 @@ pub(super) struct ScanState { ready_morsels: VecDeque>, /// The active reader, if any. 
reader: Option>>, - /// Planner currently doing I/O + /// The single planner currently blocked on I/O, if any. pending_planner: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, @@ -83,8 +103,14 @@ impl ScanState { self.on_error = on_error; } - /// Drives one iteration of the active scan state, reading from morsels, - /// planners, pending planner I/O, or unopened files from `self`. + /// Drives one iteration of the active scan state. + /// + /// Work is attempted in this order: + /// 1. resolve any pending planner I/O + /// 2. poll the active reader + /// 3. turn a ready morsel into the active reader + /// 4. run CPU planning on a ready planner + /// 5. morselize the next unopened file /// /// The return [`ScanAndReturn`] tells `poll_inner` how to update the /// outer `FileStreamState`. @@ -120,17 +146,16 @@ impl ScanState { } } - // Next try and get the net batch from the active reader, if any + // Next try and get the next batch from the active reader, if any. if let Some(reader) = self.reader.as_mut() { match reader.poll_next_unpin(cx) { - // According to the API contract, readers should always be ready - // but in practice they may actually be waiting on IO, and if - // that happens wait for it here. + // Morsels should ideally only expose ready-to-decode streams, + // but tolerate pending readers here. Poll::Pending => return ScanAndReturn::Return(Poll::Pending), Poll::Ready(Some(Ok(batch))) => { self.metrics.time_scanning_until_data.stop(); self.metrics.time_scanning_total.stop(); - // check limit + // Apply any remaining row limit. let (batch, finished) = match &mut self.remain { Some(remain) => { if *remain > batch.num_rows() { @@ -179,9 +204,8 @@ impl ScanState { } } - // Don't have a reader but have morsels ready to turn into a reader, so do that. + // No active reader, but a morsel is ready to become the reader. 
if let Some(morsel) = self.ready_morsels.pop_front() { - self.metrics.files_opened.add(1); self.metrics.time_opening.stop(); self.metrics.time_scanning_until_data.start(); self.metrics.time_scanning_total.start(); @@ -189,11 +213,11 @@ impl ScanState { return ScanAndReturn::Continue; } - // Don't have a morsel or stream, so try and plan some more morsels + // No reader or morsel, so try to produce more work via CPU planning. if let Some(planner) = self.ready_planners.pop_front() { return match planner.plan() { Ok(Some(mut plan)) => { - // Get all morsels and planners and try again + // Queue any newly-ready morsels, planners, or planner I/O. self.ready_morsels.extend(plan.take_morsels()); self.ready_planners.extend(plan.take_ready_planners()); if let Some(pending_planner) = plan.take_pending_planner() { @@ -220,7 +244,7 @@ impl ScanState { }; } - // No planners, morsels, or active reader, so try and open the next file and plan it. + // No outstanding work remains, so morselize the next unopened file. let part_file = match self.file_iter.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), @@ -229,6 +253,7 @@ impl ScanState { self.metrics.time_opening.start(); match self.morselizer.plan_file(part_file) { Ok(planner) => { + self.metrics.files_opened.add(1); self.ready_planners.push_back(planner); ScanAndReturn::Continue } @@ -247,7 +272,7 @@ impl ScanState { /// What should be done on the next iteration of [`ScanState::poll_scan`]? pub(super) enum ScanAndReturn { - /// Poll again + /// Poll again. Continue, /// Return the provided result without changing the outer state. 
Return(Poll>>), From 6b79a6f8367290670ea93def86fb0a1b01d52cac Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Apr 2026 13:42:21 -0400 Subject: [PATCH 3/4] Add Dynamic work scheduling in FileStream --- Cargo.lock | 1 + datafusion/datasource/Cargo.toml | 1 + datafusion/datasource/src/file_scan_config.rs | 40 +- .../datasource/src/file_stream/builder.rs | 28 +- datafusion/datasource/src/file_stream/mod.rs | 581 ++++++++++++++++-- .../datasource/src/file_stream/scan_state.rs | 22 +- .../datasource/src/file_stream/work_source.rs | 89 +++ 7 files changed, 703 insertions(+), 59 deletions(-) create mode 100644 datafusion/datasource/src/file_stream/work_source.rs diff --git a/Cargo.lock b/Cargo.lock index 3f7ab2ebaa7d..17426736268c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1968,6 +1968,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "tokio", diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 402752165897..40e2271f4520 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -64,6 +64,7 @@ itertools = { workspace = true } liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 2aa5b6a88860..018ddf22e0ac 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -22,7 +22,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, - source::DataSource, statistics::MinMaxStatistics, + file_stream::work_source::SharedWorkSource, source::DataSource, + 
statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; @@ -55,7 +56,13 @@ use datafusion_physical_plan::{ metrics::ExecutionPlanMetricsSet, }; use log::{debug, warn}; -use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; +use std::{ + any::Any, + fmt::Debug, + fmt::Formatter, + fmt::Result as FmtResult, + sync::{Arc, OnceLock}, +}; /// [`FileScanConfig`] represents scanning data from a group of files /// @@ -209,6 +216,11 @@ pub struct FileScanConfig { /// If the number of file partitions > target_partitions, the file partitions will be grouped /// in a round-robin fashion such that number of file partitions = target_partitions. pub partitioned_by_file_group: bool, + /// Shared queue of unopened files for sibling streams in this scan. + /// + /// This is initialized once per `FileScanConfig` and reused by reorderable + /// `FileStream`s created from that config. + pub(crate) shared_work_source: Arc>, } /// A builder for [`FileScanConfig`]'s. @@ -551,10 +563,34 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + shared_work_source: Arc::new(OnceLock::new()), } } } +impl FileScanConfig { + /// Returns the shared unopened-file queue for reorderable streams in this scan. + /// + /// The queue is initialized once from all file groups so sibling streams + /// can begin stealing work immediately, even if they are built or polled + /// before every sibling `FileStream` has been constructed. 
+ pub(crate) fn shared_work_source(&self) -> Option { + if self.preserve_order || self.partitioned_by_file_group { + return None; + } + + Some( + self.shared_work_source + .get_or_init(|| { + SharedWorkSource::new( + self.file_groups.iter().flat_map(FileGroup::iter).cloned(), + ) + }) + .clone(), + ) + } +} + impl From for FileScanConfigBuilder { fn from(config: FileScanConfig) -> Self { Self { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index efe9c39ce3b3..c8b049e63a1d 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; use crate::file_stream::scan_state::ScanState; +use crate::file_stream::work_source::{SharedWorkSource, WorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -26,6 +27,17 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet use super::metrics::FileStreamMetrics; use super::{FileOpener, FileStream, FileStreamState, OnError}; +/// Whether this stream may reorder work across sibling `FileStream`s. +/// +/// This is derived entirely from [`FileScanConfig`]. Streams that must +/// preserve file order or file-group partition boundaries are not reorderable. +enum Reorderable { + /// This stream may reorder work using a shared queue of unopened files. + Yes(SharedWorkSource), + /// This stream must keep its own local file order. + No, +} + /// Builder for constructing a [`FileStream`]. 
pub struct FileStreamBuilder<'a> { config: &'a FileScanConfig, @@ -33,17 +45,24 @@ pub struct FileStreamBuilder<'a> { morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, + reorderable: Reorderable, } impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. + /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { + let reorderable = match config.shared_work_source() { + Some(shared_work_source) => Reorderable::Yes(shared_work_source), + None => Reorderable::No, + }; + Self { config, partition: None, morselizer: None, metrics: None, on_error: OnError::Fail, + reorderable, } } @@ -89,6 +108,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer, metrics, on_error, + reorderable, } = self; let Some(partition) = partition else { @@ -106,10 +126,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let work_source = match reorderable { + Reorderable::Yes(shared) => WorkSource::Shared(shared), + Reorderable::No => WorkSource::Local(file_group.into_inner().into()), + }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); let scan_state = Box::new(ScanState::new( - file_group.into_inner(), + work_source, config.limit, morselizer, on_error, diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 4be2d765da3c..5f7a5b4c7c2e 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -24,6 +24,7 @@ mod builder; mod metrics; mod scan_state; +pub(crate) mod work_source; use std::pin::Pin; use std::sync::Arc; @@ -183,14 +184,21 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{FutureExt as _, StreamExt as _}; + use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use 
crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError};
+    use crate::file_stream::{
+        FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError,
+    };
     use crate::test_util::MockSource;
     use datafusion_common::{assert_batches_eq, exec_err, internal_err};
 
+    /// Test identifier for one `FileStream` partition.
+    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+    struct PartitionId(usize);
+
     /// Test `FileOpener` which will simulate errors during file opening or scanning
     #[derive(Default)]
     struct TestOpener {
@@ -918,11 +926,276 @@ mod tests {
         Ok(())
     }
 
-    /// Tests how FileStream opens and processes files.
+    /// Return a morsel test with two partitions:
+    /// Partition 0: file1, file2, file3
+    /// Partition 1: file4
+    ///
+    /// Partition 1 has only 1 file but is polled first 5 times
+    fn two_partition_morsel_test() -> FileStreamMorselTest {
+        FileStreamMorselTest::new()
+            // Partition 0 has three files
+            .with_file_in_partition(
+                PartitionId(0),
+                MockPlanner::builder("file1.parquet")
+                    .return_morsel(MorselId(10), 101)
+                    .return_none()
+                    .build(),
+            )
+            .with_file_in_partition(
+                PartitionId(0),
+                MockPlanner::builder("file2.parquet")
+                    .return_morsel(MorselId(11), 102)
+                    .return_none()
+                    .build(),
+            )
+            .with_file_in_partition(
+                PartitionId(0),
+                MockPlanner::builder("file3.parquet")
+                    .return_morsel(MorselId(12), 103)
+                    .return_none()
+                    .build(),
+            )
+            // Partition 1 has only one file, but is polled first
+            .with_file_in_partition(
+                PartitionId(1),
+                MockPlanner::builder("file4.parquet")
+                    .return_morsel(MorselId(13), 201)
+                    .return_none()
+                    .build(),
+            )
+            .with_reads(vec![
+                PartitionId(1),
+                PartitionId(1),
+                PartitionId(1),
+                PartitionId(1),
+                PartitionId(1),
+            ])
+    }
+
+    /// Verifies that an idle sibling stream can steal shared files from
+    /// another stream once it exhausts its own local work.
+    #[tokio::test]
+    async fn morsel_shared_files_can_be_stolen() -> Result<()> {
+        let test = two_partition_morsel_test().with_file_stream_events(false);
+
+        // Partition 0 starts with 3 files, but Partition 1 is polled first.
+        // Since Partition is polled first, it will run all the files even those
+        // that were assigned to Partition 0.
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Partition 0 -----
+        Done
+        ----- Partition 1 -----
+        Batch: 101
+        Batch: 102
+        Batch: 103
+        Batch: 201
+        Done
+        ----- File Stream Events -----
+        (omitted due to with_file_stream_events(false))
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that a stream that must preserve order keeps its files local
+    /// and therefore cannot steal from a sibling shared queue.
+    #[tokio::test]
+    async fn morsel_preserve_order_keeps_files_local() -> Result<()> {
+        // same fixture as `morsel_shared_files_can_be_stolen` but marked as
+        // preserve-order
+        let test = two_partition_morsel_test()
+            .with_preserve_order(true)
+            .with_file_stream_events(false);
+
+        // Even though Partition 1 is polled first, it cannot steal files
+        // from partition 0. The three files originally assigned to Partition 0
+        // must be evaluated by Partition 0.
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Partition 0 -----
+        Batch: 101
+        Batch: 102
+        Batch: 103
+        Done
+        ----- Partition 1 -----
+        Batch: 201
+        Done
+        ----- File Stream Events -----
+        (omitted due to with_file_stream_events(false))
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that `partitioned_by_file_group` disables shared work stealing.
+ #[tokio::test] + async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-partitioned + let test = two_partition_morsel_test() + .with_partitioned_by_file_group(true) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that an empty sibling can immediately steal shared files when + /// it is polled before the stream that originally owned them. + #[tokio::test] + async fn morsel_empty_sibling_can_steal() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 101) + .return_none() + .build(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .return_morsel(MorselId(11), 102) + .return_none() + .build(), + ) + // Poll the empty sibling first so it steals both files. 
+            .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)])
+            .with_file_stream_events(false);
+
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Partition 0 -----
+        Done
+        ----- Partition 1 -----
+        Batch: 101
+        Batch: 102
+        Done
+        ----- File Stream Events -----
+        (omitted due to with_file_stream_events(false))
+        ");
+
+        Ok(())
+    }
+
+    /// Ensures that if a sibling is built and polled
+    /// before another sibling has been built and contributed its files to the
+    /// shared queue, the first sibling does not finish prematurely.
+    #[tokio::test]
+    async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> {
+        let test = FileStreamMorselTest::new()
+            .with_file_in_partition(
+                PartitionId(0),
+                MockPlanner::builder("file1.parquet")
+                    .return_morsel(MorselId(10), 101)
+                    .return_none()
+                    .build(),
+            )
+            .with_file_in_partition(
+                PartitionId(0),
+                MockPlanner::builder("file2.parquet")
+                    .return_morsel(MorselId(11), 102)
+                    .return_none()
+                    .build(),
+            )
+            // Build streams lazily so partition 1 can poll the shared queue
+            // before partition 0 has contributed its files. Once partition 0
+            // is built, a later poll of partition 1 can still steal one of
+            // them from the shared queue.
+            .with_build_streams_on_first_read(true)
+            .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)])
+            .with_file_stream_events(false);
+
+        // Partition 1 polls too early once, then later steals one file after
+        // partition 0 has populated the shared queue.
+        insta::assert_snapshot!(test.run().await.unwrap(), @r"
+        ----- Partition 0 -----
+        Batch: 102
+        Done
+        ----- Partition 1 -----
+        Batch: 101
+        Done
+        ----- File Stream Events -----
+        (omitted due to with_file_stream_events(false))
+        ");
+
+        Ok(())
+    }
+
+    /// Verifies that one fast sibling can drain shared files that originated
+    /// in more than one other partition.
+ #[tokio::test] + async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 101) + .return_none() + .build(), + ) + // Partition 1 has two files + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file2.parquet") + .return_morsel(MorselId(11), 102) + .return_none() + .build(), + ) + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file3.parquet") + .return_morsel(MorselId(12), 103) + .return_none() + .build(), + ) + // Partition 2 starts empty but is polled first, so it should drain + // the shared queue across both sibling partitions. + .with_reads(vec![ + PartitionId(2), + PartitionId(2), + PartitionId(1), + PartitionId(2), + ]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 103 + Done + ----- Partition 2 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Tests how one or more `FileStream`s consume morselized file work. #[derive(Clone)] struct FileStreamMorselTest { morselizer: MockMorselizer, - file_names: Vec, + partition_files: BTreeMap>, + preserve_order: bool, + partitioned_by_file_group: bool, + file_stream_events: bool, + build_streams_on_first_read: bool, + reads: Vec, limit: Option, } @@ -931,74 +1204,225 @@ mod tests { fn new() -> Self { Self { morselizer: MockMorselizer::new(), - file_names: vec![], + partition_files: BTreeMap::new(), + preserve_order: false, + partitioned_by_file_group: false, + file_stream_events: true, + build_streams_on_first_read: false, + reads: vec![], limit: None, } } - /// Adds one file and its root planner to the test input. 
- fn with_file(mut self, planner: MockPlanner) -> Self { - self.file_names.push(planner.file_path().to_string()); + /// Adds one file to partition 0. + fn with_file(self, planner: MockPlanner) -> Self { + self.with_file_in_partition(PartitionId(0), planner) + } + + /// Adds one file and its root planner to the specified input partition. + fn with_file_in_partition( + mut self, + partition: PartitionId, + planner: MockPlanner, + ) -> Self { + self.partition_files + .entry(partition) + .or_default() + .push(planner.file_path().to_string()); self.morselizer = self.morselizer.with_file(planner); self } - /// Sets a global output limit for the stream. + /// Marks the stream (and all partitions) to preserve the specified file + /// order. + fn with_preserve_order(mut self, preserve_order: bool) -> Self { + self.preserve_order = preserve_order; + self + } + + /// Marks the test scan as pre-partitioned by file group, which should + /// force each stream to keep its own files local. + fn with_partitioned_by_file_group( + mut self, + partitioned_by_file_group: bool, + ) -> Self { + self.partitioned_by_file_group = partitioned_by_file_group; + self + } + + /// Controls whether scheduler events are included in the snapshot. + /// + /// When disabled, `run()` still includes the event section header but + /// replaces the trace with a fixed placeholder so tests can focus only + /// on the output batches. + fn with_file_stream_events(mut self, file_stream_events: bool) -> Self { + self.file_stream_events = file_stream_events; + self + } + + /// Controls whether streams are all built up front or lazily on their + /// first read. + /// + /// The default builds all streams before polling begins, which matches + /// normal execution. Tests may enable lazy creation to model races + /// where one sibling polls before another has contributed its files to + /// the shared queue. 
+ fn with_build_streams_on_first_read( + mut self, + build_streams_on_first_read: bool, + ) -> Self { + self.build_streams_on_first_read = build_streams_on_first_read; + self + } + + /// Sets the partition polling order. + /// + /// `run()` polls these partitions in the listed order first. After + /// those explicit reads are exhausted, it completes to round + /// robin across all configured partitions, skipping any streams that + /// have already finished. + /// + /// This allows testing early scheduling decisions explicit in a test + /// while avoiding a fully scripted poll trace for the remainder. + fn with_reads(mut self, reads: Vec) -> Self { + self.reads = reads; + self + } + + /// Sets a global output limit for all streams created by this test. fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } - /// Runs the test returns combined output and scheduler trace text as a String. + /// Runs the test and returns combined stream output and scheduler + /// trace text. 
async fn run(self) -> Result { let observer = self.morselizer.observer().clone(); observer.clear(); - let config = self.test_config(); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut stream = FileStreamBuilder::new(&config) - .with_partition(0) - .with_morselizer(Box::new(self.morselizer)) - .with_metrics(&metrics_set) - .build()?; + let partition_count = self.num_partitions(); - let mut stream_contents = Vec::new(); - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let col = batch.column(0).as_primitive::(); - let batch_id = col.value(0); - stream_contents.push(format!("Batch: {batch_id}")); - } - Err(e) => { - // Pull the actual message for external errors rather than - // relying on DataFusionError formatting, which changes - // if backtraces are enabled, etc - let message = if let DataFusionError::External(generic) = e { - generic.to_string() - } else { - e.to_string() - }; - stream_contents.push(format!("Error: {message}")); - } + let mut partitions = (0..partition_count) + .map(|_| PartitionState::new()) + .collect::>(); + + let mut build_order = Vec::new(); + for partition in self.reads.iter().map(|partition| partition.0) { + if !build_order.contains(&partition) { + build_order.push(partition); + } + } + for partition in 0..partition_count { + if !build_order.contains(&partition) { + build_order.push(partition); } } - stream_contents.push("Done".to_string()); - Ok(format!( - "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", - stream_contents.join("\n"), + let config = self.test_config(); + if !self.build_streams_on_first_read { + for partition in build_order { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partitions[partition].set_stream(stream); + } + } + + let mut initial_reads: VecDeque<_> = self.reads.into(); + let mut next_round_robin = 0; + + while 
!initial_reads.is_empty() + || partitions.iter().any(PartitionState::is_active) + { + let partition = if let Some(partition) = initial_reads.pop_front() { + partition.0 + } else { + let partition = next_round_robin; + next_round_robin = (next_round_robin + 1) % partition_count.max(1); + partition + }; + + let partition_state = &mut partitions[partition]; + + if self.build_streams_on_first_read && !partition_state.built { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partition_state.set_stream(stream); + } + + let Some(stream) = partition_state.stream.as_mut() else { + continue; + }; + + match stream.next().await { + Some(result) => partition_state.push_output(format_result(result)), + None => partition_state.finish(), + } + } + + let output_text = if partition_count == 1 { + format!( + "----- Output Stream -----\n{}", + partitions[0].output.join("\n") + ) + } else { + partitions + .into_iter() + .enumerate() + .map(|(partition, state)| { + format!( + "----- Partition {} -----\n{}", + partition, + state.output.join("\n") + ) + }) + .collect::>() + .join("\n") + }; + + let file_stream_events = if self.file_stream_events { observer.format_events() + } else { + "(omitted due to with_file_stream_events(false))".to_string() + }; + + Ok(format!( + "{output_text}\n----- File Stream Events -----\n{file_stream_events}", )) } - /// Builds the `FileScanConfig` for the configured mock file set. + /// Returns the number of configured partitions, including empty ones + /// that appear only in the explicit read schedule. + fn num_partitions(&self) -> usize { + self.partition_files + .keys() + .map(|partition| partition.0 + 1) + .chain(self.reads.iter().map(|partition| partition.0 + 1)) + .max() + .unwrap_or(1) + } + + /// Builds a `FileScanConfig` covering every configured partition. 
fn test_config(&self) -> FileScanConfig { - let file_group = self - .file_names - .iter() - .map(|name| PartitionedFile::new(name, 10)) - .collect(); + let file_groups = (0..self.num_partitions()) + .map(|partition| { + self.partition_files + .get(&PartitionId(partition)) + .into_iter() + .flat_map(|files| files.iter()) + .map(|name| PartitionedFile::new(name, 10)) + .collect::>() + .into() + }) + .collect::>(); + let table_schema = TableSchema::new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![], @@ -1007,9 +1431,76 @@ mod tests { ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), ) - .with_file_group(file_group) + .with_file_groups(file_groups) .with_limit(self.limit) + .with_preserve_order(self.preserve_order) + .with_partitioned_by_file_group(self.partitioned_by_file_group) .build() } } + + /// Formats one stream poll result into a stable snapshot line. + fn format_result(result: Result) -> String { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + format!("Batch: {batch_id}") + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes if + // backtraces are enabled, etc. + let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + format!("Error: {message}") + } + } + } + + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. + struct PartitionState { + /// Whether the `FileStream` for this partition has been built yet. + built: bool, + /// The live stream, if this partition has not finished yet. + stream: Option, + /// Snapshot lines produced by this partition. + output: Vec, + } + + impl PartitionState { + /// Create an unbuilt partition with no output yet. 
+ fn new() -> Self { + Self { + built: false, + stream: None, + output: vec![], + } + } + + /// Returns true if this partition might still produce output. + fn is_active(&self) -> bool { + !self.built || self.stream.is_some() + } + + /// Records that this partition's stream has been built. + fn set_stream(&mut self, stream: FileStream) { + self.stream = Some(stream); + self.built = true; + } + + /// Records one formatted output line for this partition. + fn push_output(&mut self, line: String) { + self.output.push(line); + } + + /// Marks this partition as finished. + fn finish(&mut self) { + self.push_output("Done".to_string()); + self.stream = None; + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 2d6f4756ee27..275840dd0401 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -18,7 +18,6 @@ use std::collections::VecDeque; use std::task::{Context, Poll}; -use crate::PartitionedFile; use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; @@ -26,6 +25,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; +use super::work_source::WorkSource; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -39,7 +39,7 @@ use super::{FileStreamMetrics, OnError}; /// # State Transitions /// /// ```text -/// file_iter +/// work_source /// | /// v /// morselizer.plan_file(file) @@ -56,8 +56,8 @@ use super::{FileStreamMetrics, OnError}; /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { - /// Files that still need to be planned. - file_iter: VecDeque, + /// Unopened files that still need to be planned for this stream. 
+ work_source: WorkSource, /// Remaining row limit, if any. remain: Option, /// The morselizer used to plan files. @@ -71,6 +71,9 @@ pub(super) struct ScanState { /// The active reader, if any. reader: Option>>, /// The single planner currently blocked on I/O, if any. + /// + /// Once the I/O completes, yields the next planner and is pushed back + /// onto `ready_planners`. pending_planner: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, @@ -78,15 +81,14 @@ pub(super) struct ScanState { impl ScanState { pub(super) fn new( - file_iter: impl Into>, + work_source: WorkSource, remain: Option, morselizer: Box, on_error: OnError, metrics: FileStreamMetrics, ) -> Self { - let file_iter = file_iter.into(); Self { - file_iter, + work_source, remain, morselizer, on_error, @@ -164,7 +166,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.file_iter.len(); + let done = 1 + self.work_source.len(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) @@ -244,8 +246,8 @@ impl ScanState { }; } - // No outstanding work remains, so morselize the next unopened file. - let part_file = match self.file_iter.pop_front() { + // No outstanding work remains, so begin planning the next unopened file. + let part_file = match self.work_source.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), }; diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs new file mode 100644 index 000000000000..23e82c51bf0f --- /dev/null +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use crate::PartitionedFile; +use parking_lot::Mutex; + +/// Source of unopened files for one `ScanState`. +/// +/// Streams that may share work across siblings use [`WorkSource::Shared`], +/// while streams that must preserve their own file order or output partition +/// boundaries keep their files in [`WorkSource::Local`]. +pub(super) enum WorkSource { + /// Files this stream will plan locally without sharing them. + Local(VecDeque), + /// Files shared with sibling streams. + Shared(SharedWorkSource), +} + +impl WorkSource { + /// Pop the next file to plan from this work source. + pub(super) fn pop_front(&mut self) -> Option { + match self { + Self::Local(files) => files.pop_front(), + Self::Shared(shared) => shared.pop_front(), + } + } + + /// Return the number of files that are still waiting to be planned. + pub(super) fn len(&self) -> usize { + match self { + Self::Local(files) => files.len(), + Self::Shared(shared) => shared.len(), + } + } +} + +/// Shared source of unopened files that sibling `FileStream`s may steal from. +/// +/// Each sibling contributes its initial file group into the shared queue during +/// construction. Later, whichever stream becomes idle first may take the next +/// unopened file from the front of that queue. 
+#[derive(Debug, Clone)] +pub(crate) struct SharedWorkSource { + inner: Arc, +} + +#[derive(Debug, Default)] +pub(super) struct SharedWorkSourceInner { + files: Mutex>, +} + +impl SharedWorkSource { + /// Create a shared work source containing the provided unopened files. + pub(crate) fn new(files: impl IntoIterator) -> Self { + let files = files.into_iter().collect(); + Self { + inner: Arc::new(SharedWorkSourceInner { + files: Mutex::new(files), + }), + } + } + + /// Pop the next file from the shared work queue. + fn pop_front(&self) -> Option { + self.inner.files.lock().pop_front() + } + + /// Return the number of files still waiting in the shared queue. + fn len(&self) -> usize { + self.inner.files.lock().len() + } +} From e80caabe29ffb3a4b53fb4e85b05da11d58f1902 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Apr 2026 16:17:35 -0400 Subject: [PATCH 4/4] Revert "simple" This reverts commit c479ed68e02a4337eb9cbafa8ec1d70b4e5c2345. --- datafusion/datasource/src/file_scan_config.rs | 23 -------------- .../datasource/src/file_stream/builder.rs | 20 +++++++++--- datafusion/datasource/src/file_stream/mod.rs | 10 +++--- .../datasource/src/file_stream/work_source.rs | 31 +++++++++++++++---- 4 files changed, 45 insertions(+), 39 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 018ddf22e0ac..822a8ca9de0d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -568,29 +568,6 @@ impl FileScanConfigBuilder { } } -impl FileScanConfig { - /// Returns the shared unopened-file queue for reorderable streams in this scan. - /// - /// The queue is initialized once from all file groups so sibling streams - /// can begin stealing work immediately, even if they are built or polled - /// before every sibling `FileStream` has been constructed. 
- pub(crate) fn shared_work_source(&self) -> Option { - if self.preserve_order || self.partitioned_by_file_group { - return None; - } - - Some( - self.shared_work_source - .get_or_init(|| { - SharedWorkSource::new( - self.file_groups.iter().flat_map(FileGroup::iter).cloned(), - ) - }) - .clone(), - ) - } -} - impl From for FileScanConfigBuilder { fn from(config: FileScanConfig) -> Self { Self { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index c8b049e63a1d..e58c1c7109a2 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -51,9 +51,14 @@ pub struct FileStreamBuilder<'a> { impl<'a> FileStreamBuilder<'a> { /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { - let reorderable = match config.shared_work_source() { - Some(shared_work_source) => Reorderable::Yes(shared_work_source), - None => Reorderable::No, + let reorderable = if config.preserve_order || config.partitioned_by_file_group { + Reorderable::No + } else { + let shared_work_source = config + .shared_work_source + .get_or_init(SharedWorkSource::new) + .clone(); + Reorderable::Yes(shared_work_source) }; Self { @@ -126,9 +131,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let files = file_group.into_inner(); let work_source = match reorderable { - Reorderable::Yes(shared) => WorkSource::Shared(shared), - Reorderable::No => WorkSource::Local(file_group.into_inner().into()), + Reorderable::Yes(shared) => { + shared.register_stream(); + shared.push_files(files); + WorkSource::Shared(shared) + } + Reorderable::No => WorkSource::Local(files.into()), }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 5f7a5b4c7c2e..08435f4bb60c 100644 --- 
a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -980,15 +980,15 @@ mod tests { // Partition 0 starts with 3 files, but Partition 1 is polled first. // Since Partition is polled first, it will run all the files even those - // that were assigned to Partition 0. + // that were asssigned to Partition 0. insta::assert_snapshot!(test.run().await.unwrap(), @r" ----- Partition 0 ----- Done ----- Partition 1 ----- + Batch: 201 Batch: 101 Batch: 102 Batch: 103 - Batch: 201 Done ----- File Stream Events ----- (omitted due to with_file_stream_events(false)) @@ -1120,10 +1120,10 @@ mod tests { // partition 0 has populated the shared queue. insta::assert_snapshot!(test.run().await.unwrap(), @r" ----- Partition 0 ----- + Batch: 101 Batch: 102 Done ----- Partition 1 ----- - Batch: 101 Done ----- File Stream Events ----- (omitted due to with_file_stream_events(false)) @@ -1173,11 +1173,11 @@ mod tests { ----- Partition 0 ----- Done ----- Partition 1 ----- - Batch: 103 + Batch: 101 Done ----- Partition 2 ----- - Batch: 101 Batch: 102 + Batch: 103 Done ----- File Stream Events ----- (omitted due to with_file_stream_events(false)) diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 23e82c51bf0f..94064d408423 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::PartitionedFile; use parking_lot::Mutex; @@ -64,19 +65,29 @@ pub(crate) struct SharedWorkSource { #[derive(Debug, Default)] pub(super) struct SharedWorkSourceInner { files: Mutex>, + /// bookkeeping for the participating siblings and is intended to support + /// later coordination improvements. 
+ active_streams: AtomicUsize, } impl SharedWorkSource { - /// Create a shared work source containing the provided unopened files. - pub(crate) fn new(files: impl IntoIterator) -> Self { - let files = files.into_iter().collect(); + /// Create an empty shared work source. + pub(crate) fn new() -> Self { Self { - inner: Arc::new(SharedWorkSourceInner { - files: Mutex::new(files), - }), + inner: Arc::new(SharedWorkSourceInner::default()), } } + /// Register one active stream that may pull from this shared queue. + pub(super) fn register_stream(&self) { + self.inner.active_streams.fetch_add(1, Ordering::Relaxed); + } + + /// Add newly discovered files into the shared work queue. + pub(super) fn push_files(&self, files: impl IntoIterator) { + self.inner.files.lock().extend(files); + } + /// Pop the next file from the shared work queue. fn pop_front(&self) -> Option { self.inner.files.lock().pop_front() @@ -87,3 +98,11 @@ impl SharedWorkSource { self.inner.files.lock().len() } } + +impl Drop for SharedWorkSource { + fn drop(&mut self) { + if Arc::strong_count(&self.inner) > 1 { + self.inner.active_streams.fetch_sub(1, Ordering::Relaxed); + } + } +}