Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

257 changes: 96 additions & 161 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

83 changes: 46 additions & 37 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ use std::sync::Arc;

use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetMorselizer;
use crate::opener::build_pruning_predicates;
use crate::opener::{ParquetMorselizer, ParquetOpener};
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
/// # Execution Overview
///
/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
/// configured to open parquet files with a `ParquetOpener`.
/// configured to morselize parquet files with a `ParquetMorselizer`.
///
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
/// the file.
/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to
/// plan the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata)
/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
Expand Down Expand Up @@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {

impl FileSource for ParquetSource {
/// Legacy entry point for creating a [`FileOpener`].
///
/// `ParquetSource` implements the newer Morsel API (see `create_morselizer`),
/// so reaching this method indicates a caller bug; it always returns an
/// internal error directing callers to the Morsel API.
fn create_file_opener(
&self,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
datafusion_common::internal_err!(
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
)
}

fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
) -> datafusion_common::Result<Box<dyn Morselizer>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
.clone()
Expand All @@ -542,37 +554,34 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let opener = Arc::new(ParquetOpener {
morselizer: ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
},
});
Ok(opener)
Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetMorselizer"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
}))
}

fn as_any(&self) -> &dyn Any {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ version.workspace = true
all-features = true

[features]
backtrace = ["datafusion-common/backtrace"]
compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"]
default = ["compression"]

Expand Down Expand Up @@ -63,6 +64,7 @@ itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true }
Expand All @@ -72,6 +74,7 @@ zstd = { workspace = true, optional = true }

[dev-dependencies]
criterion = { workspace = true }
insta = { workspace = true }
tempfile = { workspace = true }

# Note: add additional linter rules in lib.rs.
Expand Down
23 changes: 22 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
///
/// [`DataSource`]: crate::source::DataSource
pub trait FileSource: Send + Sync {
/// Creates a `dyn FileOpener` based on given parameters
/// Creates a `dyn FileOpener` based on given parameters.
///
/// Note: File sources with a native morsel implementation should return an
/// error from this method and implement [`Self::create_morselizer`] instead.
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Arc<dyn FileOpener>>;

/// Creates a `dyn Morselizer` based on given parameters.
///
/// The default implementation preserves existing behavior by adapting the
/// legacy [`FileOpener`] API into a [`Morselizer`].
///
/// It is preferred to implement the [`Morselizer`] API directly by
/// implementing this method.
fn create_morselizer(
    &self,
    object_store: Arc<dyn ObjectStore>,
    base_config: &FileScanConfig,
    partition: usize,
) -> Result<Box<dyn Morselizer>> {
    // Bridge the legacy opener into the morsel-based API; any error from
    // `create_file_opener` is propagated unchanged.
    let morselizer: Box<dyn Morselizer> = Box::new(FileOpenerMorselizer::new(
        self.create_file_opener(object_store, base_config, partition)?,
    ));
    Ok(morselizer)
}
/// Returns this source as [`Any`] so callers can downcast to the concrete type.
fn as_any(&self) -> &dyn Any;

Expand Down
21 changes: 17 additions & 4 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::file_groups::FileGroup;
use crate::{
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
source::DataSource, statistics::MinMaxStatistics,
file_stream::work_source::SharedWorkSource, source::DataSource,
statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
Expand Down Expand Up @@ -55,7 +56,13 @@ use datafusion_physical_plan::{
metrics::ExecutionPlanMetricsSet,
};
use log::{debug, warn};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
use std::{
any::Any,
fmt::Debug,
fmt::Formatter,
fmt::Result as FmtResult,
sync::{Arc, OnceLock},
};

/// [`FileScanConfig`] represents scanning data from a group of files
///
Expand Down Expand Up @@ -209,6 +216,11 @@ pub struct FileScanConfig {
/// If the number of file partitions > target_partitions, the file partitions will be grouped
/// in a round-robin fashion such that number of file partitions = target_partitions.
pub partitioned_by_file_group: bool,
/// Shared queue of unopened files for sibling streams in this scan.
///
/// This is initialized once per `FileScanConfig` and reused by reorderable
/// `FileStream`s created from that config.
pub(crate) shared_work_source: Arc<OnceLock<SharedWorkSource>>,
}

/// A builder for [`FileScanConfig`]'s.
Expand Down Expand Up @@ -551,6 +563,7 @@ impl FileScanConfigBuilder {
expr_adapter_factory: expr_adapter,
statistics,
partitioned_by_file_group,
shared_work_source: Arc::new(OnceLock::new()),
}
}
}
Expand Down Expand Up @@ -587,11 +600,11 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let opener = source.create_file_opener(object_store, self, partition)?;
let morselizer = source.create_morselizer(object_store, self, partition)?;

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_file_opener(opener)
.with_morselizer(morselizer)
.with_metrics(source.metrics())
.build()?;
Ok(Box::pin(cooperative(stream)))
Expand Down
76 changes: 63 additions & 13 deletions datafusion/datasource/src/file_stream/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,56 @@
use std::sync::Arc;

