From 9d4457ca3ab44228a2631d7cf89bdd36dd2cb7ac Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 31 Mar 2026 11:31:49 -0400 Subject: [PATCH 1/6] Introduce Morselizer API --- datafusion/datasource-parquet/src/opener.rs | 406 +++++++++++++++----- datafusion/datasource-parquet/src/source.rs | 56 +-- datafusion/datasource/src/mod.rs | 1 + datafusion/datasource/src/morsel/mod.rs | 174 +++++++++ 4 files changed, 507 insertions(+), 130 deletions(-) create mode 100644 datafusion/datasource/src/morsel/mod.rs diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6621706c35c8..dcecce51286c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] state machine for opening Parquet files +//! [`ParquetOpener`] and [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; @@ -26,15 +26,21 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_common::internal_datafusion_err; +use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; +use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use std::fmt; use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, TryRecvError}; use std::task::{Context, Poll}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; @@ -77,12 +83,26 @@ use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; -/// Entry point for opening a Parquet file +/// Implements [`FileOpener`] for Parquet +#[derive(Clone)] +pub(super) struct ParquetOpener { + pub(super) morselizer: ParquetMorselizer, +} + +impl FileOpener for ParquetOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result { + let future = ParquetOpenFuture::new(&self.morselizer, partitioned_file)?; + Ok(Box::pin(future)) + } +} + +/// Stateless Parquet morselizer implementation. /// /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive /// steps interspersed with I/O steps. The code in this module implements the steps /// as an explicit state machine -- see [`ParquetOpenState`] for details. -pub(super) struct ParquetOpener { +#[derive(Clone)] +pub(super) struct ParquetMorselizer { /// Execution partition index pub(crate) partition_index: usize, /// Projection to apply on top of the table schema (i.e. can reference partition columns). @@ -137,6 +157,23 @@ pub(super) struct ParquetOpener { pub reverse_row_groups: bool, } +impl fmt::Debug for ParquetMorselizer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMorselizer") + .field("partition_index", &self.partition_index) + .field("preserve_order", &self.preserve_order) + .field("enable_page_index", &self.enable_page_index) + .field("enable_bloom_filter", &self.enable_bloom_filter) + .finish() + } +} + +impl Morselizer for ParquetMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + Ok(Box::new(ParquetMorselPlanner::try_new(self, file)?)) + } +} + /// States for [`ParquetOpenFuture`] /// /// These states correspond to the steps required to read and apply various @@ -216,6 +253,27 @@ enum ParquetOpenState { Done, } +impl fmt::Debug for ParquetOpenState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = match self { + ParquetOpenState::Start { .. } => "Start", + #[cfg(feature = "parquet_encryption")] + ParquetOpenState::LoadEncryption(_) => "LoadEncryption", + ParquetOpenState::PruneFile(_) => "PruneFile", + ParquetOpenState::LoadMetadata(_) => "LoadMetadata", + ParquetOpenState::PrepareFilters(_) => "PrepareFilters", + ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex", + ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", + ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", + ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", + ParquetOpenState::BuildStream(_) => "BuildStream", + ParquetOpenState::Ready(_) => "Ready", + ParquetOpenState::Done => "Done", + }; + f.write_str(state) + } +} + struct PreparedParquetOpen { partition_index: usize, partitioned_file: PartitionedFile, @@ -290,37 +348,13 @@ struct BloomFiltersLoadedParquetOpen { row_group_bloom_filters: Vec, } -/// Implements state machine described in [`ParquetOpenState`] -struct ParquetOpenFuture { - state: ParquetOpenState, -} - -impl ParquetOpenFuture { - #[cfg(feature = "parquet_encryption")] - fn new(prepared: PreparedParquetOpen, encryption_context: EncryptionContext) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - encryption_context: Arc::new(encryption_context), - }, - } - } - - #[cfg(not(feature = "parquet_encryption"))] - fn new(prepared: PreparedParquetOpen) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - }, - } - } -} - impl ParquetOpenState { /// Applies one CPU-only state transition. /// /// `Load*` states do not transition here and are returned unchanged so the /// driver loop can poll their inner futures separately. + /// + /// Implements state machine described in [`ParquetOpenState`] fn transition(self) -> Result { match self { ParquetOpenState::Start { @@ -392,93 +426,258 @@ impl ParquetOpenState { } } +/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API +/// +/// Implements state machine described in [`ParquetOpenState`] +struct ParquetOpenFuture { + planner: Box, + pending_io: Option>>, + ready_morsels: VecDeque>, +} + +impl ParquetOpenFuture { + fn new( + morselizer: &ParquetMorselizer, + partitioned_file: PartitionedFile, + ) -> Result { + Ok(Self { + planner: morselizer.plan_file(partitioned_file)?, + pending_io: None, + ready_morsels: VecDeque::new(), + }) + } +} + impl Future for ParquetOpenFuture { type Output = Result>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let state = mem::replace(&mut self.state, ParquetOpenState::Done); - let mut state = state.transition()?; + // If waiting on IO, poll + if let Some(io_future) = self.pending_io.as_mut() { + ready!(io_future.poll_unpin(cx))?; + self.pending_io = None; + } + + // have a morsel ready to go, return that + if let Some(morsel) = self.ready_morsels.pop_front() { + return Poll::Ready(Ok(morsel.into_stream())); + } + + // Planner did not produce any stream (for example, it pruned the entire file) + let Some(mut plan) = self.planner.plan()? else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; + + let child_planners = plan.take_planners(); + if !child_planners.is_empty() { + return Poll::Ready(internal_err!( + "Parquet FileOpener adapter does not support child morsel planners" + )); + } + + self.ready_morsels = plan.take_morsels().into(); + + if let Some(io_future) = plan.take_io_future() { + self.pending_io = Some(io_future); + } + } + } +} + +/// Implements the Morsel API +struct ParquetStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl ParquetStreamMorsel { + fn new(stream: BoxStream<'static, Result>) -> Self { + Self { stream } + } +} + +impl fmt::Debug for ParquetStreamMorsel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetStreamMorsel") + .finish_non_exhaustive() + } +} + +impl Morsel for ParquetStreamMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } +} + +/// Stateful planner for opening a single parquet file via the morsel APIs. +enum ParquetMorselPlanner { + /// Ready to perform CPU-only planning work. + Ready(ParquetOpenState), + /// Waiting for an I/O future to produce the next planner state. + /// + /// Callers must not call [`MorselPlanner::plan`] again until the + /// corresponding I/O future has completed and its result is ready to + /// receive from the channel. + /// + /// Doing so is a protocol violation and transitions the planner to + /// [`ParquetMorselPlanner::Errored`]. + Waiting(Receiver>), + /// Actively planning (this state should be replaced by end of the call to plan) + Planning, + /// An earlier planning attempt returned an error. + Errored, +} + +impl fmt::Debug for ParquetMorselPlanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Ready(state) => f + .debug_tuple("ParquetMorselPlanner::Ready") + .field(state) + .finish(), + Self::Waiting(_) => f + .debug_tuple("ParquetMorselPlanner::Waiting") + .field(&"") + .finish(), + Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), + Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), + } + } +} + +impl ParquetMorselPlanner { + fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result { + let prepared = morselizer.prepare_open_file(file)?; + #[cfg(feature = "parquet_encryption")] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + encryption_context: Arc::new(morselizer.get_encryption_context()), + }; + #[cfg(not(feature = "parquet_encryption"))] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + }; + Ok(Self::Ready(state)) + } + + /// Schedule an I/O future that resolves to the planner's next owned state. + /// + /// This helper + /// + /// 1. creates a channel to send the next [`ParquetOpenState`] back to the + /// planner once the I/O future completes, + /// + /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`] + /// + /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the + /// caller to poll. + /// + fn schedule_io(&mut self, future: F) -> MorselPlan + where + F: Future> + Send + 'static, + { + let (output_for_future, output) = mpsc::channel(); + let io_future = async move { + let next_state = future.await?; + output_for_future.send(Ok(next_state)).map_err(|e| { + DataFusionError::Execution(format!("failed to send planner output: {e}")) + })?; + Ok(()) + } + .boxed(); + *self = ParquetMorselPlanner::Waiting(output); + MorselPlan::new().with_io_future(io_future) + } +} + +impl MorselPlanner for ParquetMorselPlanner { + fn plan(&mut self) -> Result> { + loop { + let planner = mem::replace(self, ParquetMorselPlanner::Planning); + let state = match planner { + ParquetMorselPlanner::Ready(state) => state, + ParquetMorselPlanner::Waiting(output) => { + output + .try_recv() + .map_err(|e| { + // IO wasn't done + *self = ParquetMorselPlanner::Errored; + match e { + TryRecvError::Empty => internal_datafusion_err!( + "planner polled before I/O completed" + ), + TryRecvError::Disconnected => internal_datafusion_err!( + "planner polled after I/O disconnected" + ), + } + })? + .inspect_err(|_| { + // IO completed successfully, but the IO was an error + *self = ParquetMorselPlanner::Errored; + })? + } + ParquetMorselPlanner::Planning => { + return internal_err!( + "ParquetMorselPlanner::plan was re-entered before previous plan completed" + ); + } + ParquetMorselPlanner::Errored => { + return internal_err!( + "ParquetMorselPlanner::plan called after a previous error" + ); + } + }; + // check for end of stream + if let ParquetOpenState::Done = state { + *self = ParquetMorselPlanner::Ready(ParquetOpenState::Done); + return Ok(None); + }; + + let state = state.transition().inspect_err(|_| { + *self = ParquetMorselPlanner::Errored; + })?; match state { #[cfg(feature = "parquet_encryption")] - ParquetOpenState::LoadEncryption(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => ParquetOpenState::PruneFile(result?), - Poll::Pending => { - self.state = ParquetOpenState::LoadEncryption(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadEncryption(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneFile(future.await?)) + }))); } - ParquetOpenState::LoadMetadata(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PrepareFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadMetadata(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadMetadata(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + }))); } - ParquetOpenState::LoadPageIndex(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithStatistics(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadPageIndex(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadPageIndex(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }))); } - ParquetOpenState::LoadBloomFilters(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithBloomFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadBloomFilters(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadBloomFilters(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }))); } ParquetOpenState::Ready(stream) => { - return Poll::Ready(Ok(stream)); + let morsels: Vec> = + vec![Box::new(ParquetStreamMorsel::new(stream))]; + return Ok(Some(MorselPlan::new().with_morsels(morsels))); } - ParquetOpenState::Done => { - return Poll::Ready(Ok(futures::stream::empty().boxed())); + ParquetOpenState::Done => return Ok(None), + cpu_state => { + *self = ParquetMorselPlanner::Ready(cpu_state); } - - // For all other states, loop again and try to transition - // immediately. All states are explicitly listed here to ensure any - // new states are handled correctly - ParquetOpenState::Start { .. } => {} - ParquetOpenState::PruneFile(_) => {} - ParquetOpenState::PrepareFilters(_) => {} - ParquetOpenState::PruneWithStatistics(_) => {} - ParquetOpenState::PruneWithBloomFilters(_) => {} - ParquetOpenState::BuildStream(_) => {} - }; - - self.state = state; + } } } } -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let prepared = self.prepare_open_file(partitioned_file)?; - #[cfg(feature = "parquet_encryption")] - let future = ParquetOpenFuture::new(prepared, self.get_encryption_context()); - #[cfg(not(feature = "parquet_encryption"))] - let future = ParquetOpenFuture::new(prepared); - Ok(Box::pin(future)) - } -} - -impl ParquetOpener { +impl ParquetMorselizer { /// Perform the CPU-only setup for opening a parquet file. fn prepare_open_file( &self, @@ -1447,7 +1646,7 @@ impl EncryptionContext { } } -impl ParquetOpener { +impl ParquetMorselizer { #[cfg(feature = "parquet_encryption")] fn get_encryption_context(&self) -> EncryptionContext { EncryptionContext::new( @@ -1576,7 +1775,7 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use super::{ConstantColumns, constant_columns_from_stats}; + use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; @@ -1731,11 +1930,12 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - ParquetOpener { + let morselizer = ParquetMorselizer { partition_index: self.partition_index, projection, batch_size: self.batch_size, limit: self.limit, + preserve_order: self.preserve_order, predicate: self.predicate, table_schema, metadata_size_hint: self.metadata_size_hint, @@ -1757,8 +1957,8 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, - preserve_order: self.preserve_order, - } + }; + ParquetOpener { morselizer } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 64a339009e9c..30c28507b0c9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,8 +23,8 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use crate::opener::ParquetOpener; use crate::opener::build_pruning_predicates; +use crate::opener::{ParquetMorselizer, ParquetOpener}; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] @@ -543,32 +543,34 @@ impl FileSource for ParquetSource { .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); let opener = Arc::new(ParquetOpener { - partition_index: partition, - projection: self.projection.clone(), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), - limit: base_config.limit, - preserve_order: base_config.preserve_order, - predicate: self.predicate.clone(), - table_schema: self.table_schema.clone(), - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics().clone(), - parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - force_filter_selections: self.force_filter_selections(), - enable_page_index: self.enable_page_index(), - enable_bloom_filter: self.bloom_filter_on_read(), - enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, - expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), - max_predicate_cache_size: self.max_predicate_cache_size(), - reverse_row_groups: self.reverse_row_groups, + morselizer: ParquetMorselizer { + partition_index: partition, + projection: self.projection.clone(), + batch_size: self + .batch_size + .expect("Batch size must set before creating ParquetOpener"), + limit: base_config.limit, + preserve_order: base_config.preserve_order, + predicate: self.predicate.clone(), + table_schema: self.table_schema.clone(), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics().clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties, + expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, + }, }); Ok(opener) } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index bcc4627050d4..a9600271c28c 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod morsel; pub mod projection; pub mod schema_adapter; pub mod sink; diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs new file mode 100644 index 000000000000..4150bc12b4b2 --- /dev/null +++ b/datafusion/datasource/src/morsel/mod.rs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Structures for Morsel Driven IO. +//! +//! Morsel Driven IO is a technique for parallelizing the reading of large files +//! by dividing them into smaller "morsels" that are processed independently. +//! +//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query +//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). + +use std::fmt::Debug; + +use crate::PartitionedFile; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. +/// +/// This represents a single morsel of work that is ready to be processed. It +/// has all data necessary (does not need any I/O) and is ready to be turned +/// into a stream of [`RecordBatch`]es for processing by the execution engine. +pub trait Morsel: Send + Debug { + /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing. + /// + /// This should not do any I/O work, such as reading from the file. + fn into_stream(self: Box) -> BoxStream<'static, Result>; +} + +/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner +/// for that file. +/// +/// This is the entry point for morsel driven I/O. +pub trait Morselizer: Send + Sync + Debug { + /// Return the initial [`MorselPlanner`] for this file. + /// + /// "Morselzing" a file may involve CPU work, such as parsing parquet + /// metadata and evaluating pruning predicates. It should NOT do any I/O + /// work, such as reading from the file. If I/O is required, it should + /// return a future that the caller can poll to drive the I/O work to + /// completion, and once the future is complete, the caller can call + /// `plan_file` again for a different file. + fn plan_file(&self, file: PartitionedFile) -> Result>; +} + +/// A Morsel Planner is responsible for creating morsels for a given scan. +/// +/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O +/// outstanding for a specific planner. DataFusion may run +/// multiple planners in parallel, which corresponds to multiple parallel +/// I/O requests. +/// +/// It is not a Rust `Stream` so that it can explicitly separate CPU bound +/// work from I/O work. +/// +/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it +/// should do CPU work to produce the next morsels or discover the next I/O +/// phase. +/// +/// Best practice is to spawn I/O in a Tokio task on a separate runtime to +/// ensure that CPU work doesn't block or slow down I/O work, but this is not +/// strictly required by the API. +pub trait MorselPlanner: Send + Debug { + /// Attempt to plan morsels. This may involve CPU work, such as parsing + /// parquet metadata and evaluating pruning predicates. + /// + /// It should NOT do any I/O work, such as reading from the file. If I/O is + /// required, the returned [`MorselPlan`] should contain a future that the + /// caller polls to drive the I/O work to completion. Once the future is + /// complete, the caller can call `plan` again to get the next morsels. + /// + /// Note this function is **not async** to make it explicitly clear that if + /// I/O is required, it should be done in the returned `io_future`. + /// + /// Returns `None` if the planner has no more work to do. + /// + /// # Empty Morsel Plans + /// + /// It may return `None`, which means no batches will be read from the file + /// (e.g. due to late-pruning based on statistics). + /// + /// # Output Ordering + /// + /// See the comments on [`MorselPlan`] for the logical output order. + fn plan(&mut self) -> Result>; +} + +/// Return result of [`MorselPlanner::plan`]. +/// +/// # Logical Ordering +/// +/// For plans where the output order of rows is maintained, the output order of +/// a [`MorselPlanner`] is logically defined as follows: +/// 1. All morsels that are directly produced +/// 2. Recursively, all morsels produced by the returned `planners` +#[derive(Default)] +pub struct MorselPlan { + /// Any morsels that are ready for processing. + morsels: Vec>, + /// Any newly-created planners that are ready for CPU work. + planners: Vec>, + /// A future that will drive any I/O work to completion. + /// + /// DataFusion will poll this future occasionally to drive the I/O work to + /// completion. Once the future resolves, DataFusion will call `plan` again + /// to get the next morsels. + io_future: Option>>, +} + +impl MorselPlan { + /// Create an empty morsel plan. + pub fn new() -> Self { + Self::default() + } + + /// Set the ready morsels. + pub fn with_morsels(mut self, morsels: Vec>) -> Self { + self.morsels = morsels; + self + } + + /// Set the ready child planners. + pub fn with_planners(mut self, planners: Vec>) -> Self { + self.planners = planners; + self + } + + /// Set the pending I/O future. + pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { + self.io_future = Some(io_future); + self + } + + /// Take the ready morsels. + pub fn take_morsels(&mut self) -> Vec> { + std::mem::take(&mut self.morsels) + } + + /// Take the ready child planners. + pub fn take_planners(&mut self) -> Vec> { + std::mem::take(&mut self.planners) + } + + /// Take the pending I/O future, if any. + pub fn take_io_future(&mut self) -> Option>> { + self.io_future.take() + } + + /// Set the pending I/O future. + pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { + self.io_future = Some(io_future); + } + + /// Returns `true` if this plan contains an I/O future. + pub fn has_io_future(&self) -> bool { + self.io_future.is_some() + } +} From 52bf6be8246357aad48705cada7cd4310178ebc8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 6 Apr 2026 21:57:50 -0500 Subject: [PATCH 2/6] Prototype ownership-based morsel planner API --- datafusion/datasource-parquet/src/opener.rs | 223 ++++++++------------ datafusion/datasource/src/morsel/mod.rs | 185 +++++++++++++--- 2 files changed, 235 insertions(+), 173 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index dcecce51286c..0e3ecc1cdee9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -26,10 +26,12 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; -use datafusion_common::internal_datafusion_err; use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::morsel::{ + BlockedPlannerContinuation, Morsel, MorselPlan, MorselPlanner, Morselizer, + PlannerStep, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -39,8 +41,6 @@ use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; -use std::sync::mpsc; -use std::sync::mpsc::{Receiver, TryRecvError}; use std::task::{Context, Poll}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; @@ -430,8 +430,8 @@ impl ParquetOpenState { /// /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { - planner: Box, - pending_io: Option>>, + planner: Option>, + blocked_continuation: Option, ready_morsels: VecDeque>, } @@ -441,8 +441,8 @@ impl ParquetOpenFuture { partitioned_file: PartitionedFile, ) -> Result { Ok(Self { - planner: morselizer.plan_file(partitioned_file)?, - pending_io: None, + planner: Some(morselizer.plan_file(partitioned_file)?), + blocked_continuation: None, ready_morsels: VecDeque::new(), }) } @@ -453,10 +453,12 @@ impl Future for ParquetOpenFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // If waiting on IO, poll - if let Some(io_future) = self.pending_io.as_mut() { - ready!(io_future.poll_unpin(cx))?; - self.pending_io = None; + // If waiting on IO, poll the blocked continuation until it yields + // the next CPU-ready planner. + if let Some(blocked_continuation) = self.blocked_continuation.as_mut() { + let planner = ready!(blocked_continuation.poll_unpin(cx))?; + self.blocked_continuation = None; + self.planner = Some(planner); } // have a morsel ready to go, return that @@ -464,22 +466,26 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } - // Planner did not produce any stream (for example, it pruned the entire file) - let Some(mut plan) = self.planner.plan()? else { + let Some(planner) = self.planner.take() else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; - let child_planners = plan.take_planners(); - if !child_planners.is_empty() { - return Poll::Ready(internal_err!( - "Parquet FileOpener adapter does not support child morsel planners" - )); - } - - self.ready_morsels = plan.take_morsels().into(); + match planner.step()? { + PlannerStep::Ready(mut plan) => { + let child_planners = plan.take_planners(); + if !child_planners.is_empty() { + return Poll::Ready(internal_err!( + "Parquet FileOpener adapter does not support child morsel planners" + )); + } - if let Some(io_future) = plan.take_io_future() { - self.pending_io = Some(io_future); + self.ready_morsels = plan.take_morsels().into(); + self.planner = plan.take_ready_continuation(); + self.blocked_continuation = plan.take_blocked_continuation(); + } + PlannerStep::Done => { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + } } } } @@ -510,38 +516,13 @@ impl Morsel for ParquetStreamMorsel { } /// Stateful planner for opening a single parquet file via the morsel APIs. -enum ParquetMorselPlanner { - /// Ready to perform CPU-only planning work. - Ready(ParquetOpenState), - /// Waiting for an I/O future to produce the next planner state. - /// - /// Callers must not call [`MorselPlanner::plan`] again until the - /// corresponding I/O future has completed and its result is ready to - /// receive from the channel. - /// - /// Doing so is a protocol violation and transitions the planner to - /// [`ParquetMorselPlanner::Errored`]. - Waiting(Receiver>), - /// Actively planning (this state should be replaced by end of the call to plan) - Planning, - /// An earlier planning attempt returned an error. - Errored, -} +struct ParquetMorselPlanner(ParquetOpenState); impl fmt::Debug for ParquetMorselPlanner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Ready(state) => f - .debug_tuple("ParquetMorselPlanner::Ready") - .field(state) - .finish(), - Self::Waiting(_) => f - .debug_tuple("ParquetMorselPlanner::Waiting") - .field(&"") - .finish(), - Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), - Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), - } + f.debug_tuple("ParquetMorselPlanner") + .field(&self.0) + .finish() } } @@ -557,120 +538,84 @@ impl ParquetMorselPlanner { let state = ParquetOpenState::Start { prepared: Box::new(prepared), }; - Ok(Self::Ready(state)) + Ok(Self(state)) } /// Schedule an I/O future that resolves to the planner's next owned state. - /// - /// This helper - /// - /// 1. creates a channel to send the next [`ParquetOpenState`] back to the - /// planner once the I/O future completes, - /// - /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`] - /// - /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the - /// caller to poll. - /// - fn schedule_io(&mut self, future: F) -> MorselPlan + fn schedule_io(future: F) -> BlockedPlannerContinuation where F: Future> + Send + 'static, { - let (output_for_future, output) = mpsc::channel(); - let io_future = async move { - let next_state = future.await?; - output_for_future.send(Ok(next_state)).map_err(|e| { - DataFusionError::Execution(format!("failed to send planner output: {e}")) - })?; - Ok(()) - } - .boxed(); - *self = ParquetMorselPlanner::Waiting(output); - MorselPlan::new().with_io_future(io_future) + BlockedPlannerContinuation::new( + async move { Ok(Box::new(Self(future.await?)) as Box) } + .boxed(), + ) } } impl MorselPlanner for ParquetMorselPlanner { - fn plan(&mut self) -> Result> { + fn step(self: Box) -> Result { + let mut current_state = self.0; + loop { - let planner = mem::replace(self, ParquetMorselPlanner::Planning); - let state = match planner { - ParquetMorselPlanner::Ready(state) => state, - ParquetMorselPlanner::Waiting(output) => { - output - .try_recv() - .map_err(|e| { - // IO wasn't done - *self = ParquetMorselPlanner::Errored; - match e { - TryRecvError::Empty => internal_datafusion_err!( - "planner polled before I/O completed" - ), - TryRecvError::Disconnected => internal_datafusion_err!( - "planner polled after I/O disconnected" - ), - } - })? - .inspect_err(|_| { - // IO completed successfully, but the IO was an error - *self = ParquetMorselPlanner::Errored; - })? - } - ParquetMorselPlanner::Planning => { - return internal_err!( - "ParquetMorselPlanner::plan was re-entered before previous plan completed" - ); - } - ParquetMorselPlanner::Errored => { - return internal_err!( - "ParquetMorselPlanner::plan called after a previous error" - ); - } - }; - // check for end of stream - if let ParquetOpenState::Done = state { - *self = ParquetMorselPlanner::Ready(ParquetOpenState::Done); - return Ok(None); - }; + if let ParquetOpenState::Done = current_state { + return Ok(PlannerStep::Done); + } - let state = state.transition().inspect_err(|_| { - *self = ParquetMorselPlanner::Errored; - })?; + let state = current_state.transition()?; match state { #[cfg(feature = "parquet_encryption")] ParquetOpenState::LoadEncryption(future) => { - return Ok(Some(self.schedule_io(async move { - Ok(ParquetOpenState::PruneFile(future.await?)) - }))); + return Ok(PlannerStep::Ready( + MorselPlan::new().with_blocked_continuation(Self::schedule_io( + async move { Ok(ParquetOpenState::PruneFile(future.await?)) }, + )), + )); } ParquetOpenState::LoadMetadata(future) => { - return Ok(Some(self.schedule_io(async move { - Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) - }))); + return Ok(PlannerStep::Ready( + MorselPlan::new().with_blocked_continuation(Self::schedule_io( + async move { + Ok(ParquetOpenState::PrepareFilters(Box::new( + future.await?, + ))) + }, + )), + )); } ParquetOpenState::LoadPageIndex(future) => { - return Ok(Some(self.schedule_io(async move { - Ok(ParquetOpenState::PruneWithStatistics(Box::new( - future.await?, - ))) - }))); + return Ok(PlannerStep::Ready( + MorselPlan::new().with_blocked_continuation(Self::schedule_io( + async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }, + )), + )); } ParquetOpenState::LoadBloomFilters(future) => { - return Ok(Some(self.schedule_io(async move { - Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( - future.await?, - ))) - }))); + return Ok(PlannerStep::Ready( + MorselPlan::new().with_blocked_continuation(Self::schedule_io( + async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }, + )), + )); } ParquetOpenState::Ready(stream) => { let morsels: Vec> = vec![Box::new(ParquetStreamMorsel::new(stream))]; - return Ok(Some(MorselPlan::new().with_morsels(morsels))); + return Ok(PlannerStep::Ready( + MorselPlan::new().with_morsels(morsels), + )); } - ParquetOpenState::Done => return Ok(None), + ParquetOpenState::Done => return Ok(PlannerStep::Done), cpu_state => { - *self = ParquetMorselPlanner::Ready(cpu_state); + current_state = cpu_state; } } } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 4150bc12b4b2..6c846e69f155 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -28,6 +28,7 @@ use std::fmt::Debug; use crate::PartitionedFile; use arrow::array::RecordBatch; use datafusion_common::Result; +use futures::Future; use futures::future::BoxFuture; use futures::stream::BoxStream; @@ -77,31 +78,99 @@ pub trait Morselizer: Send + Sync + Debug { /// ensure that CPU work doesn't block or slow down I/O work, but this is not /// strictly required by the API. pub trait MorselPlanner: Send + Debug { - /// Attempt to plan morsels. This may involve CPU work, such as parsing - /// parquet metadata and evaluating pruning predicates. + /// Advance this planner by one step. /// - /// It should NOT do any I/O work, such as reading from the file. If I/O is - /// required, the returned [`MorselPlan`] should contain a future that the - /// caller polls to drive the I/O work to completion. Once the future is - /// complete, the caller can call `plan` again to get the next morsels. + /// This may involve CPU work, such as parsing parquet metadata and + /// evaluating pruning predicates. /// - /// Note this function is **not async** to make it explicitly clear that if - /// I/O is required, it should be done in the returned `io_future`. + /// It should NOT do any I/O work, such as reading from the file. If I/O is + /// required, the returned [`MorselPlan`] may contain a + /// [`BlockedPlannerContinuation`], whose future owns the blocked continuation and + /// resolves to the next [`MorselPlanner`]. /// - /// Returns `None` if the planner has no more work to do. + /// Note this function is **not async** to make it explicitly clear that I/O + /// must be driven by the returned [`BlockedPlannerContinuation`]. /// - /// # Empty Morsel Plans + /// # Lifecycle /// - /// It may return `None`, which means no batches will be read from the file - /// (e.g. due to late-pruning based on statistics). + /// A planner moves between CPU-ready and I/O-blocked states: /// - /// # Output Ordering + /// ```text + /// Box + /// | + /// | step() + /// v + /// PlannerStep::Ready(MorselPlan) + /// | + /// | may contain: + /// | - morsels + /// | - child planners + /// | - ready_continuation + /// | - blocked_continuation + /// v + /// BlockedPlannerContinuation + /// | + /// | poll / run I/O + /// v + /// Box + /// ``` /// - /// See the comments on [`MorselPlan`] for the logical output order. - fn plan(&mut self) -> Result>; + /// Returns [`PlannerStep::Done`] if the planner has no more work to do. + fn step(self: Box) -> Result; } -/// Return result of [`MorselPlanner::plan`]. +/// Result of advancing a [`MorselPlanner`]. +pub enum PlannerStep { + /// CPU work produced morsels and/or child planners immediately. + Ready(MorselPlan), + /// The planner has no more work to do. + Done, +} + +impl Debug for PlannerStep { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Ready(plan) => f.debug_tuple("PlannerStep::Ready").field(plan).finish(), + Self::Done => f.debug_tuple("PlannerStep::Done").finish(), + } + } +} + +/// A named future that owns the blocked continuation of a [`MorselPlanner`]. +/// +/// This is not just "some I/O future". It is the suspended remainder of the +/// planner state machine: once the required I/O completes, polling this future +/// yields the next CPU-ready planner. +pub struct BlockedPlannerContinuation { + future: BoxFuture<'static, Result>>, +} + +impl BlockedPlannerContinuation { + /// Create a new blocked continuation future. + pub fn new(future: BoxFuture<'static, Result>>) -> Self { + Self { future } + } +} + +impl Debug for BlockedPlannerContinuation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockedPlannerContinuation") + .finish_non_exhaustive() + } +} + +impl Future for BlockedPlannerContinuation { + type Output = Result>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.future.as_mut().poll(cx) + } +} + +/// Return result of [`PlannerStep::Ready`]. /// /// # Logical Ordering /// @@ -109,18 +178,49 @@ pub trait MorselPlanner: Send + Debug { /// a [`MorselPlanner`] is logically defined as follows: /// 1. All morsels that are directly produced /// 2. Recursively, all morsels produced by the returned `planners` +/// +/// # Scheduler View +/// +/// A plan may hand the scheduler four kinds of work at once: +/// +/// ```text +/// MorselPlan +/// ├── morsels: ready output work +/// ├── planners: ready child CPU work +/// ├── ready_continuation: same planner, still CPU-ready +/// └── blocked_continuation: same planner, blocked on I/O +/// ``` +/// +/// This lets a single `step()` return ready work immediately while also handing +/// off the blocked remainder to a more concurrent I/O executor. #[derive(Default)] pub struct MorselPlan { /// Any morsels that are ready for processing. morsels: Vec>, /// Any newly-created planners that are ready for CPU work. planners: Vec>, - /// A future that will drive any I/O work to completion. - /// - /// DataFusion will poll this future occasionally to drive the I/O work to - /// completion. Once the future resolves, DataFusion will call `plan` again - /// to get the next morsels. - io_future: Option>>, + /// The same planner's immediate CPU-ready continuation. + ready_continuation: Option>, + /// A blocked continuation that can be polled independently while ready work + /// is being consumed. + blocked_continuation: Option, +} + +impl Debug for MorselPlan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MorselPlan") + .field("morsels", &self.morsels.len()) + .field("planners", &self.planners.len()) + .field( + "ready_continuation", + &self.ready_continuation.as_ref().map(|_| ""), + ) + .field( + "blocked_continuation", + &self.blocked_continuation.as_ref().map(|_| ""), + ) + .finish() + } } impl MorselPlan { @@ -141,9 +241,21 @@ impl MorselPlan { self } - /// Set the pending I/O future. - pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { - self.io_future = Some(io_future); + /// Set the same planner's immediate CPU-ready continuation. + pub fn with_ready_continuation( + mut self, + ready_continuation: Box, + ) -> Self { + self.ready_continuation = Some(ready_continuation); + self + } + + /// Set the blocked continuation that can be polled independently. + pub fn with_blocked_continuation( + mut self, + blocked_continuation: BlockedPlannerContinuation, + ) -> Self { + self.blocked_continuation = Some(blocked_continuation); self } @@ -157,18 +269,23 @@ impl MorselPlan { std::mem::take(&mut self.planners) } - /// Take the pending I/O future, if any. - pub fn take_io_future(&mut self) -> Option>> { - self.io_future.take() + /// Take the same planner's immediate CPU-ready continuation, if any. + pub fn take_ready_continuation(&mut self) -> Option> { + self.ready_continuation.take() + } + + /// Take the blocked continuation, if any. + pub fn take_blocked_continuation(&mut self) -> Option { + self.blocked_continuation.take() } - /// Set the pending I/O future. - pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { - self.io_future = Some(io_future); + /// Returns `true` if this plan contains a CPU-ready continuation. + pub fn has_ready_continuation(&self) -> bool { + self.ready_continuation.is_some() } - /// Returns `true` if this plan contains an I/O future. - pub fn has_io_future(&self) -> bool { - self.io_future.is_some() + /// Returns `true` if this plan contains a blocked continuation. + pub fn has_blocked_continuation(&self) -> bool { + self.blocked_continuation.is_some() } } From 75bb48c135d3013f7e57b9ed1feff890926858b6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 6 Apr 2026 22:52:48 -0500 Subject: [PATCH 3/6] Prevent planner continuation overwrite in parquet adapter --- datafusion/datasource-parquet/src/opener.rs | 38 ++++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 0e3ecc1cdee9..66461d9fb19c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -430,8 +430,8 @@ impl ParquetOpenState { /// /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { - planner: Option>, - blocked_continuation: Option, + ready_planners: VecDeque>, + blocked_continuations: VecDeque, ready_morsels: VecDeque>, } @@ -441,8 +441,8 @@ impl ParquetOpenFuture { partitioned_file: PartitionedFile, ) -> Result { Ok(Self { - planner: Some(morselizer.plan_file(partitioned_file)?), - blocked_continuation: None, + ready_planners: vec![morselizer.plan_file(partitioned_file)?].into(), + blocked_continuations: VecDeque::new(), ready_morsels: VecDeque::new(), }) } @@ -453,12 +453,19 @@ impl Future for ParquetOpenFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // If waiting on IO, poll the blocked continuation until it yields - // the next CPU-ready planner. - if let Some(blocked_continuation) = self.blocked_continuation.as_mut() { - let planner = ready!(blocked_continuation.poll_unpin(cx))?; - self.blocked_continuation = None; - self.planner = Some(planner); + // If waiting on IO, poll the oldest blocked continuation until it + // yields the next CPU-ready planner. + if let Some(mut blocked_continuation) = self.blocked_continuations.pop_front() { + match blocked_continuation.poll_unpin(cx) { + Poll::Pending => { + self.blocked_continuations + .push_front(blocked_continuation); + } + Poll::Ready(Ok(planner)) => { + self.ready_planners.push_back(planner); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } } // have a morsel ready to go, return that @@ -466,7 +473,7 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } - let Some(planner) = self.planner.take() else { + let Some(planner) = self.ready_planners.pop_front() else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; @@ -480,8 +487,13 @@ impl Future for ParquetOpenFuture { } self.ready_morsels = plan.take_morsels().into(); - self.planner = plan.take_ready_continuation(); - self.blocked_continuation = plan.take_blocked_continuation(); + if let Some(ready_continuation) = plan.take_ready_continuation() { + self.ready_planners.push_back(ready_continuation); + } + if let Some(blocked_continuation) = plan.take_blocked_continuation() { + self.blocked_continuations + .push_back(blocked_continuation); + } } PlannerStep::Done => { return Poll::Ready(Ok(futures::stream::empty().boxed())); From d1bf5e5658dacc1e0cf5b37e873c50fcda7ae6a2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 6 Apr 2026 22:57:44 -0500 Subject: [PATCH 4/6] restore comment --- datafusion/datasource-parquet/src/opener.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 66461d9fb19c..d59b04a2bc4a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -473,6 +473,7 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } + // Planner did not produce any stream (for example, it pruned the entire file) let Some(planner) = self.ready_planners.pop_front() else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; From c7b478010fbe13bc05934346965dad4bb3c1ed54 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 6 Apr 2026 23:30:02 -0500 Subject: [PATCH 5/6] Minimize morsel type-state prototype diff --- datafusion/datasource-parquet/src/opener.rs | 133 ++++++++-------- datafusion/datasource/src/morsel/mod.rs | 167 ++++++-------------- 2 files changed, 110 insertions(+), 190 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index d59b04a2bc4a..252f56f134c4 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -29,8 +29,7 @@ use arrow::datatypes::DataType; use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::morsel::{ - BlockedPlannerContinuation, Morsel, MorselPlan, MorselPlanner, Morselizer, - PlannerStep, + Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, }; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -431,7 +430,7 @@ impl ParquetOpenState { /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { ready_planners: VecDeque>, - blocked_continuations: VecDeque, + pending_io: VecDeque, ready_morsels: VecDeque>, } @@ -442,7 +441,7 @@ impl ParquetOpenFuture { ) -> Result { Ok(Self { ready_planners: vec![morselizer.plan_file(partitioned_file)?].into(), - blocked_continuations: VecDeque::new(), + pending_io: VecDeque::new(), ready_morsels: VecDeque::new(), }) } @@ -455,11 +454,10 @@ impl Future for ParquetOpenFuture { loop { // If waiting on IO, poll the oldest blocked continuation until it // yields the next CPU-ready planner. - if let Some(mut blocked_continuation) = self.blocked_continuations.pop_front() { - match blocked_continuation.poll_unpin(cx) { + if let Some(mut io_future) = self.pending_io.pop_front() { + match io_future.poll_unpin(cx) { Poll::Pending => { - self.blocked_continuations - .push_front(blocked_continuation); + self.pending_io.push_front(io_future); } Poll::Ready(Ok(planner)) => { self.ready_planners.push_back(planner); @@ -478,27 +476,29 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; - match planner.step()? { - PlannerStep::Ready(mut plan) => { - let child_planners = plan.take_planners(); - if !child_planners.is_empty() { - return Poll::Ready(internal_err!( - "Parquet FileOpener adapter does not support child morsel planners" - )); - } + let Some(mut plan) = planner.plan()? else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; - self.ready_morsels = plan.take_morsels().into(); - if let Some(ready_continuation) = plan.take_ready_continuation() { - self.ready_planners.push_back(ready_continuation); - } - if let Some(blocked_continuation) = plan.take_blocked_continuation() { - self.blocked_continuations - .push_back(blocked_continuation); - } - } - PlannerStep::Done => { - return Poll::Ready(Ok(futures::stream::empty().boxed())); - } + let child_planners = plan.take_planners(); + if !child_planners.is_empty() { + return Poll::Ready(internal_err!( + "Parquet FileOpener adapter does not support child morsel planners" + )); + } + + self.ready_morsels = plan.take_morsels().into(); + if let Some(io_future) = plan.take_io_future() { + self.pending_io.push_back(io_future); + } + + if self.ready_morsels.is_empty() + && self.ready_planners.is_empty() + && self.pending_io.is_empty() + { + return Poll::Ready(internal_err!( + "planner returned an empty morsel plan" + )); } } } @@ -555,24 +555,37 @@ impl ParquetMorselPlanner { } /// Schedule an I/O future that resolves to the planner's next owned state. - fn schedule_io(future: F) -> BlockedPlannerContinuation + /// + /// This helper + /// + /// 1. packages the next [`ParquetOpenState`] into a future that yields the + /// next CPU-ready planner once the I/O future completes, + /// + /// 2. moves the blocked continuation into that future rather than keeping a + /// `Waiting` planner state around, + /// + /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the + /// caller to poll. + /// + fn schedule_io(future: F) -> MorselPlan where F: Future> + Send + 'static, { - BlockedPlannerContinuation::new( + let io_future = PendingMorselPlanner::new( async move { Ok(Box::new(Self(future.await?)) as Box) } .boxed(), - ) + ); + MorselPlan::new().with_io_future(io_future) } } impl MorselPlanner for ParquetMorselPlanner { - fn step(self: Box) -> Result { + fn plan(self: Box) -> Result> { let mut current_state = self.0; loop { if let ParquetOpenState::Done = current_state { - return Ok(PlannerStep::Done); + return Ok(None); } let state = current_state.transition()?; @@ -580,53 +593,35 @@ impl MorselPlanner for ParquetMorselPlanner { match state { #[cfg(feature = "parquet_encryption")] ParquetOpenState::LoadEncryption(future) => { - return Ok(PlannerStep::Ready( - MorselPlan::new().with_blocked_continuation(Self::schedule_io( - async move { Ok(ParquetOpenState::PruneFile(future.await?)) }, - )), - )); + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneFile(future.await?)) + }))); } ParquetOpenState::LoadMetadata(future) => { - return Ok(PlannerStep::Ready( - MorselPlan::new().with_blocked_continuation(Self::schedule_io( - async move { - Ok(ParquetOpenState::PrepareFilters(Box::new( - future.await?, - ))) - }, - )), - )); + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + }))); } ParquetOpenState::LoadPageIndex(future) => { - return Ok(PlannerStep::Ready( - MorselPlan::new().with_blocked_continuation(Self::schedule_io( - async move { - Ok(ParquetOpenState::PruneWithStatistics(Box::new( - future.await?, - ))) - }, - )), - )); + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }))); } ParquetOpenState::LoadBloomFilters(future) => { - return Ok(PlannerStep::Ready( - MorselPlan::new().with_blocked_continuation(Self::schedule_io( - async move { - Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( - future.await?, - ))) - }, - )), - )); + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }))); } ParquetOpenState::Ready(stream) => { let morsels: Vec> = vec![Box::new(ParquetStreamMorsel::new(stream))]; - return Ok(PlannerStep::Ready( - MorselPlan::new().with_morsels(morsels), - )); + return Ok(Some(MorselPlan::new().with_morsels(morsels))); } - ParquetOpenState::Done => return Ok(PlannerStep::Done), + ParquetOpenState::Done => return Ok(None), cpu_state => { current_state = cpu_state; } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 6c846e69f155..3939a6ba4dc8 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -78,62 +78,31 @@ pub trait Morselizer: Send + Sync + Debug { /// ensure that CPU work doesn't block or slow down I/O work, but this is not /// strictly required by the API. pub trait MorselPlanner: Send + Debug { - /// Advance this planner by one step. - /// - /// This may involve CPU work, such as parsing parquet metadata and - /// evaluating pruning predicates. + /// Attempt to plan morsels. This may involve CPU work, such as parsing + /// parquet metadata and evaluating pruning predicates. /// /// It should NOT do any I/O work, such as reading from the file. If I/O is - /// required, the returned [`MorselPlan`] may contain a - /// [`BlockedPlannerContinuation`], whose future owns the blocked continuation and - /// resolves to the next [`MorselPlanner`]. + /// required, the returned [`MorselPlan`] should contain a future that owns + /// the blocked continuation and resolves to the next [`MorselPlanner`]. /// - /// Note this function is **not async** to make it explicitly clear that I/O - /// must be driven by the returned [`BlockedPlannerContinuation`]. + /// Taking ownership of `self` encodes that contract in the type system: + /// once `plan` returns an `io_future`, there is no planner value left that + /// a caller could accidentally poll again before the I/O completes. /// - /// # Lifecycle + /// Note this function is **not async** to make it explicitly clear that if + /// I/O is required, it should be done in the returned `io_future`. /// - /// A planner moves between CPU-ready and I/O-blocked states: + /// Returns `None` if the planner has no more work to do. /// - /// ```text - /// Box - /// | - /// | step() - /// v - /// PlannerStep::Ready(MorselPlan) - /// | - /// | may contain: - /// | - morsels - /// | - child planners - /// | - ready_continuation - /// | - blocked_continuation - /// v - /// BlockedPlannerContinuation - /// | - /// | poll / run I/O - /// v - /// Box - /// ``` + /// # Empty Morsel Plans /// - /// Returns [`PlannerStep::Done`] if the planner has no more work to do. - fn step(self: Box) -> Result; -} - -/// Result of advancing a [`MorselPlanner`]. -pub enum PlannerStep { - /// CPU work produced morsels and/or child planners immediately. - Ready(MorselPlan), - /// The planner has no more work to do. - Done, -} - -impl Debug for PlannerStep { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Ready(plan) => f.debug_tuple("PlannerStep::Ready").field(plan).finish(), - Self::Done => f.debug_tuple("PlannerStep::Done").finish(), - } - } + /// It may return `None`, which means no batches will be read from the file + /// (e.g. due to late-pruning based on statistics). + /// + /// # Output Ordering + /// + /// See the comments on [`MorselPlan`] for the logical output order. + fn plan(self: Box) -> Result>; } /// A named future that owns the blocked continuation of a [`MorselPlanner`]. @@ -141,25 +110,28 @@ impl Debug for PlannerStep { /// This is not just "some I/O future". It is the suspended remainder of the /// planner state machine: once the required I/O completes, polling this future /// yields the next CPU-ready planner. -pub struct BlockedPlannerContinuation { +/// +/// This avoids the previous runtime protocol of "planner is waiting, so don't +/// call `plan` again yet": the blocked continuation has moved into this future. +pub struct PendingMorselPlanner { future: BoxFuture<'static, Result>>, } -impl BlockedPlannerContinuation { +impl PendingMorselPlanner { /// Create a new blocked continuation future. pub fn new(future: BoxFuture<'static, Result>>) -> Self { Self { future } } } -impl Debug for BlockedPlannerContinuation { +impl Debug for PendingMorselPlanner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BlockedPlannerContinuation") + f.debug_struct("PendingMorselPlanner") .finish_non_exhaustive() } } -impl Future for BlockedPlannerContinuation { +impl Future for PendingMorselPlanner { type Output = Result>; fn poll( @@ -170,7 +142,7 @@ impl Future for BlockedPlannerContinuation { } } -/// Return result of [`PlannerStep::Ready`]. +/// Return result of [`MorselPlanner::plan`]. /// /// # Logical Ordering /// @@ -178,49 +150,19 @@ impl Future for BlockedPlannerContinuation { /// a [`MorselPlanner`] is logically defined as follows: /// 1. All morsels that are directly produced /// 2. Recursively, all morsels produced by the returned `planners` -/// -/// # Scheduler View -/// -/// A plan may hand the scheduler four kinds of work at once: -/// -/// ```text -/// MorselPlan -/// ├── morsels: ready output work -/// ├── planners: ready child CPU work -/// ├── ready_continuation: same planner, still CPU-ready -/// └── blocked_continuation: same planner, blocked on I/O -/// ``` -/// -/// This lets a single `step()` return ready work immediately while also handing -/// off the blocked remainder to a more concurrent I/O executor. #[derive(Default)] pub struct MorselPlan { /// Any morsels that are ready for processing. morsels: Vec>, /// Any newly-created planners that are ready for CPU work. planners: Vec>, - /// The same planner's immediate CPU-ready continuation. - ready_continuation: Option>, - /// A blocked continuation that can be polled independently while ready work - /// is being consumed. - blocked_continuation: Option, -} - -impl Debug for MorselPlan { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MorselPlan") - .field("morsels", &self.morsels.len()) - .field("planners", &self.planners.len()) - .field( - "ready_continuation", - &self.ready_continuation.as_ref().map(|_| ""), - ) - .field( - "blocked_continuation", - &self.blocked_continuation.as_ref().map(|_| ""), - ) - .finish() - } + /// A future that will drive any I/O work to completion and yield the next + /// CPU-ready planner. + /// + /// DataFusion will poll this future occasionally to drive the I/O work to + /// completion. Once the future resolves, the returned planner is ready for + /// another call to `plan`. + io_future: Option, } impl MorselPlan { @@ -241,21 +183,9 @@ impl MorselPlan { self } - /// Set the same planner's immediate CPU-ready continuation. - pub fn with_ready_continuation( - mut self, - ready_continuation: Box, - ) -> Self { - self.ready_continuation = Some(ready_continuation); - self - } - - /// Set the blocked continuation that can be polled independently. - pub fn with_blocked_continuation( - mut self, - blocked_continuation: BlockedPlannerContinuation, - ) -> Self { - self.blocked_continuation = Some(blocked_continuation); + /// Set the pending I/O future. + pub fn with_io_future(mut self, io_future: PendingMorselPlanner) -> Self { + self.io_future = Some(io_future); self } @@ -269,23 +199,18 @@ impl MorselPlan { std::mem::take(&mut self.planners) } - /// Take the same planner's immediate CPU-ready continuation, if any. - pub fn take_ready_continuation(&mut self) -> Option> { - self.ready_continuation.take() - } - - /// Take the blocked continuation, if any. - pub fn take_blocked_continuation(&mut self) -> Option { - self.blocked_continuation.take() + /// Take the pending I/O future, if any. + pub fn take_io_future(&mut self) -> Option { + self.io_future.take() } - /// Returns `true` if this plan contains a CPU-ready continuation. - pub fn has_ready_continuation(&self) -> bool { - self.ready_continuation.is_some() + /// Set the pending I/O future. + pub fn set_io_future(&mut self, io_future: PendingMorselPlanner) { + self.io_future = Some(io_future); } - /// Returns `true` if this plan contains a blocked continuation. - pub fn has_blocked_continuation(&self) -> bool { - self.blocked_continuation.is_some() + /// Returns `true` if this plan contains an I/O future. + pub fn has_io_future(&self) -> bool { + self.io_future.is_some() } } From e4f3cfe1be98d613feb9137fe1fa59596b788f03 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 7 Apr 2026 08:28:47 -0500 Subject: [PATCH 6/6] fix --- datafusion/datasource-parquet/src/opener.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 252f56f134c4..77bb855ddf8c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -471,6 +471,12 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } + // If all remaining work is blocked on I/O, wait for it to yield the + // next planner rather than incorrectly reporting an empty stream. + if self.ready_planners.is_empty() && !self.pending_io.is_empty() { + return Poll::Pending; + } + // Planner did not produce any stream (for example, it pruned the entire file) let Some(planner) = self.ready_planners.pop_front() else { return Poll::Ready(Ok(futures::stream::empty().boxed()));