Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions datafusion-examples/examples/custom_data_source/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use datafusion::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
physical_plan::{
CsvSource, FileSource, FileStreamBuilder, JsonOpener, JsonSource,
},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -80,8 +82,12 @@ async fn csv_opener() -> Result<()> {
.create_file_opener(object_store, &scan_config, 0)?;

let mut result = vec![];
let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
let metrics = ExecutionPlanMetricsSet::new();
let mut stream = FileStreamBuilder::new(&scan_config)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this illustrates the new API pretty well -- instead of a pile of arguments to FileStream::new there is now FileStreamBuilder

.with_partition(0)
.with_file_opener(opener)
.with_metrics(&metrics)
.build()?;
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
}
Expand Down Expand Up @@ -137,12 +143,12 @@ async fn json_opener() -> Result<()> {
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();

let mut stream = FileStream::new(
&scan_config,
0,
Arc::new(opener),
&ExecutionPlanMetricsSet::new(),
)?;
let metrics = ExecutionPlanMetricsSet::new();
let mut stream = FileStreamBuilder::new(&scan_config)
.with_partition(0)
.with_file_opener(Arc::new(opener))
.with_metrics(&metrics)
.build()?;
let mut result = vec![];
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ pub use datafusion_datasource::file_scan_config::{
pub use datafusion_datasource::file_sink_config::*;

pub use datafusion_datasource::file_stream::{
FileOpenFuture, FileOpener, FileStream, OnError,
FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError,
};
8 changes: 6 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::file_groups::FileGroup;
use crate::{
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStream,
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
source::DataSource, statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
Expand Down Expand Up @@ -588,7 +588,11 @@ impl DataSource for FileScanConfig {

let opener = source.create_file_opener(object_store, self, partition)?;

let stream = FileStream::new(self, partition, opener, source.metrics())?;
let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_file_opener(opener)
.with_metrics(source.metrics())
.build()?;
Ok(Box::pin(cooperative(stream)))
}

Expand Down
108 changes: 108 additions & 0 deletions datafusion/datasource/src/file_stream/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::file_scan_config::FileScanConfig;
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

use super::{FileOpener, FileStream, FileStreamMetrics, FileStreamState, OnError};

/// Builder for constructing a [`FileStream`].
pub struct FileStreamBuilder<'a> {
    // Scan configuration the stream is built from (supplies the file groups,
    // projected schema, and row limit).
    config: &'a FileScanConfig,
    // Required before `build()`: index into `config.file_groups` to scan.
    partition: Option<usize>,
    // Required before `build()`: opener used to open each file.
    file_opener: Option<Arc<dyn FileOpener>>,
    // Required before `build()`: metrics set the stream records into.
    metrics: Option<&'a ExecutionPlanMetricsSet>,
    // Behavior when opening/scanning a file fails; defaults to `OnError::Fail`.
    on_error: OnError,
}

impl<'a> FileStreamBuilder<'a> {
    /// Create a new builder for the given scan configuration.
    ///
    /// The partition, file opener, and metrics must be supplied via the
    /// `with_*` methods before calling [`Self::build`].
    pub fn new(config: &'a FileScanConfig) -> Self {
        Self {
            config,
            partition: None,
            file_opener: None,
            metrics: None,
            on_error: OnError::Fail,
        }
    }

    /// Configure the partition to scan.
    pub fn with_partition(self, partition: usize) -> Self {
        Self {
            partition: Some(partition),
            ..self
        }
    }

    /// Configure the [`FileOpener`] used to open files.
    pub fn with_file_opener(self, file_opener: Arc<dyn FileOpener>) -> Self {
        Self {
            file_opener: Some(file_opener),
            ..self
        }
    }

    /// Configure the metrics set used by the stream.
    pub fn with_metrics(self, metrics: &'a ExecutionPlanMetricsSet) -> Self {
        Self {
            metrics: Some(metrics),
            ..self
        }
    }

    /// Configure the behavior when opening or scanning a file fails.
    pub fn with_on_error(self, on_error: OnError) -> Self {
        Self { on_error, ..self }
    }

    /// Build the configured [`FileStream`].
    ///
    /// Returns an internal error if any required input (partition, file
    /// opener, metrics) was not supplied, or if the partition index does not
    /// refer to an existing file group in the configuration.
    pub fn build(self) -> Result<FileStream> {
        // Required inputs: fail the build with a descriptive error when any
        // of them is missing.
        let partition = match self.partition {
            Some(partition) => partition,
            None => {
                return internal_err!("FileStreamBuilder missing required partition")
            }
        };
        let file_opener = match self.file_opener {
            Some(file_opener) => file_opener,
            None => {
                return internal_err!("FileStreamBuilder missing required file_opener")
            }
        };
        let metrics = match self.metrics {
            Some(metrics) => metrics,
            None => return internal_err!("FileStreamBuilder missing required metrics"),
        };

        let projected_schema = self.config.projected_schema()?;

        // Look up the file group for the requested partition; an
        // out-of-range index is a caller bug, not a runtime condition.
        let file_group = match self.config.file_groups.get(partition) {
            Some(file_group) => file_group.clone(),
            None => {
                return internal_err!(
                    "FileStreamBuilder invalid partition index: {partition}"
                )
            }
        };

        Ok(FileStream {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: self.config.limit,
            file_opener,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: self.on_error,
        })
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

mod builder;

use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -29,7 +31,7 @@ use std::task::{Context, Poll};
use crate::PartitionedFile;
use crate::file_scan_config::FileScanConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_common::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time,
Expand All @@ -42,6 +44,8 @@ use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt as _, Stream, StreamExt as _, ready};

pub use builder::FileStreamBuilder;

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
/// An iterator over input files.
Expand All @@ -66,26 +70,18 @@ pub struct FileStream {

impl FileStream {
/// Create a new `FileStream` using the given `FileOpener` to scan underlying files
///
/// Deprecated thin shim over [`FileStreamBuilder`], kept so existing callers
/// keep compiling.
#[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")]
pub fn new(
    config: &FileScanConfig,
    partition: usize,
    file_opener: Arc<dyn FileOpener>,
    metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
    // Delegate all construction and validation to the builder.
    let builder = FileStreamBuilder::new(config)
        .with_partition(partition)
        .with_file_opener(file_opener)
        .with_metrics(metrics);
    builder.build()
}

/// Specify the behavior when an error occurs opening or scanning a file
Expand Down Expand Up @@ -400,17 +396,17 @@ impl FileStreamMetrics {

#[cfg(test)]
mod tests {
use crate::PartitionedFile;
use crate::file_scan_config::FileScanConfigBuilder;
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use crate::tests::make_partition;
use crate::{PartitionedFile, TableSchema};
use datafusion_common::error::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{FutureExt as _, StreamExt as _};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError};
use crate::test_util::MockSource;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
Expand Down Expand Up @@ -530,7 +526,7 @@ mod tests {

let on_error = self.on_error;

let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]);
let table_schema = TableSchema::new(file_schema, vec![]);
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(MockSource::new(table_schema)),
Expand All @@ -539,10 +535,12 @@ mod tests {
.with_limit(self.limit)
.build();
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream =
FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
.unwrap()
.with_on_error(on_error);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileStream was already starting to grow a builder-style interface as it gained features, so let's just make it an explicit builder

let file_stream = FileStreamBuilder::new(&config)
.with_partition(0)
.with_file_opener(Arc::new(self.opener))
.with_metrics(&metrics_set)
.with_on_error(on_error)
.build()?;

file_stream
.collect::<Vec<_>>()
Expand All @@ -563,6 +561,23 @@ mod tests {
.expect("error executing stream")
}

/// Create the smallest valid file scan config for builder validation tests.
fn builder_test_config() -> FileScanConfig {
    let url = ObjectStoreUrl::parse("test:///").unwrap();
    let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]);
    let source = Arc::new(MockSource::new(table_schema));
    // A single one-file group is enough for the builder's validation paths.
    FileScanConfigBuilder::new(url, source)
        .with_file(PartitionedFile::new("mock_file", 10))
        .build()
}

/// Convenience helper to keep builder error assertions focused on the
/// specific missing or invalid input under test.
fn builder_error(builder: FileStreamBuilder<'_>) -> String {
    match builder.build() {
        Err(err) => err.to_string(),
        Ok(_) => panic!("expected FileStreamBuilder::build to fail"),
    }
}

#[tokio::test]
async fn on_error_opening() -> Result<()> {
let batches = FileStreamTest::new()
Expand Down Expand Up @@ -857,4 +872,36 @@ mod tests {

Ok(())
}

#[test]
fn builder_requires_partition_file_opener_and_metrics() {
    let config = builder_test_config();

    // No partition configured.
    let msg = builder_error(FileStreamBuilder::new(&config));
    assert!(msg.contains("FileStreamBuilder missing required partition"));

    // Partition set, but no file opener.
    let msg = builder_error(FileStreamBuilder::new(&config).with_partition(0));
    assert!(msg.contains("FileStreamBuilder missing required file_opener"));

    // Partition and opener set, but no metrics.
    let builder = FileStreamBuilder::new(&config)
        .with_partition(0)
        .with_file_opener(Arc::new(TestOpener::default()));
    let msg = builder_error(builder);
    assert!(msg.contains("FileStreamBuilder missing required metrics"));
}

#[test]
fn builder_errors_on_invalid_partition() {
    let config = builder_test_config();
    let metrics = ExecutionPlanMetricsSet::new();

    // The test config has exactly one file group, so index 1 is out of range.
    let builder = FileStreamBuilder::new(&config)
        .with_partition(1)
        .with_file_opener(Arc::new(TestOpener::default()))
        .with_metrics(&metrics);
    let msg = builder_error(builder);
    assert!(msg.contains("FileStreamBuilder invalid partition index: 1"));
}
}
Loading