diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6621706c35c8..77bb855ddf8c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] state machine for opening Parquet files +//! [`ParquetOpener`] and [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; @@ -26,11 +26,16 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; +use datafusion_datasource::morsel::{ + Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use std::fmt; use std::future::Future; use std::mem; use std::pin::Pin; @@ -77,12 +82,26 @@ use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; -/// Entry point for opening a Parquet file +/// Implements [`FileOpener`] for Parquet +#[derive(Clone)] +pub(super) struct ParquetOpener { + pub(super) morselizer: ParquetMorselizer, +} + +impl FileOpener for ParquetOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result { + let future = ParquetOpenFuture::new(&self.morselizer, partitioned_file)?; + Ok(Box::pin(future)) + } +} + +/// Stateless Parquet morselizer implementation. /// /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive /// steps interspersed with I/O steps. The code in this module implements the steps /// as an explicit state machine -- see [`ParquetOpenState`] for details. -pub(super) struct ParquetOpener { +#[derive(Clone)] +pub(super) struct ParquetMorselizer { /// Execution partition index pub(crate) partition_index: usize, /// Projection to apply on top of the table schema (i.e. can reference partition columns). @@ -137,6 +156,23 @@ pub(super) struct ParquetOpener { pub reverse_row_groups: bool, } +impl fmt::Debug for ParquetMorselizer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMorselizer") + .field("partition_index", &self.partition_index) + .field("preserve_order", &self.preserve_order) + .field("enable_page_index", &self.enable_page_index) + .field("enable_bloom_filter", &self.enable_bloom_filter) + .finish() + } +} + +impl Morselizer for ParquetMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + Ok(Box::new(ParquetMorselPlanner::try_new(self, file)?)) + } +} + /// States for [`ParquetOpenFuture`] /// /// These states correspond to the steps required to read and apply various @@ -216,6 +252,27 @@ enum ParquetOpenState { Done, } +impl fmt::Debug for ParquetOpenState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = match self { + ParquetOpenState::Start { .. } => "Start", + #[cfg(feature = "parquet_encryption")] + ParquetOpenState::LoadEncryption(_) => "LoadEncryption", + ParquetOpenState::PruneFile(_) => "PruneFile", + ParquetOpenState::LoadMetadata(_) => "LoadMetadata", + ParquetOpenState::PrepareFilters(_) => "PrepareFilters", + ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex", + ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", + ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", + ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", + ParquetOpenState::BuildStream(_) => "BuildStream", + ParquetOpenState::Ready(_) => "Ready", + ParquetOpenState::Done => "Done", + }; + f.write_str(state) + } +} + struct PreparedParquetOpen { partition_index: usize, partitioned_file: PartitionedFile, @@ -290,37 +347,13 @@ struct BloomFiltersLoadedParquetOpen { row_group_bloom_filters: Vec, } -/// Implements state machine described in [`ParquetOpenState`] -struct ParquetOpenFuture { - state: ParquetOpenState, -} - -impl ParquetOpenFuture { - #[cfg(feature = "parquet_encryption")] - fn new(prepared: PreparedParquetOpen, encryption_context: EncryptionContext) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - encryption_context: Arc::new(encryption_context), - }, - } - } - - #[cfg(not(feature = "parquet_encryption"))] - fn new(prepared: PreparedParquetOpen) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - }, - } - } -} - impl ParquetOpenState { /// Applies one CPU-only state transition. /// /// `Load*` states do not transition here and are returned unchanged so the /// driver loop can poll their inner futures separately. + /// + /// Implements state machine described in [`ParquetOpenState`] fn transition(self) -> Result { match self { ParquetOpenState::Start { @@ -392,93 +425,218 @@ impl ParquetOpenState { } } +/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API +/// +/// Implements state machine described in [`ParquetOpenState`] +struct ParquetOpenFuture { + ready_planners: VecDeque>, + pending_io: VecDeque, + ready_morsels: VecDeque>, +} + +impl ParquetOpenFuture { + fn new( + morselizer: &ParquetMorselizer, + partitioned_file: PartitionedFile, + ) -> Result { + Ok(Self { + ready_planners: vec![morselizer.plan_file(partitioned_file)?].into(), + pending_io: VecDeque::new(), + ready_morsels: VecDeque::new(), + }) + } +} + impl Future for ParquetOpenFuture { type Output = Result>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let state = mem::replace(&mut self.state, ParquetOpenState::Done); - let mut state = state.transition()?; + // If waiting on IO, poll the oldest blocked continuation until it + // yields the next CPU-ready planner. + if let Some(mut io_future) = self.pending_io.pop_front() { + match io_future.poll_unpin(cx) { + Poll::Pending => { + self.pending_io.push_front(io_future); + } + Poll::Ready(Ok(planner)) => { + self.ready_planners.push_back(planner); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } + } + + // have a morsel ready to go, return that + if let Some(morsel) = self.ready_morsels.pop_front() { + return Poll::Ready(Ok(morsel.into_stream())); + } + + // If all remaining work is blocked on I/O, wait for it to yield the + // next planner rather than incorrectly reporting an empty stream. + if self.ready_planners.is_empty() && !self.pending_io.is_empty() { + return Poll::Pending; + } + + // Planner did not produce any stream (for example, it pruned the entire file) + let Some(planner) = self.ready_planners.pop_front() else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; + + let Some(mut plan) = planner.plan()? else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; + + let child_planners = plan.take_planners(); + if !child_planners.is_empty() { + return Poll::Ready(internal_err!( + "Parquet FileOpener adapter does not support child morsel planners" + )); + } + + self.ready_morsels = plan.take_morsels().into(); + if let Some(io_future) = plan.take_io_future() { + self.pending_io.push_back(io_future); + } + + if self.ready_morsels.is_empty() + && self.ready_planners.is_empty() + && self.pending_io.is_empty() + { + return Poll::Ready(internal_err!( + "planner returned an empty morsel plan" + )); + } + } + } +} + +/// Implements the Morsel API +struct ParquetStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl ParquetStreamMorsel { + fn new(stream: BoxStream<'static, Result>) -> Self { + Self { stream } + } +} + +impl fmt::Debug for ParquetStreamMorsel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetStreamMorsel") + .finish_non_exhaustive() + } +} + +impl Morsel for ParquetStreamMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } +} + +/// Stateful planner for opening a single parquet file via the morsel APIs. +struct ParquetMorselPlanner(ParquetOpenState); + +impl fmt::Debug for ParquetMorselPlanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("ParquetMorselPlanner") + .field(&self.0) + .finish() + } +} + +impl ParquetMorselPlanner { + fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result { + let prepared = morselizer.prepare_open_file(file)?; + #[cfg(feature = "parquet_encryption")] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + encryption_context: Arc::new(morselizer.get_encryption_context()), + }; + #[cfg(not(feature = "parquet_encryption"))] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + }; + Ok(Self(state)) + } + + /// Schedule an I/O future that resolves to the planner's next owned state. + /// + /// This helper + /// + /// 1. packages the next [`ParquetOpenState`] into a future that yields the + /// next CPU-ready planner once the I/O future completes, + /// + /// 2. moves the blocked continuation into that future rather than keeping a + /// `Waiting` planner state around, + /// + /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the + /// caller to poll. + /// + fn schedule_io(future: F) -> MorselPlan + where + F: Future> + Send + 'static, + { + let io_future = PendingMorselPlanner::new( + async move { Ok(Box::new(Self(future.await?)) as Box) } + .boxed(), + ); + MorselPlan::new().with_io_future(io_future) + } +} + +impl MorselPlanner for ParquetMorselPlanner { + fn plan(self: Box) -> Result> { + let mut current_state = self.0; + + loop { + if let ParquetOpenState::Done = current_state { + return Ok(None); + } + + let state = current_state.transition()?; match state { #[cfg(feature = "parquet_encryption")] - ParquetOpenState::LoadEncryption(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => ParquetOpenState::PruneFile(result?), - Poll::Pending => { - self.state = ParquetOpenState::LoadEncryption(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadEncryption(future) => { + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneFile(future.await?)) + }))); } - ParquetOpenState::LoadMetadata(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PrepareFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadMetadata(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadMetadata(future) => { + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + }))); } - ParquetOpenState::LoadPageIndex(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithStatistics(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadPageIndex(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadPageIndex(future) => { + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }))); } - ParquetOpenState::LoadBloomFilters(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithBloomFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadBloomFilters(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadBloomFilters(future) => { + return Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }))); } ParquetOpenState::Ready(stream) => { - return Poll::Ready(Ok(stream)); + let morsels: Vec> = + vec![Box::new(ParquetStreamMorsel::new(stream))]; + return Ok(Some(MorselPlan::new().with_morsels(morsels))); } - ParquetOpenState::Done => { - return Poll::Ready(Ok(futures::stream::empty().boxed())); + ParquetOpenState::Done => return Ok(None), + cpu_state => { + current_state = cpu_state; } - - // For all other states, loop again and try to transition - // immediately. All states are explicitly listed here to ensure any - // new states are handled correctly - ParquetOpenState::Start { .. } => {} - ParquetOpenState::PruneFile(_) => {} - ParquetOpenState::PrepareFilters(_) => {} - ParquetOpenState::PruneWithStatistics(_) => {} - ParquetOpenState::PruneWithBloomFilters(_) => {} - ParquetOpenState::BuildStream(_) => {} - }; - - self.state = state; + } } } } -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let prepared = self.prepare_open_file(partitioned_file)?; - #[cfg(feature = "parquet_encryption")] - let future = ParquetOpenFuture::new(prepared, self.get_encryption_context()); - #[cfg(not(feature = "parquet_encryption"))] - let future = ParquetOpenFuture::new(prepared); - Ok(Box::pin(future)) - } -} - -impl ParquetOpener { +impl ParquetMorselizer { /// Perform the CPU-only setup for opening a parquet file. fn prepare_open_file( &self, @@ -1447,7 +1605,7 @@ impl EncryptionContext { } } -impl ParquetOpener { +impl ParquetMorselizer { #[cfg(feature = "parquet_encryption")] fn get_encryption_context(&self) -> EncryptionContext { EncryptionContext::new( @@ -1576,7 +1734,7 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use super::{ConstantColumns, constant_columns_from_stats}; + use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; @@ -1731,11 +1889,12 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - ParquetOpener { + let morselizer = ParquetMorselizer { partition_index: self.partition_index, projection, batch_size: self.batch_size, limit: self.limit, + preserve_order: self.preserve_order, predicate: self.predicate, table_schema, metadata_size_hint: self.metadata_size_hint, @@ -1757,8 +1916,8 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, - preserve_order: self.preserve_order, - } + }; + ParquetOpener { morselizer } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 64a339009e9c..30c28507b0c9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,8 +23,8 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use crate::opener::ParquetOpener; use crate::opener::build_pruning_predicates; +use crate::opener::{ParquetMorselizer, ParquetOpener}; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] @@ -543,32 +543,34 @@ impl FileSource for ParquetSource { .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); let opener = Arc::new(ParquetOpener { - partition_index: partition, - projection: self.projection.clone(), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), - limit: base_config.limit, - preserve_order: base_config.preserve_order, - predicate: self.predicate.clone(), - table_schema: self.table_schema.clone(), - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics().clone(), - parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - force_filter_selections: self.force_filter_selections(), - enable_page_index: self.enable_page_index(), - enable_bloom_filter: self.bloom_filter_on_read(), - enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, - expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), - max_predicate_cache_size: self.max_predicate_cache_size(), - reverse_row_groups: self.reverse_row_groups, + morselizer: ParquetMorselizer { + partition_index: partition, + projection: self.projection.clone(), + batch_size: self + .batch_size + .expect("Batch size must set before creating ParquetOpener"), + limit: base_config.limit, + preserve_order: base_config.preserve_order, + predicate: self.predicate.clone(), + table_schema: self.table_schema.clone(), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics().clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties, + expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, + }, }); Ok(opener) } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index bcc4627050d4..a9600271c28c 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod morsel; pub mod projection; pub mod schema_adapter; pub mod sink; diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs new file mode 100644 index 000000000000..3939a6ba4dc8 --- /dev/null +++ b/datafusion/datasource/src/morsel/mod.rs @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Structures for Morsel Driven IO. +//! +//! Morsel Driven IO is a technique for parallelizing the reading of large files +//! by dividing them into smaller "morsels" that are processed independently. +//! +//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query +//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). + +use std::fmt::Debug; + +use crate::PartitionedFile; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::Future; +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. +/// +/// This represents a single morsel of work that is ready to be processed. It +/// has all data necessary (does not need any I/O) and is ready to be turned +/// into a stream of [`RecordBatch`]es for processing by the execution engine. +pub trait Morsel: Send + Debug { + /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing. + /// + /// This should not do any I/O work, such as reading from the file. + fn into_stream(self: Box) -> BoxStream<'static, Result>; +} + +/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner +/// for that file. +/// +/// This is the entry point for morsel driven I/O. +pub trait Morselizer: Send + Sync + Debug { + /// Return the initial [`MorselPlanner`] for this file. + /// + /// "Morselzing" a file may involve CPU work, such as parsing parquet + /// metadata and evaluating pruning predicates. It should NOT do any I/O + /// work, such as reading from the file. If I/O is required, it should + /// return a future that the caller can poll to drive the I/O work to + /// completion, and once the future is complete, the caller can call + /// `plan_file` again for a different file. + fn plan_file(&self, file: PartitionedFile) -> Result>; +} + +/// A Morsel Planner is responsible for creating morsels for a given scan. +/// +/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O +/// outstanding for a specific planner. DataFusion may run +/// multiple planners in parallel, which corresponds to multiple parallel +/// I/O requests. +/// +/// It is not a Rust `Stream` so that it can explicitly separate CPU bound +/// work from I/O work. +/// +/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it +/// should do CPU work to produce the next morsels or discover the next I/O +/// phase. +/// +/// Best practice is to spawn I/O in a Tokio task on a separate runtime to +/// ensure that CPU work doesn't block or slow down I/O work, but this is not +/// strictly required by the API. +pub trait MorselPlanner: Send + Debug { + /// Attempt to plan morsels. This may involve CPU work, such as parsing + /// parquet metadata and evaluating pruning predicates. + /// + /// It should NOT do any I/O work, such as reading from the file. If I/O is + /// required, the returned [`MorselPlan`] should contain a future that owns + /// the blocked continuation and resolves to the next [`MorselPlanner`]. + /// + /// Taking ownership of `self` encodes that contract in the type system: + /// once `plan` returns an `io_future`, there is no planner value left that + /// a caller could accidentally poll again before the I/O completes. + /// + /// Note this function is **not async** to make it explicitly clear that if + /// I/O is required, it should be done in the returned `io_future`. + /// + /// Returns `None` if the planner has no more work to do. + /// + /// # Empty Morsel Plans + /// + /// It may return `None`, which means no batches will be read from the file + /// (e.g. due to late-pruning based on statistics). + /// + /// # Output Ordering + /// + /// See the comments on [`MorselPlan`] for the logical output order. + fn plan(self: Box) -> Result>; +} + +/// A named future that owns the blocked continuation of a [`MorselPlanner`]. +/// +/// This is not just "some I/O future". It is the suspended remainder of the +/// planner state machine: once the required I/O completes, polling this future +/// yields the next CPU-ready planner. +/// +/// This avoids the previous runtime protocol of "planner is waiting, so don't +/// call `plan` again yet": the blocked continuation has moved into this future. +pub struct PendingMorselPlanner { + future: BoxFuture<'static, Result>>, +} + +impl PendingMorselPlanner { + /// Create a new blocked continuation future. + pub fn new(future: BoxFuture<'static, Result>>) -> Self { + Self { future } + } +} + +impl Debug for PendingMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PendingMorselPlanner") + .finish_non_exhaustive() + } +} + +impl Future for PendingMorselPlanner { + type Output = Result>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.future.as_mut().poll(cx) + } +} + +/// Return result of [`MorselPlanner::plan`]. +/// +/// # Logical Ordering +/// +/// For plans where the output order of rows is maintained, the output order of +/// a [`MorselPlanner`] is logically defined as follows: +/// 1. All morsels that are directly produced +/// 2. Recursively, all morsels produced by the returned `planners` +#[derive(Default)] +pub struct MorselPlan { + /// Any morsels that are ready for processing. + morsels: Vec>, + /// Any newly-created planners that are ready for CPU work. + planners: Vec>, + /// A future that will drive any I/O work to completion and yield the next + /// CPU-ready planner. + /// + /// DataFusion will poll this future occasionally to drive the I/O work to + /// completion. Once the future resolves, the returned planner is ready for + /// another call to `plan`. + io_future: Option, +} + +impl MorselPlan { + /// Create an empty morsel plan. + pub fn new() -> Self { + Self::default() + } + + /// Set the ready morsels. + pub fn with_morsels(mut self, morsels: Vec>) -> Self { + self.morsels = morsels; + self + } + + /// Set the ready child planners. + pub fn with_planners(mut self, planners: Vec>) -> Self { + self.planners = planners; + self + } + + /// Set the pending I/O future. + pub fn with_io_future(mut self, io_future: PendingMorselPlanner) -> Self { + self.io_future = Some(io_future); + self + } + + /// Take the ready morsels. + pub fn take_morsels(&mut self) -> Vec> { + std::mem::take(&mut self.morsels) + } + + /// Take the ready child planners. + pub fn take_planners(&mut self) -> Vec> { + std::mem::take(&mut self.planners) + } + + /// Take the pending I/O future, if any. + pub fn take_io_future(&mut self) -> Option { + self.io_future.take() + } + + /// Set the pending I/O future. + pub fn set_io_future(&mut self, io_future: PendingMorselPlanner) { + self.io_future = Some(io_future); + } + + /// Returns `true` if this plan contains an I/O future. + pub fn has_io_future(&self) -> bool { + self.io_future.is_some() + } +}