From 53aa7380e7c4996dc81eb0bb482dc6733df9369e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:28:12 -0400 Subject: [PATCH 1/7] Add `FileStreamBuilder` for creating FileStreams --- datafusion/datasource/src/file_scan_config.rs | 9 +- datafusion/datasource/src/file_stream.rs | 185 ++++++++++++++++-- 2 files changed, 177 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 773e6b2055fa0..8c77947fd3741 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -21,7 +21,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, - file_compression_type::FileCompressionType, file_stream::FileStream, + file_compression_type::FileCompressionType, + file_stream::FileStreamBuilder, source::DataSource, statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; @@ -588,7 +589,11 @@ impl DataSource for FileScanConfig { let opener = source.create_file_opener(object_store, self, partition)?; - let stream = FileStream::new(self, partition, opener, source.metrics())?; + let stream = FileStreamBuilder::new(self) + .with_partition(partition) + .with_file_opener(opener) + .with_metrics(source.metrics()) + .build()?; Ok(Box::pin(cooperative(stream))) } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 8a4ec4a7f1d1a..03f2be90a8b5f 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -29,7 +29,7 @@ use std::task::{Context, Poll}; use crate::PartitionedFile; use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; -use datafusion_common::error::Result; +use datafusion_common::{internal_err, Result}; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time, @@ -64,29 +64,115 @@ pub struct FileStream { on_error: OnError, } -impl FileStream { - /// Create a new `FileStream` using the give `FileOpener` to scan underlying files - pub fn new( - config: &FileScanConfig, - partition: usize, - file_opener: Arc, - metrics: &ExecutionPlanMetricsSet, - ) -> Result { - let projected_schema = config.projected_schema()?; +/// Builder for constructing a [`FileStream`]. +pub struct FileStreamBuilder<'a> { + config: &'a FileScanConfig, + partition: Option, + file_opener: Option>, + metrics: Option<&'a ExecutionPlanMetricsSet>, + on_error: OnError, +} - let file_group = config.file_groups[partition].clone(); +impl<'a> FileStreamBuilder<'a> { + /// Create a new builder. + pub fn new(config: &'a FileScanConfig) -> Self { + Self { + config, + partition: None, + file_opener: None, + metrics: None, + on_error: OnError::Fail, + } + } - Ok(Self { + /// Configure the partition to scan. + pub fn with_partition(mut self, partition: usize) -> Self { + self.partition = Some(partition); + self + } + + /// Configure the [`FileOpener`] used to open files. + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } + + /// Configure the metrics set used by the stream. + pub fn with_metrics( + mut self, + metrics: &'a ExecutionPlanMetricsSet, + ) -> Self { + self.metrics = Some(metrics); + self + } + + /// Configure the behavior when opening or scanning a file fails. + pub fn with_on_error(mut self, on_error: OnError) -> Self { + self.on_error = on_error; + self + } + + /// Build the configured [`FileStream`]. + pub fn build(self) -> Result { + let partition = match self.partition { + Some(partition) => partition, + None => { + return internal_err!( + "FileStreamBuilder missing required field: partition" + ); + } + }; + let file_opener = match self.file_opener { + Some(file_opener) => file_opener, + None => { + return internal_err!( + "FileStreamBuilder missing required field: file_opener" + ); + } + }; + let metrics = match self.metrics { + Some(metrics) => metrics, + None => { + return internal_err!("FileStreamBuilder missing required field: metrics"); + } + }; + let projected_schema = self.config.projected_schema()?; + let file_group = match self.config.file_groups.get(partition).cloned() { + Some(file_group) => file_group, + None => { + return internal_err!( + "FileStreamBuilder invalid partition index: {partition}" + ); + } + }; + + Ok(FileStream { file_iter: file_group.into_inner().into_iter().collect(), projected_schema, - remain: config.limit, + remain: self.config.limit, file_opener, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error: OnError::Fail, + on_error: self.on_error, }) } +} + +impl FileStream { + /// Create a new `FileStream` using the give `FileOpener` to scan underlying files + pub fn new( + config: &FileScanConfig, + partition: usize, + file_opener: Arc, + metrics: &ExecutionPlanMetricsSet, + ) -> Result { + FileStreamBuilder::new(config) + .with_partition(partition) + .with_file_opener(file_opener) + .with_metrics(metrics) + .build() + } /// Specify the behavior when an error occurs opening or scanning a file /// @@ -410,7 +496,9 @@ mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; + use crate::file_stream::{ + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, + }; use crate::test_util::MockSource; use arrow::array::RecordBatch; use arrow::datatypes::Schema; @@ -857,4 +945,71 @@ mod tests { Ok(()) } + + #[test] + fn builder_requires_partition_file_opener_and_metrics() { + let table_schema = crate::table_schema::TableSchema::new( + Arc::new(Schema::empty()), + vec![], + ); + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file(PartitionedFile::new("mock_file", 10)) + .build(); + + let err = FileStreamBuilder::new(&config) + .build() + .err() + .unwrap(); + assert!(err + .to_string() + .contains("FileStreamBuilder missing required field: partition")); + + let err = FileStreamBuilder::new(&config) + .with_partition(0) + .build() + .err() + .unwrap(); + assert!(err + .to_string() + .contains("FileStreamBuilder missing required field: file_opener")); + + let err = FileStreamBuilder::new(&config) + .with_partition(0) + .with_file_opener(Arc::new(TestOpener::default())) + .build() + .err() + .unwrap(); + assert!(err + .to_string() + .contains("FileStreamBuilder missing required field: metrics")); + } + + #[test] + fn builder_errors_on_invalid_partition() { + let table_schema = crate::table_schema::TableSchema::new( + Arc::new(Schema::empty()), + vec![], + ); + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file(PartitionedFile::new("mock_file", 10)) + .build(); + let metrics = ExecutionPlanMetricsSet::new(); + + let err = FileStreamBuilder::new(&config) + .with_partition(1) + .with_file_opener(Arc::new(TestOpener::default())) + .with_metrics(&metrics) + .build() + .err() + .unwrap(); + assert!(err + .to_string() + .contains("FileStreamBuilder invalid partition index: 1")); + } } From ca8e28970e89f6d622d91394ee0b6179a7bbdae6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:33:38 -0400 Subject: [PATCH 2/7] Deprecate --- .../custom_data_source/csv_json_opener.rs | 24 ++++--- .../core/src/datasource/physical_plan/mod.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 3 +- datafusion/datasource/src/file_stream.rs | 72 +++++++++---------- 4 files changed, 52 insertions(+), 49 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 35f36ea8bc0ce..51c0e2167053e 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -27,7 +27,9 @@ use datafusion::{ file_format::file_compression_type::FileCompressionType, listing::PartitionedFile, object_store::ObjectStoreUrl, - physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource}, + physical_plan::{ + CsvSource, FileSource, FileStreamBuilder, JsonOpener, JsonSource, + }, }, error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, @@ -80,8 +82,12 @@ async fn csv_opener() -> Result<()> { .create_file_opener(object_store, &scan_config, 0)?; let mut result = vec![]; - let mut stream = - FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?; + let metrics = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&scan_config) + .with_partition(0) + .with_file_opener(opener) + .with_metrics(&metrics) + .build()?; while let Some(batch) = stream.next().await.transpose()? { result.push(batch); } @@ -137,12 +143,12 @@ async fn json_opener() -> Result<()> { .with_file(PartitionedFile::new(path.to_string(), 10)) .build(); - let mut stream = FileStream::new( - &scan_config, - 0, - Arc::new(opener), - &ExecutionPlanMetricsSet::new(), - )?; + let metrics = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&scan_config) + .with_partition(0) + .with_file_opener(Arc::new(opener)) + .with_metrics(&metrics) + .build()?; let mut result = vec![]; while let Some(batch) = stream.next().await.transpose()? { result.push(batch); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 04c8ea129d05c..8e4855afa66bb 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -49,5 +49,5 @@ pub use datafusion_datasource::file_scan_config::{ pub use datafusion_datasource::file_sink_config::*; pub use datafusion_datasource::file_stream::{ - FileOpenFuture, FileOpener, FileStream, OnError, + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, }; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 8c77947fd3741..78da70402f93d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -21,8 +21,7 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, - file_compression_type::FileCompressionType, - file_stream::FileStreamBuilder, + file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, source::DataSource, statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 03f2be90a8b5f..ed18fc41711ea 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -29,7 +29,7 @@ use std::task::{Context, Poll}; use crate::PartitionedFile; use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{Result, internal_err}; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time, @@ -98,10 +98,7 @@ impl<'a> FileStreamBuilder<'a> { } /// Configure the metrics set used by the stream. - pub fn with_metrics( - mut self, - metrics: &'a ExecutionPlanMetricsSet, - ) -> Self { + pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self { self.metrics = Some(metrics); self } @@ -133,7 +130,9 @@ impl<'a> FileStreamBuilder<'a> { let metrics = match self.metrics { Some(metrics) => metrics, None => { - return internal_err!("FileStreamBuilder missing required field: metrics"); + return internal_err!( + "FileStreamBuilder missing required field: metrics" + ); } }; let projected_schema = self.config.projected_schema()?; @@ -161,6 +160,7 @@ impl<'a> FileStreamBuilder<'a> { impl FileStream { /// Create a new `FileStream` using the give `FileOpener` to scan underlying files + #[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")] pub fn new( config: &FileScanConfig, partition: usize, @@ -496,9 +496,7 @@ mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{ - FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, - }; + use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; use crate::test_util::MockSource; use arrow::array::RecordBatch; use arrow::datatypes::Schema; @@ -627,10 +625,13 @@ mod tests { .with_limit(self.limit) .build(); let metrics_set = ExecutionPlanMetricsSet::new(); - let file_stream = - FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set) - .unwrap() - .with_on_error(on_error); + let file_stream = FileStreamBuilder::new(&config) + .with_partition(0) + .with_file_opener(Arc::new(self.opener)) + .with_metrics(&metrics_set) + .with_on_error(on_error) + .build() + .unwrap(); file_stream .collect::>() @@ -948,10 +949,8 @@ mod tests { #[test] fn builder_requires_partition_file_opener_and_metrics() { - let table_schema = crate::table_schema::TableSchema::new( - Arc::new(Schema::empty()), - vec![], - ); + let table_schema = + crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -959,22 +958,21 @@ mod tests { .with_file(PartitionedFile::new("mock_file", 10)) .build(); - let err = FileStreamBuilder::new(&config) - .build() - .err() - .unwrap(); - assert!(err - .to_string() - .contains("FileStreamBuilder missing required field: partition")); + let err = FileStreamBuilder::new(&config).build().err().unwrap(); + assert!( + err.to_string() + .contains("FileStreamBuilder missing required field: partition") + ); let err = FileStreamBuilder::new(&config) .with_partition(0) .build() .err() .unwrap(); - assert!(err - .to_string() - .contains("FileStreamBuilder missing required field: file_opener")); + assert!( + err.to_string() + .contains("FileStreamBuilder missing required field: file_opener") + ); let err = FileStreamBuilder::new(&config) .with_partition(0) @@ -982,17 +980,16 @@ mod tests { .build() .err() .unwrap(); - assert!(err - .to_string() - .contains("FileStreamBuilder missing required field: metrics")); + assert!( + err.to_string() + .contains("FileStreamBuilder missing required field: metrics") + ); } #[test] fn builder_errors_on_invalid_partition() { - let table_schema = crate::table_schema::TableSchema::new( - Arc::new(Schema::empty()), - vec![], - ); + let table_schema = + crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -1008,8 +1005,9 @@ mod tests { .build() .err() .unwrap(); - assert!(err - .to_string() - .contains("FileStreamBuilder invalid partition index: 1")); + assert!( + err.to_string() + .contains("FileStreamBuilder invalid partition index: 1") + ); } } From f96f85fa90b57c2b4066555c3cf4cfb0fac7d0e1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:37:36 -0400 Subject: [PATCH 3/7] modularize --- .../datasource/src/file_stream/builder.rs | 118 ++++++++++++++++++ .../{file_stream.rs => file_stream/mod.rs} | 100 +-------------- 2 files changed, 123 insertions(+), 95 deletions(-) create mode 100644 datafusion/datasource/src/file_stream/builder.rs rename datafusion/datasource/src/{file_stream.rs => file_stream/mod.rs} (91%) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs new file mode 100644 index 0000000000000..43b27f8303e55 --- /dev/null +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::file_scan_config::FileScanConfig; +use datafusion_common::{Result, internal_err}; +use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + +use super::{FileOpener, FileStream, FileStreamMetrics, FileStreamState, OnError}; + +/// Builder for constructing a [`FileStream`]. +pub struct FileStreamBuilder<'a> { + config: &'a FileScanConfig, + partition: Option, + file_opener: Option>, + metrics: Option<&'a ExecutionPlanMetricsSet>, + on_error: OnError, +} + +impl<'a> FileStreamBuilder<'a> { + /// Create a new builder. + pub fn new(config: &'a FileScanConfig) -> Self { + Self { + config, + partition: None, + file_opener: None, + metrics: None, + on_error: OnError::Fail, + } + } + + /// Configure the partition to scan. + pub fn with_partition(mut self, partition: usize) -> Self { + self.partition = Some(partition); + self + } + + /// Configure the [`FileOpener`] used to open files. + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } + + /// Configure the metrics set used by the stream. + pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self { + self.metrics = Some(metrics); + self + } + + /// Configure the behavior when opening or scanning a file fails. + pub fn with_on_error(mut self, on_error: OnError) -> Self { + self.on_error = on_error; + self + } + + /// Build the configured [`FileStream`]. + pub fn build(self) -> Result { + let partition = match self.partition { + Some(partition) => partition, + None => { + return internal_err!( + "FileStreamBuilder missing required field: partition" + ); + } + }; + let file_opener = match self.file_opener { + Some(file_opener) => file_opener, + None => { + return internal_err!( + "FileStreamBuilder missing required field: file_opener" + ); + } + }; + let metrics = match self.metrics { + Some(metrics) => metrics, + None => { + return internal_err!( + "FileStreamBuilder missing required field: metrics" + ); + } + }; + let projected_schema = self.config.projected_schema()?; + let file_group = match self.config.file_groups.get(partition).cloned() { + Some(file_group) => file_group, + None => { + return internal_err!( + "FileStreamBuilder invalid partition index: {partition}" + ); + } + }; + + Ok(FileStream { + file_iter: file_group.into_inner().into_iter().collect(), + projected_schema, + remain: self.config.limit, + file_opener, + state: FileStreamState::Idle, + file_stream_metrics: FileStreamMetrics::new(metrics, partition), + baseline_metrics: BaselineMetrics::new(metrics, partition), + on_error: self.on_error, + }) + } +} diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream/mod.rs similarity index 91% rename from datafusion/datasource/src/file_stream.rs rename to datafusion/datasource/src/file_stream/mod.rs index ed18fc41711ea..2879cfa6cc6d0 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -21,6 +21,8 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait. +mod builder; + use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; @@ -29,7 +31,7 @@ use std::task::{Context, Poll}; use crate::PartitionedFile; use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; -use datafusion_common::{Result, internal_err}; +use datafusion_common::Result; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time, @@ -42,6 +44,8 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{FutureExt as _, Stream, StreamExt as _, ready}; +pub use builder::FileStreamBuilder; + /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { /// An iterator over input files. @@ -64,100 +68,6 @@ pub struct FileStream { on_error: OnError, } -/// Builder for constructing a [`FileStream`]. -pub struct FileStreamBuilder<'a> { - config: &'a FileScanConfig, - partition: Option, - file_opener: Option>, - metrics: Option<&'a ExecutionPlanMetricsSet>, - on_error: OnError, -} - -impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. - pub fn new(config: &'a FileScanConfig) -> Self { - Self { - config, - partition: None, - file_opener: None, - metrics: None, - on_error: OnError::Fail, - } - } - - /// Configure the partition to scan. - pub fn with_partition(mut self, partition: usize) -> Self { - self.partition = Some(partition); - self - } - - /// Configure the [`FileOpener`] used to open files. - pub fn with_file_opener(mut self, file_opener: Arc) -> Self { - self.file_opener = Some(file_opener); - self - } - - /// Configure the metrics set used by the stream. - pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self { - self.metrics = Some(metrics); - self - } - - /// Configure the behavior when opening or scanning a file fails. - pub fn with_on_error(mut self, on_error: OnError) -> Self { - self.on_error = on_error; - self - } - - /// Build the configured [`FileStream`]. - pub fn build(self) -> Result { - let partition = match self.partition { - Some(partition) => partition, - None => { - return internal_err!( - "FileStreamBuilder missing required field: partition" - ); - } - }; - let file_opener = match self.file_opener { - Some(file_opener) => file_opener, - None => { - return internal_err!( - "FileStreamBuilder missing required field: file_opener" - ); - } - }; - let metrics = match self.metrics { - Some(metrics) => metrics, - None => { - return internal_err!( - "FileStreamBuilder missing required field: metrics" - ); - } - }; - let projected_schema = self.config.projected_schema()?; - let file_group = match self.config.file_groups.get(partition).cloned() { - Some(file_group) => file_group, - None => { - return internal_err!( - "FileStreamBuilder invalid partition index: {partition}" - ); - } - }; - - Ok(FileStream { - file_iter: file_group.into_inner().into_iter().collect(), - projected_schema, - remain: self.config.limit, - file_opener, - state: FileStreamState::Idle, - file_stream_metrics: FileStreamMetrics::new(metrics, partition), - baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error: self.on_error, - }) - } -} - impl FileStream { /// Create a new `FileStream` using the give `FileOpener` to scan underlying files #[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")] From 0bbaf3a7b168187423c08a84e6652f066ee2b6db Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:43:24 -0400 Subject: [PATCH 4/7] Simplify --- .../datasource/src/file_stream/builder.rs | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 43b27f8303e55..89542bb276418 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -70,32 +70,27 @@ impl<'a> FileStreamBuilder<'a> { /// Build the configured [`FileStream`]. pub fn build(self) -> Result { - let partition = match self.partition { - Some(partition) => partition, - None => { - return internal_err!( - "FileStreamBuilder missing required field: partition" - ); - } + let Self { + config, + partition, + file_opener, + metrics, + on_error, + } = self; + + let Some(partition) = partition else { + return internal_err!("FileStreamBuilder missing required field: partition"); }; - let file_opener = match self.file_opener { - Some(file_opener) => file_opener, - None => { - return internal_err!( - "FileStreamBuilder missing required field: file_opener" - ); - } + let Some(file_opener) = file_opener else { + return internal_err!( + "FileStreamBuilder missing required field: file_opener" + ); }; - let metrics = match self.metrics { - Some(metrics) => metrics, - None => { - return internal_err!( - "FileStreamBuilder missing required field: metrics" - ); - } + let Some(metrics) = metrics else { + return internal_err!("FileStreamBuilder missing required field: metrics"); }; - let projected_schema = self.config.projected_schema()?; - let file_group = match self.config.file_groups.get(partition).cloned() { + let projected_schema = config.projected_schema()?; + let file_group = match config.file_groups.get(partition).cloned() { Some(file_group) => file_group, None => { return internal_err!( @@ -112,7 +107,7 @@ impl<'a> FileStreamBuilder<'a> { state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error: self.on_error, + on_error, }) } } From 844b39ae79a02ea2bedb3aa9ec8058babb30aeb7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:49:19 -0400 Subject: [PATCH 5/7] simplify tests --- datafusion/datasource/src/file_stream/mod.rs | 92 ++++++++------------ 1 file changed, 38 insertions(+), 54 deletions(-) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 2879cfa6cc6d0..4ea0bab211320 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -396,9 +396,9 @@ impl FileStreamMetrics { #[cfg(test)] mod tests { - use crate::PartitionedFile; - use crate::file_scan_config::FileScanConfigBuilder; + use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use crate::tests::make_partition; + use crate::{PartitionedFile, TableSchema}; use datafusion_common::error::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -526,7 +526,7 @@ mod tests { let on_error = self.on_error; - let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]); + let table_schema = TableSchema::new(file_schema, vec![]); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -540,8 +540,7 @@ mod tests { .with_file_opener(Arc::new(self.opener)) .with_metrics(&metrics_set) .with_on_error(on_error) - .build() - .unwrap(); + .build()?; file_stream .collect::>() @@ -562,6 +561,23 @@ mod tests { .expect("error executing stream") } + /// Create the smallest valid file scan config for builder validation tests. + fn builder_test_config() -> FileScanConfig { + let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]); + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file(PartitionedFile::new("mock_file", 10)) + .build() + } + + /// Convenience helper to keep builder error assertions focused on the + /// specific missing or invalid input under test. + fn builder_error(builder: FileStreamBuilder<'_>) -> String { + builder.build().err().unwrap().to_string() + } + #[tokio::test] async fn on_error_opening() -> Result<()> { let batches = FileStreamTest::new() @@ -859,65 +875,33 @@ mod tests { #[test] fn builder_requires_partition_file_opener_and_metrics() { - let table_schema = - crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); - let config = FileScanConfigBuilder::new( - ObjectStoreUrl::parse("test:///").unwrap(), - Arc::new(MockSource::new(table_schema)), - ) - .with_file(PartitionedFile::new("mock_file", 10)) - .build(); + let config = builder_test_config(); - let err = FileStreamBuilder::new(&config).build().err().unwrap(); - assert!( - err.to_string() - .contains("FileStreamBuilder missing required field: partition") - ); + let err = builder_error(FileStreamBuilder::new(&config)); + assert!(err.contains("FileStreamBuilder missing required field: partition")); - let err = FileStreamBuilder::new(&config) - .with_partition(0) - .build() - .err() - .unwrap(); - assert!( - err.to_string() - .contains("FileStreamBuilder missing required field: file_opener") - ); + let err = builder_error(FileStreamBuilder::new(&config).with_partition(0)); + assert!(err.contains("FileStreamBuilder missing required field: file_opener")); - let err = FileStreamBuilder::new(&config) - .with_partition(0) - .with_file_opener(Arc::new(TestOpener::default())) - .build() - .err() - .unwrap(); - assert!( - err.to_string() - .contains("FileStreamBuilder missing required field: metrics") + let err = builder_error( + FileStreamBuilder::new(&config) + .with_partition(0) + .with_file_opener(Arc::new(TestOpener::default())), ); + assert!(err.contains("FileStreamBuilder missing required field: metrics")); } #[test] fn builder_errors_on_invalid_partition() { - let table_schema = - crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); - let config = FileScanConfigBuilder::new( - ObjectStoreUrl::parse("test:///").unwrap(), - Arc::new(MockSource::new(table_schema)), - ) - .with_file(PartitionedFile::new("mock_file", 10)) - .build(); + let config = builder_test_config(); let metrics = ExecutionPlanMetricsSet::new(); - let err = FileStreamBuilder::new(&config) - .with_partition(1) - .with_file_opener(Arc::new(TestOpener::default())) - .with_metrics(&metrics) - .build() - .err() - .unwrap(); - assert!( - err.to_string() - .contains("FileStreamBuilder invalid partition index: 1") + let err = builder_error( + FileStreamBuilder::new(&config) + .with_partition(1) + .with_file_opener(Arc::new(TestOpener::default())) + .with_metrics(&metrics), ); + assert!(err.contains("FileStreamBuilder invalid partition index: 1")); } } From c14fd8f287b8658c23eeb5854bec299853b78a61 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 15:50:50 -0400 Subject: [PATCH 6/7] moar simple --- .../datasource/src/file_stream/builder.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 89542bb276418..67d7aaacbcd98 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -79,24 +79,19 @@ impl<'a> FileStreamBuilder<'a> { } = self; let Some(partition) = partition else { - return internal_err!("FileStreamBuilder missing required field: partition"); + return internal_err!("FileStreamBuilder missing required partition"); }; let Some(file_opener) = file_opener else { - return internal_err!( - "FileStreamBuilder missing required field: file_opener" - ); + return internal_err!("FileStreamBuilder missing required file_opener"); }; let Some(metrics) = metrics else { - return internal_err!("FileStreamBuilder missing required field: metrics"); + return internal_err!("FileStreamBuilder missing required metrics"); }; let projected_schema = config.projected_schema()?; - let file_group = match config.file_groups.get(partition).cloned() { - Some(file_group) => file_group, - None => { - return internal_err!( - "FileStreamBuilder invalid partition index: {partition}" - ); - } + let Some(file_group) = config.file_groups.get(partition).cloned() else { + return internal_err!( + "FileStreamBuilder invalid partition index: {partition}" + ); }; Ok(FileStream { From fed2f18a5e6cb7bee6c5ab1b254d6ff783a71945 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Mar 2026 16:03:59 -0400 Subject: [PATCH 7/7] cleanup --- datafusion/datasource/src/file_stream/builder.rs | 2 +- datafusion/datasource/src/file_stream/mod.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 67d7aaacbcd98..7f21ace92c46b 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -97,7 +97,7 @@ impl<'a> FileStreamBuilder<'a> { Ok(FileStream { file_iter: file_group.into_inner().into_iter().collect(), projected_schema, - remain: self.config.limit, + remain: config.limit, file_opener, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 4ea0bab211320..a423552917408 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -878,17 +878,17 @@ mod tests { let config = builder_test_config(); let err = builder_error(FileStreamBuilder::new(&config)); - assert!(err.contains("FileStreamBuilder missing required field: partition")); + assert!(err.contains("FileStreamBuilder missing required partition")); let err = builder_error(FileStreamBuilder::new(&config).with_partition(0)); - assert!(err.contains("FileStreamBuilder missing required field: file_opener")); + assert!(err.contains("FileStreamBuilder missing required file_opener")); let err = builder_error( FileStreamBuilder::new(&config) .with_partition(0) .with_file_opener(Arc::new(TestOpener::default())), ); - assert!(err.contains("FileStreamBuilder missing required field: metrics")); + assert!(err.contains("FileStreamBuilder missing required metrics")); } #[test]