diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index 66f8572b79f..bc48939331f 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -276,11 +276,15 @@ async fn test_multi_statement_sql_with_semicolons_in_literals_and_comments() { SELECT COUNT(*) AS cnt FROM "test-semi" "#; - let stream = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder)) - .execute_sql(sql, &std::collections::HashMap::new()) + let properties = std::collections::HashMap::new(); + let execution = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder)) + .execute(quickwit_datafusion::DataFusionRequest::records( + quickwit_datafusion::DataFusionInput::Sql(sql), + &properties, + )) .await .unwrap(); - let batches: Vec = stream.try_collect().await.unwrap(); + let batches: Vec = execution.stream.try_collect().await.unwrap(); assert_eq!(total_rows(&batches), 1); let cnt = batches[0] .column_by_name("cnt") diff --git a/quickwit/quickwit-df-core/src/grpc.rs b/quickwit/quickwit-df-core/src/grpc.rs index 0f12165a40f..d93fcefd252 100644 --- a/quickwit/quickwit-df-core/src/grpc.rs +++ b/quickwit/quickwit-df-core/src/grpc.rs @@ -48,7 +48,7 @@ use crate::proto::{ ExecuteSqlRequest, ExecuteSqlResponse, ExecuteSubstraitRequest, ExecuteSubstraitResponse, data_fusion_service_server, }; -use crate::service::DataFusionService; +use crate::service::{DataFusionInput, DataFusionOutput, DataFusionRequest, DataFusionService}; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum GrpcErrorKind { @@ -167,42 +167,37 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl let req = request.into_inner(); let service = Arc::clone(&self.service); - // Route to the appropriate DataFusionService method: - // - substrait_plan_bytes: production path (pre-encoded protobuf) - // - substrait_plan_json: dev/tooling path (grpcurl, rollup JSON files) - // When `explain` is set, the server returns the EXPLAIN output - // (no storage I/O) instead of executing the plan. - let stream = match ( + let input = match ( !req.substrait_plan_bytes.is_empty(), !req.substrait_plan_json.is_empty(), - req.explain, ) { - (true, _, false) => service - .execute_substrait(&req.substrait_plan_bytes, &req.properties) - .await - .map_err(|err| df_error_to_status(&err))?, - (true, _, true) => service - .explain_substrait(&req.substrait_plan_bytes, &req.properties) - .await - .map_err(|err| df_error_to_status(&err))?, - (false, true, false) => service - .execute_substrait_json(&req.substrait_plan_json, &req.properties) - .await - .map_err(|err| df_error_to_status(&err))?, - (false, true, true) => service - .explain_substrait_json(&req.substrait_plan_json, &req.properties) - .await - .map_err(|err| df_error_to_status(&err))?, - _ => { + (true, _) => DataFusionInput::SubstraitBytes(&req.substrait_plan_bytes), + (false, true) => DataFusionInput::SubstraitJson(&req.substrait_plan_json), + (false, false) => { return Err(tonic::Status::invalid_argument( "either substrait_plan_bytes or substrait_plan_json must be set", )); } }; + let output = if req.explain { + DataFusionOutput::Explain + } else { + DataFusionOutput::Records + }; - let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSubstraitResponse { - arrow_ipc_bytes: ipc_bytes, - }); + let execution = service + .execute(DataFusionRequest { + input, + output, + properties: &req.properties, + }) + .await + .map_err(|err| df_error_to_status(&err))?; + + let response_stream = + map_batch_stream(execution.stream, |ipc_bytes| ExecuteSubstraitResponse { + arrow_ipc_bytes: ipc_bytes, + }); Ok(tonic::Response::new(response_stream)) } @@ -213,15 +208,18 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl let req = request.into_inner(); let service = Arc::clone(&self.service); - let stream = service - .execute_sql(&req.sql, &req.properties) + let execution = service + .execute(DataFusionRequest::records( + DataFusionInput::Sql(&req.sql), + &req.properties, + )) .await .map_err(|err| { warn!(error = %err, "DataFusion SQL execution error"); df_error_to_status(&err) })?; - let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSqlResponse { + let response_stream = map_batch_stream(execution.stream, |ipc_bytes| ExecuteSqlResponse { arrow_ipc_bytes: ipc_bytes, }); Ok(tonic::Response::new(response_stream)) diff --git a/quickwit/quickwit-df-core/src/lib.rs b/quickwit/quickwit-df-core/src/lib.rs index 51f27ff47b6..853aced59a3 100644 --- a/quickwit/quickwit-df-core/src/lib.rs +++ b/quickwit/quickwit-df-core/src/lib.rs @@ -43,7 +43,12 @@ pub use data_source::{ }; pub use datafusion::execution::SendableRecordBatchStream; pub use datafusion_distributed::{Worker, WorkerResolver}; -pub use service::DataFusionService; +pub use service::{ + DataFusionExecution, DataFusionExecutionMetadata, DataFusionExecutionStatistics, + DataFusionInput, DataFusionInputMetadata, DataFusionMetricStatistics, DataFusionOutput, + DataFusionPhysicalPlanMetadata, DataFusionPlanMetricStatistics, DataFusionPruningStatistics, + DataFusionRatioStatistics, DataFusionRequest, DataFusionService, +}; pub use session::DataFusionSessionBuilder; pub use task_estimator::DataSourceExecPartitionEstimator; pub use worker::build_worker; diff --git a/quickwit/quickwit-df-core/src/service.rs b/quickwit/quickwit-df-core/src/service.rs index 940f4d90366..fc56f8806b0 100644 --- a/quickwit/quickwit-df-core/src/service.rs +++ b/quickwit/quickwit-df-core/src/service.rs @@ -14,45 +14,319 @@ //! Pure-Rust DataFusion query execution service. //! -//! [`DataFusionService`] is the core query execution entry point: it holds an -//! `Arc` and exposes `execute_substrait` and -//! `execute_sql` methods that return streaming `RecordBatch` iterators. +//! [`DataFusionService`] is the core query execution entry point. It has no +//! tonic or gRPC dependencies: protocol adapters translate their request shape +//! into a [`DataFusionRequest`] and call [`DataFusionService::execute`]. //! -//! ## No tonic / gRPC coupling -//! -//! This struct has zero gRPC dependencies. `quickwit_df_core::grpc` -//! provides the tonic adapter that encodes batches as Arrow IPC, and -//! `quickwit-serve/src/datafusion_api/setup.rs` mounts that service in OSS. -//! A downstream caller can do the same from its own handler, calling -//! `execute_substrait(&[u8])` and streaming the resulting batches in its own -//! proto response format. -//! -//! ## Usage -//! -//! ```ignore -//! use std::sync::Arc; -//! use quickwit_datafusion::{DataFusionService, DataFusionSessionBuilder}; -//! -//! let builder = Arc::new(DataFusionSessionBuilder::new().with_runtime_plugin(my_plugin)); -//! let service = DataFusionService::new(Arc::clone(&builder)); -//! -//! let mut stream = service.execute_substrait(&plan_bytes).await?; -//! while let Some(batch) = stream.next().await { -//! // handle batch -//! } -//! ``` +//! SQL/Substrait input, normal/explain output, and metadata collection all share +//! the same planning path. This keeps the service API small and makes metadata a +//! property of every execution instead of a second set of methods. use std::collections::HashMap; +use std::fmt; use std::sync::Arc; +use std::time::{Duration, Instant}; +use arrow::array::{ArrayRef, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::execution::SendableRecordBatchStream; +use datafusion::logical_expr::LogicalPlan; +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::metrics::{Metric, MetricType, MetricValue, MetricsSet}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ExecutionPlan, execute_stream}; use datafusion::prelude::SessionContext; use datafusion::sql::sqlparser::dialect::dialect_from_str; use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer}; +use datafusion_distributed::{ + DistributedExec, DistributedMetricsFormat, NetworkBoundaryExt, display_plan_ascii, + rewrite_distributed_plan_with_metrics, +}; +use datafusion_substrait::substrait::proto::Plan as SubstraitPlan; +use futures::StreamExt; use crate::session::DataFusionSessionBuilder; +#[derive(Clone, Copy)] +pub enum DataFusionInput<'a> { + Sql(&'a str), + SubstraitBytes(&'a [u8]), + SubstraitJson(&'a str), +} + +impl DataFusionInput<'_> { + fn summary(self) -> (&'static str, usize) { + match self { + Self::Sql(sql) => ("sql", sql.len()), + Self::SubstraitBytes(plan_bytes) => ("substrait_bytes", plan_bytes.len()), + Self::SubstraitJson(plan_json) => ("substrait_json", plan_json.len()), + } + } +} + +impl fmt::Debug for DataFusionInput<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let (kind, len) = self.summary(); + f.debug_struct("DataFusionInput") + .field("kind", &kind) + .field("len", &len) + .finish() + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum DataFusionOutput { + #[default] + Records, + Explain, + ExplainAnalyze, +} + +#[derive(Clone, Copy)] +pub struct DataFusionRequest<'a> { + pub input: DataFusionInput<'a>, + pub output: DataFusionOutput, + pub properties: &'a HashMap, +} + +impl<'a> DataFusionRequest<'a> { + pub fn records( + input: DataFusionInput<'a>, + properties: &'a HashMap, + ) -> DataFusionRequest<'a> { + Self { + input, + output: DataFusionOutput::Records, + properties, + } + } + + pub fn explain( + input: DataFusionInput<'a>, + properties: &'a HashMap, + ) -> DataFusionRequest<'a> { + Self { + input, + output: DataFusionOutput::Explain, + properties, + } + } + + pub fn explain_analyze( + input: DataFusionInput<'a>, + properties: &'a HashMap, + ) -> DataFusionRequest<'a> { + Self { + input, + output: DataFusionOutput::ExplainAnalyze, + properties, + } + } +} + +impl fmt::Debug for DataFusionRequest<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DataFusionRequest") + .field("input", &self.input) + .field("output", &self.output) + .field("num_properties", &self.properties.len()) + .finish() + } +} + +pub struct DataFusionExecution { + pub stream: SendableRecordBatchStream, + pub metadata: DataFusionExecutionMetadata, +} + +pub struct DataFusionExecutionMetadata { + pub input: DataFusionInputMetadata, + pub output: DataFusionOutput, + pub logical_plan: String, + pub physical_plan: Arc, + pub input_decode_duration: Duration, + pub logical_planning_duration: Duration, + pub physical_planning_duration: Duration, + pub stream_creation_duration: Duration, + pub analyze_execution_duration: Option, + pub analyze_output_rows: Option, +} + +#[derive(Debug, Clone)] +pub struct DataFusionPhysicalPlanMetadata { + pub display: String, + pub statistics: DataFusionExecutionStatistics, +} + +#[derive(Debug, Clone, Default)] +pub struct DataFusionExecutionStatistics { + pub num_stages: usize, + pub num_tasks: usize, + pub elapsed_compute: Option, + pub repartition_time: Option, + pub time_elapsed_processing: Option, + pub output_rows: Option, + pub output_bytes: Option, + pub output_batches: Option, + pub spill_count: Option, + pub spilled_bytes: Option, + pub spilled_rows: Option, + pub aggregate_metrics: Vec, + pub plan_metrics: Vec, +} + +#[derive(Debug, Clone)] +pub struct DataFusionPlanMetricStatistics { + pub node_path: String, + pub node_name: String, + pub metric: DataFusionMetricStatistics, +} + +#[derive(Debug, Clone)] +pub struct DataFusionMetricStatistics { + pub name: String, + pub partition: Option, + pub metric_type: &'static str, + pub value: Option, + pub display_value: String, + pub labels: Vec<(String, String)>, + pub pruning: Option, + pub ratio: Option, +} + +#[derive(Debug, Clone)] +pub struct DataFusionPruningStatistics { + pub pruned: usize, + pub matched: usize, + pub fully_matched: usize, +} + +#[derive(Debug, Clone)] +pub struct DataFusionRatioStatistics { + pub part: usize, + pub total: usize, +} + +impl DataFusionExecutionMetadata { + pub fn physical_plan_metadata(&self) -> DataFusionPhysicalPlanMetadata { + physical_plan_metadata(&self.physical_plan) + } +} + +impl DataFusionExecutionStatistics { + pub fn to_json_string(&self) -> String { + self.to_json_value().to_string() + } + + fn to_json_value(&self) -> serde_json::Value { + serde_json::json!({ + "num_stages": self.num_stages, + "num_tasks": self.num_tasks, + "elapsed_compute_ms": self.elapsed_compute.map(duration_as_millis_f64), + "repartition_time_ms": self.repartition_time.map(duration_as_millis_f64), + "time_elapsed_processing_ms": self.time_elapsed_processing.map(duration_as_millis_f64), + "output_rows": self.output_rows, + "output_bytes": self.output_bytes, + "output_batches": self.output_batches, + "spill_count": self.spill_count, + "spilled_bytes": self.spilled_bytes, + "spilled_rows": self.spilled_rows, + "aggregate_metrics": self + .aggregate_metrics + .iter() + .map(DataFusionMetricStatistics::to_json_value) + .collect::>(), + "plan_metrics": self + .plan_metrics + .iter() + .map(DataFusionPlanMetricStatistics::to_json_value) + .collect::>(), + }) + } +} + +impl DataFusionPlanMetricStatistics { + fn to_json_value(&self) -> serde_json::Value { + serde_json::json!({ + "node_path": self.node_path, + "node_name": self.node_name, + "metric": self.metric.to_json_value(), + }) + } +} + +impl DataFusionMetricStatistics { + fn to_json_value(&self) -> serde_json::Value { + serde_json::json!({ + "name": self.name, + "partition": self.partition, + "metric_type": self.metric_type, + "value": self.value, + "display_value": self.display_value, + "labels": self + .labels + .iter() + .map(|(name, value)| serde_json::json!({ + "name": name, + "value": value, + })) + .collect::>(), + "pruning": self.pruning.as_ref().map(DataFusionPruningStatistics::to_json_value), + "ratio": self.ratio.as_ref().map(DataFusionRatioStatistics::to_json_value), + }) + } +} + +impl DataFusionPruningStatistics { + fn to_json_value(&self) -> serde_json::Value { + serde_json::json!({ + "pruned": self.pruned, + "matched": self.matched, + "fully_matched": self.fully_matched, + "total": self.pruned + self.matched, + }) + } +} + +impl DataFusionRatioStatistics { + fn to_json_value(&self) -> serde_json::Value { + serde_json::json!({ + "part": self.part, + "total": self.total, + }) + } +} + +pub enum DataFusionInputMetadata { + Sql { sql: String }, + Substrait { plan_json: String }, +} + +impl DataFusionInputMetadata { + pub fn sql(&self) -> Option<&str> { + match self { + Self::Sql { sql } => Some(sql.as_str()), + Self::Substrait { .. } => None, + } + } + + pub fn substrait_plan_json(&self) -> Option<&str> { + match self { + Self::Sql { .. } => None, + Self::Substrait { plan_json } => Some(plan_json.as_str()), + } + } +} + +struct PreparedLogicalPlan { + input: DataFusionInputMetadata, + logical_plan: LogicalPlan, + input_decode_duration: Duration, + logical_planning_duration: Duration, +} + /// Split a SQL string into top-level statements using the configured dialect's /// tokenizer rather than raw string splitting. pub fn split_sql_statements(ctx: &SessionContext, sql: &str) -> DFResult> { @@ -128,143 +402,404 @@ impl DataFusionService { Self { builder } } - /// Execute a Substrait plan encoded as protobuf bytes. - /// - /// Builds a fresh session via the underlying `DataFusionSessionBuilder`, - /// decodes the plan, and returns a streaming `RecordBatch` iterator. - /// The caller decides whether to collect, send via gRPC, or pipe to Arrow - /// Flight — no materialization happens inside this method. - /// - /// `properties` flows from `ExecuteSubstraitRequest.properties` into - /// `SessionConfig` overrides (e.g. `execution.target_partitions`). Pass - /// an empty map when no overrides apply. - #[tracing::instrument(skip(self, plan_bytes, properties), fields(plan_bytes_len = plan_bytes.len()))] - pub async fn execute_substrait( - &self, - plan_bytes: &[u8], - properties: &HashMap, - ) -> DFResult { + /// Execute a SQL or Substrait query and return both the lazy output stream + /// and planning metadata. + #[tracing::instrument(skip(self, request), fields(output = ?request.output))] + pub async fn execute(&self, request: DataFusionRequest<'_>) -> DFResult { + let (input_kind, input_len) = request.input.summary(); tracing::info!( - plan_bytes_len = plan_bytes.len(), - num_properties = properties.len(), - "executing substrait plan" + input_kind, + input_len, + output = ?request.output, + num_properties = request.properties.len(), + "executing DataFusion query" ); - use datafusion_substrait::substrait::proto::Plan; - use prost::Message; - let plan = Plan::decode(plan_bytes) - .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let ctx = self + .builder + .build_session_with_properties(request.properties)?; + let prepared = self.prepare_logical_plan(request.input, &ctx).await?; + let logical_plan_display = prepared.logical_plan.display_indent().to_string(); + + let physical_planning_start = Instant::now(); + let physical_plan = ctx + .state() + .create_physical_plan(&prepared.logical_plan) + .await?; + let physical_planning_duration = physical_planning_start.elapsed(); + + let stream_creation_start = Instant::now(); + let mut analyze_execution_duration = None; + let mut analyze_output_rows = None; + let stream = match request.output { + DataFusionOutput::Records => { + execute_stream(Arc::clone(&physical_plan), ctx.task_ctx())? + } + DataFusionOutput::Explain => { + explain_stream(&logical_plan_display, &physical_plan, None)? + } + DataFusionOutput::ExplainAnalyze => { + let analyze_execution_start = Instant::now(); + let num_rows = + execute_plan_for_metrics(Arc::clone(&physical_plan), ctx.task_ctx()).await?; + analyze_execution_duration = Some(analyze_execution_start.elapsed()); + analyze_output_rows = Some(num_rows); + let physical_plan_metadata = physical_plan_metadata(&physical_plan); + explain_stream( + &logical_plan_display, + &physical_plan, + Some(&physical_plan_metadata), + )? + } + }; + let stream_creation_duration = stream_creation_start.elapsed(); - self.execute_substrait_plan(&plan, properties).await + Ok(DataFusionExecution { + stream, + metadata: DataFusionExecutionMetadata { + input: prepared.input, + output: request.output, + logical_plan: logical_plan_display, + physical_plan, + input_decode_duration: prepared.input_decode_duration, + logical_planning_duration: prepared.logical_planning_duration, + physical_planning_duration, + stream_creation_duration, + analyze_execution_duration, + analyze_output_rows, + }, + }) } - /// Execute a Substrait plan from its proto3 JSON representation. - /// - /// Accepts the JSON format produced by DataFusion's `to_substrait_plan` + - /// `serde_json::to_string`, or the `rollup_substrait.json` format used in - /// integration tests and dev tooling. - /// - /// This is the dev/tooling path — grpcurl and Python scripts can pass the - /// plan as a JSON string without pre-encoding to binary protobuf. - pub async fn execute_substrait_json( + async fn prepare_logical_plan( &self, - plan_json: &str, - properties: &HashMap, - ) -> DFResult { - use datafusion_substrait::substrait::proto::Plan; + input: DataFusionInput<'_>, + ctx: &SessionContext, + ) -> DFResult { + match input { + DataFusionInput::Sql(sql) => self.prepare_sql(sql, ctx).await, + DataFusionInput::SubstraitBytes(plan_bytes) => { + self.prepare_substrait_bytes(plan_bytes, ctx).await + } + DataFusionInput::SubstraitJson(plan_json) => { + self.prepare_substrait_json(plan_json, ctx).await + } + } + } - let plan: Plan = serde_json::from_str(plan_json).map_err(|e| { - datafusion::error::DataFusionError::Plan(format!("invalid Substrait plan JSON: {e}")) - })?; + async fn prepare_sql(&self, sql: &str, ctx: &SessionContext) -> DFResult { + let input_decode_start = Instant::now(); + let mut statements = split_sql_statements(ctx, sql)?; + let input_decode_duration = input_decode_start.elapsed(); - self.execute_substrait_plan(&plan, properties).await - } + let last = statements + .pop() + .ok_or_else(|| DataFusionError::Plan("no SQL statements provided".to_string()))?; - async fn execute_substrait_plan( - &self, - plan: &datafusion_substrait::substrait::proto::Plan, - properties: &HashMap, - ) -> DFResult { - let ctx = self.builder.build_session_with_properties(properties)?; - crate::substrait::execute_substrait_plan_streaming( - plan, - &ctx, - self.builder.substrait_extensions(), - ) - .await + for statement in statements { + ctx.sql(&statement).await?.collect().await?; + } + + let logical_planning_start = Instant::now(); + let df = ctx.sql(&last).await?; + let logical_plan = df.into_optimized_plan()?; + let logical_planning_duration = logical_planning_start.elapsed(); + + Ok(PreparedLogicalPlan { + input: DataFusionInputMetadata::Sql { + sql: sql.to_string(), + }, + logical_plan, + input_decode_duration, + logical_planning_duration, + }) } - /// Like [`execute_substrait`], but returns the EXPLAIN output instead of - /// running the plan. The returned stream emits `(plan_type, plan)` rows - /// — same shape as a SQL `EXPLAIN VERBOSE`. - pub async fn explain_substrait( + async fn prepare_substrait_bytes( &self, plan_bytes: &[u8], - properties: &HashMap, - ) -> DFResult { - use datafusion_substrait::substrait::proto::Plan; + ctx: &SessionContext, + ) -> DFResult { use prost::Message; - let plan = Plan::decode(plan_bytes) - .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; - self.explain_substrait_plan(&plan, properties).await + + let input_decode_start = Instant::now(); + let plan = SubstraitPlan::decode(plan_bytes) + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; + let plan_json = substrait_plan_json(&plan)?; + let input_decode_duration = input_decode_start.elapsed(); + + self.prepare_substrait_plan(plan, plan_json, input_decode_duration, ctx) + .await } - /// JSON variant of [`explain_substrait`] — same semantics, proto3-JSON input. - pub async fn explain_substrait_json( + async fn prepare_substrait_json( &self, plan_json: &str, - properties: &HashMap, - ) -> DFResult { - use datafusion_substrait::substrait::proto::Plan; - let plan: Plan = serde_json::from_str(plan_json).map_err(|e| { - datafusion::error::DataFusionError::Plan(format!("invalid Substrait plan JSON: {e}")) + ctx: &SessionContext, + ) -> DFResult { + let input_decode_start = Instant::now(); + let plan: SubstraitPlan = serde_json::from_str(plan_json).map_err(|err| { + datafusion::error::DataFusionError::Plan(format!("invalid Substrait plan JSON: {err}")) })?; - self.explain_substrait_plan(&plan, properties).await + let input_decode_duration = input_decode_start.elapsed(); + + self.prepare_substrait_plan(plan, plan_json.to_string(), input_decode_duration, ctx) + .await } - async fn explain_substrait_plan( + async fn prepare_substrait_plan( &self, - plan: &datafusion_substrait::substrait::proto::Plan, - properties: &HashMap, - ) -> DFResult { - let ctx = self.builder.build_session_with_properties(properties)?; - crate::substrait::explain_substrait_plan_streaming( - plan, - &ctx, + plan: SubstraitPlan, + plan_json: String, + input_decode_duration: Duration, + ctx: &SessionContext, + ) -> DFResult { + let state = ctx.state(); + let logical_planning_start = Instant::now(); + let logical_plan = crate::substrait::logical_plan_from_substrait( + &plan, + &state, self.builder.substrait_extensions(), ) - .await - } - - /// Execute one or more SQL statements from a single SQL string. - /// - /// DDL statements (e.g. `CREATE EXTERNAL TABLE`) are executed for side - /// effects. The last statement produces the result stream. - /// - /// Returns an error if `sql` contains no statements, or if any statement - /// fails to parse or execute. - #[tracing::instrument(skip(self, sql, properties), fields(sql_len = sql.len()))] - pub async fn execute_sql( - &self, - sql: &str, - properties: &HashMap, - ) -> DFResult { - tracing::info!( - sql_len = sql.len(), - num_properties = properties.len(), - "executing SQL query" + .await?; + let logical_planning_duration = logical_planning_start.elapsed(); + + tracing::debug!( + plan = %logical_plan.display_indent(), + "substrait plan converted to DataFusion logical plan" ); - let ctx = self.builder.build_session_with_properties(properties)?; - let mut statements = split_sql_statements(&ctx, sql)?; - let last = statements - .pop() - .ok_or_else(|| DataFusionError::Plan("no SQL statements provided".to_string()))?; - for statement in statements { - ctx.sql(&statement).await?.collect().await?; + Ok(PreparedLogicalPlan { + input: DataFusionInputMetadata::Substrait { plan_json }, + logical_plan, + input_decode_duration, + logical_planning_duration, + }) + } +} + +fn substrait_plan_json(plan: &SubstraitPlan) -> DFResult { + serde_json::to_string(plan).map_err(|err| { + datafusion::error::DataFusionError::Plan(format!( + "failed to serialize Substrait plan JSON: {err}" + )) + }) +} + +async fn execute_plan_for_metrics( + physical_plan: Arc, + task_ctx: Arc, +) -> DFResult { + let mut stream = execute_stream(physical_plan, task_ctx)?; + let mut num_rows = 0; + while let Some(batch) = stream.next().await { + num_rows += batch?.num_rows(); + } + Ok(num_rows) +} + +fn explain_stream( + logical_plan: &str, + physical_plan: &Arc, + physical_plan_metadata: Option<&DataFusionPhysicalPlanMetadata>, +) -> DFResult { + let schema = explain_schema(); + let physical_plan_display_text; + let mut plan_types = vec!["logical_plan".to_string(), "physical_plan".to_string()]; + let mut plans = vec![logical_plan.to_string()]; + if let Some(physical_plan_metadata) = physical_plan_metadata { + plans.push(physical_plan_metadata.display.clone()); + plan_types.push("execution_statistics_json".to_string()); + plans.push(physical_plan_metadata.statistics.to_json_string()); + } else { + physical_plan_display_text = physical_plan_display(physical_plan); + plans.push(physical_plan_display_text); + } + let plan_type: ArrayRef = Arc::new(StringArray::from(plan_types)); + let plan: ArrayRef = Arc::new(StringArray::from(plans)); + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![plan_type, plan]).map_err(|err| { + DataFusionError::Execution(format!("failed to build explain output: {err}")) + })?; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::iter(vec![Ok(batch)]), + ))) +} + +fn explain_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])) +} + +fn physical_plan_metadata( + physical_plan: &Arc, +) -> DataFusionPhysicalPlanMetadata { + let (physical_plan, show_metrics) = physical_plan_with_metrics(physical_plan); + let display = if physical_plan.as_any().is::() { + display_plan_ascii(physical_plan.as_ref(), show_metrics) + } else { + DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()) + .indent(false) + .to_string() + }; + let statistics = extract_execution_statistics(physical_plan.as_ref()); + DataFusionPhysicalPlanMetadata { + display, + statistics, + } +} + +fn physical_plan_display(physical_plan: &Arc) -> String { + physical_plan_metadata(physical_plan).display +} + +fn physical_plan_with_metrics( + physical_plan: &Arc, +) -> (Arc, bool) { + if physical_plan.as_any().is::() { + return match rewrite_distributed_plan_with_metrics( + Arc::clone(physical_plan), + DistributedMetricsFormat::PerTask, + ) { + Ok(physical_plan) => (physical_plan, true), + Err(_) => (Arc::clone(physical_plan), false), + }; + } + + (Arc::clone(physical_plan), true) +} + +fn extract_execution_statistics( + physical_plan: &(dyn ExecutionPlan + 'static), +) -> DataFusionExecutionStatistics { + let mut statistics = DataFusionExecutionStatistics { + num_stages: 1, + num_tasks: 1, + ..Default::default() + }; + let mut all_metrics = MetricsSet::new(); + collect_plan_metrics( + physical_plan, + "0".to_string(), + &mut all_metrics, + &mut statistics, + ); + let aggregate_metrics = all_metrics.aggregate_by_name().sorted_for_display(); + statistics.elapsed_compute = aggregate_metrics + .elapsed_compute() + .map(duration_from_nanos_usize); + statistics.repartition_time = aggregate_metrics + .sum_by_name("repartition_time") + .map(|metric| duration_from_nanos_usize(metric.as_usize())); + statistics.time_elapsed_processing = aggregate_metrics + .sum_by_name("time_elapsed_processing") + .map(|metric| duration_from_nanos_usize(metric.as_usize())); + statistics.output_rows = aggregate_metrics.output_rows(); + statistics.output_bytes = aggregate_metrics + .sum(|metric| matches!(metric.value(), MetricValue::OutputBytes(_))) + .map(|metric| metric.as_usize()); + statistics.output_batches = aggregate_metrics + .sum(|metric| matches!(metric.value(), MetricValue::OutputBatches(_))) + .map(|metric| metric.as_usize()); + statistics.spill_count = aggregate_metrics.spill_count(); + statistics.spilled_bytes = aggregate_metrics.spilled_bytes(); + statistics.spilled_rows = aggregate_metrics.spilled_rows(); + statistics.aggregate_metrics = aggregate_metrics + .iter() + .map(|metric| metric_statistics(metric)) + .collect(); + statistics +} + +fn collect_plan_metrics( + physical_plan: &(dyn ExecutionPlan + 'static), + node_path: String, + all_metrics: &mut MetricsSet, + statistics: &mut DataFusionExecutionStatistics, +) { + if let Some(network_boundary) = physical_plan.as_network_boundary() { + statistics.num_tasks += network_boundary.input_stage().tasks.len(); + statistics.num_stages += 1; + } + + if let Some(metrics) = physical_plan.metrics() { + for metric in metrics.iter() { + all_metrics.push(Arc::clone(metric)); + statistics + .plan_metrics + .push(DataFusionPlanMetricStatistics { + node_path: node_path.clone(), + node_name: physical_plan.name().to_string(), + metric: metric_statistics(metric), + }); } + } - let df = ctx.sql(&last).await?; - df.execute_stream().await + for (idx, child) in physical_plan.children().into_iter().enumerate() { + collect_plan_metrics( + child.as_ref(), + format!("{node_path}.{idx}"), + all_metrics, + statistics, + ); + } +} + +fn metric_statistics(metric: &Metric) -> DataFusionMetricStatistics { + let value = metric.value(); + let pruning = match value { + MetricValue::PruningMetrics { + pruning_metrics, .. + } => Some(DataFusionPruningStatistics { + pruned: pruning_metrics.pruned(), + matched: pruning_metrics.matched(), + fully_matched: pruning_metrics.fully_matched(), + }), + _ => None, + }; + let ratio = match value { + MetricValue::Ratio { ratio_metrics, .. } => Some(DataFusionRatioStatistics { + part: ratio_metrics.part(), + total: ratio_metrics.total(), + }), + _ => None, + }; + + DataFusionMetricStatistics { + name: value.name().to_string(), + partition: metric.partition(), + metric_type: metric_type_name(metric.metric_type()), + value: if pruning.is_some() || ratio.is_some() { + None + } else { + Some(value.as_usize()) + }, + display_value: value.to_string(), + labels: metric + .labels() + .iter() + .map(|label| (label.name().to_string(), label.value().to_string())) + .collect(), + pruning, + ratio, + } +} + +fn metric_type_name(metric_type: MetricType) -> &'static str { + match metric_type { + MetricType::SUMMARY => "summary", + MetricType::DEV => "dev", } } + +fn duration_from_nanos_usize(nanos: usize) -> Duration { + Duration::from_nanos(nanos.min(u64::MAX as usize) as u64) +} + +fn duration_as_millis_f64(duration: Duration) -> f64 { + duration.as_secs_f64() * 1_000.0 +} diff --git a/quickwit/quickwit-df-core/src/substrait.rs b/quickwit/quickwit-df-core/src/substrait.rs index 35e826b067b..385fa9a1d2b 100644 --- a/quickwit/quickwit-df-core/src/substrait.rs +++ b/quickwit/quickwit-df-core/src/substrait.rs @@ -99,6 +99,17 @@ impl<'a> QuickwitSubstraitConsumer<'a> { } } +pub(crate) async fn logical_plan_from_substrait( + plan: &Plan, + state: &SessionState, + extensions_for_reads: &[Arc], +) -> DFResult { + let extensions = Extensions::try_from(&plan.extensions) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let consumer = QuickwitSubstraitConsumer::new(&extensions, state, extensions_for_reads); + from_substrait_plan_with_consumer(&consumer, plan).await +} + #[async_trait] impl SubstraitConsumer for QuickwitSubstraitConsumer<'_> { // ── Required boilerplate ───────────────────────────────────────── @@ -233,11 +244,7 @@ pub async fn execute_substrait_plan( extensions_for_reads: &[Arc], ) -> DFResult> { let state = ctx.state(); - let extensions = Extensions::try_from(&plan.extensions) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - let consumer = QuickwitSubstraitConsumer::new(&extensions, &state, extensions_for_reads); - let logical_plan = from_substrait_plan_with_consumer(&consumer, plan).await?; + let logical_plan = logical_plan_from_substrait(plan, &state, extensions_for_reads).await?; tracing::debug!( plan = %logical_plan.display_indent(), @@ -266,11 +273,7 @@ pub async fn execute_substrait_plan_streaming( extensions_for_reads: &[Arc], ) -> DFResult { let state = ctx.state(); - let extensions = Extensions::try_from(&plan.extensions) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - let consumer = QuickwitSubstraitConsumer::new(&extensions, &state, extensions_for_reads); - let logical_plan = from_substrait_plan_with_consumer(&consumer, plan).await?; + let logical_plan = logical_plan_from_substrait(plan, &state, extensions_for_reads).await?; tracing::debug!( plan = %logical_plan.display_indent(), @@ -278,33 +281,5 @@ pub async fn execute_substrait_plan_streaming( ); let df = ctx.execute_logical_plan(logical_plan).await?; - let stream = df.execute_stream().await?; - Ok(stream) -} - -/// Same as `execute_substrait_plan_streaming` but returns the EXPLAIN output -/// (logical + physical plan text) instead of executing the plan. -/// -/// Substrait has no in-plan explain mode, so we expose this at the RPC layer. -/// The returned stream emits a single batch with the same schema as SQL -/// `EXPLAIN` (two columns: `plan_type`, `plan`) so the client side can reuse -/// its existing rendering. -pub async fn explain_substrait_plan_streaming( - plan: &Plan, - ctx: &datafusion::prelude::SessionContext, - extensions_for_reads: &[Arc], -) -> DFResult { - let state = ctx.state(); - let extensions = Extensions::try_from(&plan.extensions) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - let consumer = QuickwitSubstraitConsumer::new(&extensions, &state, extensions_for_reads); - let logical_plan = from_substrait_plan_with_consumer(&consumer, plan).await?; - - let df = ctx.execute_logical_plan(logical_plan).await?; - // verbose=true exposes every optimizer pass; analyze=false means we don't - // actually execute the plan (no storage I/O, no aggregation work). - let explain_df = df.explain(true, false)?; - let stream = explain_df.execute_stream().await?; - Ok(stream) + df.execute_stream().await }