use crate::file_scan_config::FileScanConfig;
use crate::file_stream::scan_state::ScanState;
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

use super::metrics::FileStreamMetrics;
use super::{FileOpener, FileStream, FileStreamState, OnError};

/// Whether this stream may reorder work across sibling `FileStream`s.
///
/// This is derived entirely from [`FileScanConfig`]. Streams that must
/// preserve file order or file-group partition boundaries are not reorderable.
enum Reorderable {
    /// This stream may reorder work using a shared queue of unopened files.
    ///
    /// Carries the `SharedWorkSource` that all sibling streams of this scan
    /// pull files from.
    Yes(SharedWorkSource),
    /// This stream must keep its own local file order.
    No,
}

/// Builder for constructing a [`FileStream`].
pub struct FileStreamBuilder<'a> {
    /// Scan configuration the stream is built from.
    config: &'a FileScanConfig,
    /// Partition index this stream serves; must be set before `build`.
    partition: Option<usize>,
    /// Morselizer that turns files into units of work; must be set before `build`.
    morselizer: Option<Box<dyn Morselizer>>,
    /// Metrics registry used to create per-stream metrics; must be set before `build`.
    metrics: Option<&'a ExecutionPlanMetricsSet>,
    /// How to react to errors while processing files (defaults to `OnError::Fail`).
    on_error: OnError,
    /// Whether this stream may pull work from the scan-wide shared queue.
    reorderable: Reorderable,
}

impl<'a> FileStreamBuilder<'a> {
/// Create a new builder for [`FileStream`].
pub fn new(config: &'a FileScanConfig) -> Self {
    // Streams that must preserve file order, or whose files are already
    // partitioned into fixed file groups, cannot share/reorder work.
    let reorderable = if config.preserve_order || config.partitioned_by_file_group {
        Reorderable::No
    } else {
        // Lazily initialize the scan-wide shared queue (stored in a
        // `OnceLock` on the config) so every sibling stream created from
        // this config clones the same `SharedWorkSource`.
        let shared_work_source = config
            .shared_work_source
            .get_or_init(SharedWorkSource::new)
            .clone();
        Reorderable::Yes(shared_work_source)
    };

    Self {
        config,
        partition: None,
        morselizer: None,
        metrics: None,
        on_error: OnError::Fail,
        reorderable,
    }
}

Expand All @@ -52,8 +78,18 @@ impl<'a> FileStreamBuilder<'a> {
}

/// Configure the [`FileOpener`] used to open files.
///
/// This will overwrite any setting from [`Self::with_morselizer`]
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
self.file_opener = Some(file_opener);
self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
self
}

/// Configure the [`Morselizer`] used to open files.
///
/// This will overwrite any setting from [`Self::with_file_opener`]
pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
self.morselizer = Some(morselizer);
self
}

Expand All @@ -74,16 +110,17 @@ impl<'a> FileStreamBuilder<'a> {
let Self {
config,
partition,
file_opener,
morselizer,
metrics,
on_error,
reorderable,
} = self;

let Some(partition) = partition else {
return internal_err!("FileStreamBuilder missing required partition");
};
let Some(file_opener) = file_opener else {
return internal_err!("FileStreamBuilder missing required file_opener");
let Some(morselizer) = morselizer else {
return internal_err!("FileStreamBuilder missing required morselizer");
};
let Some(metrics) = metrics else {
return internal_err!("FileStreamBuilder missing required metrics");
Expand All @@ -94,16 +131,29 @@ impl<'a> FileStreamBuilder<'a> {
"FileStreamBuilder invalid partition index: {partition}"
);
};
let files = file_group.into_inner();
let work_source = match reorderable {
Reorderable::Yes(shared) => {
shared.register_stream();
shared.push_files(files);
WorkSource::Shared(shared)
}
Reorderable::No => WorkSource::Local(files.into()),
};

let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
let scan_state = Box::new(ScanState::new(
work_source,
config.limit,
morselizer,
on_error,
file_stream_metrics,
));

Ok(FileStream {
file_iter: file_group.into_inner().into_iter().collect(),
projected_schema,
remain: config.limit,
file_opener,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
state: FileStreamState::Scan { scan_state },
baseline_metrics: BaselineMetrics::new(metrics, partition),
on_error,
})
}
}
Loading
Loading