Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions quickwit/quickwit-datafusion/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,15 @@ async fn test_multi_statement_sql_with_semicolons_in_literals_and_comments() {
SELECT COUNT(*) AS cnt FROM "test-semi"
"#;

let stream = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder))
.execute_sql(sql, &std::collections::HashMap::new())
let properties = std::collections::HashMap::new();
let execution = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder))
.execute(quickwit_datafusion::DataFusionRequest::records(
quickwit_datafusion::DataFusionInput::Sql(sql),
&properties,
))
.await
.unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
let batches: Vec<RecordBatch> = execution.stream.try_collect().await.unwrap();
assert_eq!(total_rows(&batches), 1);
let cnt = batches[0]
.column_by_name("cnt")
Expand Down
60 changes: 29 additions & 31 deletions quickwit/quickwit-df-core/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::proto::{
ExecuteSqlRequest, ExecuteSqlResponse, ExecuteSubstraitRequest, ExecuteSubstraitResponse,
data_fusion_service_server,
};
use crate::service::DataFusionService;
use crate::service::{DataFusionInput, DataFusionOutput, DataFusionRequest, DataFusionService};

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum GrpcErrorKind {
Expand Down Expand Up @@ -167,42 +167,37 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl
let req = request.into_inner();
let service = Arc::clone(&self.service);

// Route to the appropriate DataFusionService method:
// - substrait_plan_bytes: production path (pre-encoded protobuf)
// - substrait_plan_json: dev/tooling path (grpcurl, rollup JSON files)
// When `explain` is set, the server returns the EXPLAIN output
// (no storage I/O) instead of executing the plan.
let stream = match (
let input = match (
!req.substrait_plan_bytes.is_empty(),
!req.substrait_plan_json.is_empty(),
req.explain,
) {
(true, _, false) => service
.execute_substrait(&req.substrait_plan_bytes, &req.properties)
.await
.map_err(|err| df_error_to_status(&err))?,
(true, _, true) => service
.explain_substrait(&req.substrait_plan_bytes, &req.properties)
.await
.map_err(|err| df_error_to_status(&err))?,
(false, true, false) => service
.execute_substrait_json(&req.substrait_plan_json, &req.properties)
.await
.map_err(|err| df_error_to_status(&err))?,
(false, true, true) => service
.explain_substrait_json(&req.substrait_plan_json, &req.properties)
.await
.map_err(|err| df_error_to_status(&err))?,
_ => {
(true, _) => DataFusionInput::SubstraitBytes(&req.substrait_plan_bytes),
(false, true) => DataFusionInput::SubstraitJson(&req.substrait_plan_json),
(false, false) => {
return Err(tonic::Status::invalid_argument(
"either substrait_plan_bytes or substrait_plan_json must be set",
));
}
};
let output = if req.explain {
DataFusionOutput::Explain
} else {
DataFusionOutput::Records
};

let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSubstraitResponse {
arrow_ipc_bytes: ipc_bytes,
});
let execution = service
.execute(DataFusionRequest {
input,
output,
properties: &req.properties,
})
.await
.map_err(|err| df_error_to_status(&err))?;

let response_stream =
map_batch_stream(execution.stream, |ipc_bytes| ExecuteSubstraitResponse {
arrow_ipc_bytes: ipc_bytes,
});
Ok(tonic::Response::new(response_stream))
}

Expand All @@ -213,15 +208,18 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl
let req = request.into_inner();
let service = Arc::clone(&self.service);

let stream = service
.execute_sql(&req.sql, &req.properties)
let execution = service
.execute(DataFusionRequest::records(
DataFusionInput::Sql(&req.sql),
&req.properties,
))
.await
.map_err(|err| {
warn!(error = %err, "DataFusion SQL execution error");
df_error_to_status(&err)
})?;

let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSqlResponse {
let response_stream = map_batch_stream(execution.stream, |ipc_bytes| ExecuteSqlResponse {
arrow_ipc_bytes: ipc_bytes,
});
Ok(tonic::Response::new(response_stream))
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-df-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ pub use data_source::{
};
pub use datafusion::execution::SendableRecordBatchStream;
pub use datafusion_distributed::{Worker, WorkerResolver};
pub use service::DataFusionService;
pub use service::{
DataFusionExecution, DataFusionExecutionMetadata, DataFusionExecutionStatistics,
DataFusionInput, DataFusionInputMetadata, DataFusionMetricStatistics, DataFusionOutput,
DataFusionPhysicalPlanMetadata, DataFusionPlanMetricStatistics, DataFusionPruningStatistics,
DataFusionRatioStatistics, DataFusionRequest, DataFusionService, SubstraitExecution,
SubstraitExecutionMetadata,
};
pub use session::DataFusionSessionBuilder;
pub use task_estimator::DataSourceExecPartitionEstimator;
pub use worker::build_worker;
Loading
Loading