diff --git a/src/bin.rs b/src/bin.rs index f82a235..0188c53 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -7,8 +7,12 @@ use std::sync::Arc; use cursive::view::Resizable; +use crate::interpreter::clickhouse::PerfettoQueryScope; use crate::{ - interpreter::{ClickHouse, Context, ContextArc, options}, + interpreter::{ + ClickHouse, Context, ContextArc, Query, fetch_and_populate_perfetto_trace, + fetch_server_perfetto_sources, options, perfetto::PerfettoTraceBuilder, + }, view::Navigation, }; @@ -39,6 +43,105 @@ fn panic_hook(info: &PanicHookInfo<'_>) { ); } +fn write_perfetto_trace(path: &str, data: Vec) -> Result<()> { + std::fs::write(path, &data)?; + println!("Perfetto trace exported to {}", path); + Ok(()) +} + +async fn run_cli_perfetto_export( + options: &options::ChDigOptions, + clickhouse: &Arc, +) -> Result { + let mut perfetto_options = options.perfetto.clone(); + + match options.perfetto_command() { + Some(cmd) => { + perfetto_options.aggregated_zookeeper_log = cmd.aggregated_zookeeper_log; + perfetto_options.query_metric_log = cmd.query_metric_log; + perfetto_options.asynchronous_metric_log = cmd.asynchronous_metric_log; + perfetto_options.text_log_android = cmd.text_log; + perfetto_options.text_log = cmd.text_log; + perfetto_options.per_server = cmd.track_per_server; + if cmd.all { + perfetto_options.aggregated_zookeeper_log = true; + perfetto_options.query_metric_log = true; + perfetto_options.asynchronous_metric_log = true; + perfetto_options.text_log_android = true; + } + } + None => {} + }; + + let mut scope: PerfettoQueryScope = PerfettoQueryScope { + start: options.view.start.clone().into(), + end: options.view.end.clone().into(), + query_ids: None, + }; + let mut is_server_scope = false; + + match &options.view.query_id { + Some(query_id) => { + scope = clickhouse.get_perfetto_query_scope(&query_id).await?; + } + None => { + is_server_scope = true; + } + } + + scope.end = scope.end + chrono::TimeDelta::seconds(1); + + let query_block: clickhouse_rs::Block = clickhouse + .get_queries_for_perfetto(scope.start, scope.end, &scope.query_ids) + .await?; + let mut queries = Vec::new(); + for i in 0..query_block.row_count() { + match Query::from_clickhouse_block(&query_block, i, false) { + Ok(q) => queries.push(q), + Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e), + } + } + + let mut builder = PerfettoTraceBuilder::new( + perfetto_options.per_server, + perfetto_options.text_log_android, + ); + builder.add_queries(&queries); + fetch_and_populate_perfetto_trace( + clickhouse, + &mut builder, + &perfetto_options, + scope.query_ids.as_ref().map(|v| v.as_slice()), + scope.start, + scope.end, + ) + .await; + + if is_server_scope { + fetch_server_perfetto_sources( + clickhouse, + &mut builder, + &perfetto_options, + scope.start, + scope.end, + ) + .await; + } + + let mut output = options.view.output.clone().unwrap(); + + if output == "./" { + if is_server_scope { + output += "server_perfetto_trace.pftrace" + } else { + output += &format!("{}.pftrace", options.view.query_id.as_ref().unwrap()); + } + } + write_perfetto_trace(&output, builder.build())?; + + Ok(true) +} + pub async fn chdig_main_async(itr: I) -> Result<()> where I: IntoIterator, @@ -61,6 +164,11 @@ where // panic hook will clear the screen). let clickhouse = Arc::new(ClickHouse::new(options.clickhouse.clone()).await?); + if options.perfetto_command().is_some() { + run_cli_perfetto_export(&options, &clickhouse).await?; + return Ok(()); + } + let server_warnings = match clickhouse.get_warnings().await { Ok(w) => w, Err(e) => { diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index 77c2d06..4e92812 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -55,6 +55,13 @@ pub struct TextLogArguments { pub end: RelativeDateTime, } +#[derive(Debug, Clone)] +pub struct PerfettoQueryScope { + pub query_ids: Option>, + pub start: DateTime, + pub end: DateTime, +} + #[derive(Default)] pub struct ClickHouseServerCPU { pub count: u64, @@ -1571,6 +1578,7 @@ impl ClickHouse { &self, start: DateTime, end: DateTime, + query_ids: &Option> ) -> Result { let dbtable = self.get_log_table_name("system", "query_log"); return self @@ -1602,6 +1610,7 @@ impl ClickHouse { WHERE type != 'QueryStart' AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + {query_ids} "#, start = start .timestamp_nanos_opt() @@ -1616,12 +1625,52 @@ impl ClickHouse { } else { "length(thread_ids)" }, + query_ids = if let Some(query_id) = query_ids { + format!("AND query_id IN ('{}')", query_id.join("','")) + } else { + String::new() + }, ) .as_str(), ) .await; } + pub async fn get_perfetto_query_scope(&self, query_id: &str) -> Result { + let query_log = self.get_table_name("system", "query_log"); + let query_id_escaped = query_id.replace('\'', "''"); + let block = self + .execute( + format!( + r#" + WITH '{query_id}' AS root_query_id + SELECT + groupUniqArray(query_id) AS query_ids, + min(query_start_time_microseconds) AS query_start_time_microseconds, + max(event_time_microseconds) AS query_end_time_microseconds + FROM {query_log} + WHERE + type != 'QueryStart' + AND (query_id = root_query_id OR initial_query_id = root_query_id) + "#, + query_id = query_id_escaped, + query_log = query_log, + ) + .as_str(), + ) + .await?; + + return Ok(PerfettoQueryScope { + query_ids: Some(block.get::, _>(0, "query_ids")?), + start: block + .get::, _>(0, "query_start_time_microseconds")? + .with_timezone(&Local), + end: block + .get::, _>(0, "query_end_time_microseconds")? + .with_timezone(&Local), + }); + } + pub async fn get_metric_log_for_perfetto( &self, start: DateTime, diff --git a/src/interpreter/mod.rs b/src/interpreter/mod.rs index 175037a..ec09e04 100644 --- a/src/interpreter/mod.rs +++ b/src/interpreter/mod.rs @@ -18,6 +18,7 @@ pub use clickhouse_quirks::ClickHouseQuirks; pub use context::Context; pub use context::ContextArc; pub use worker::Worker; +pub(crate) use worker::{fetch_and_populate_perfetto_trace, fetch_server_perfetto_sources}; pub type WorkerEvent = worker::Event; pub type Query = query::Query; diff --git a/src/interpreter/options.rs b/src/interpreter/options.rs index 5f23065..8867c1a 100644 --- a/src/interpreter/options.rs +++ b/src/interpreter/options.rs @@ -146,6 +146,81 @@ pub enum ChDigViews { Client, } +#[derive(Args, Debug, Clone)] +pub struct PerfettoCommand { + #[arg(long, action = ArgAction::SetTrue)] + pub aggregated_zookeeper_log: bool, + #[arg(long, action = ArgAction::SetTrue)] + pub query_metric_log: bool, + #[arg(long, action = ArgAction::SetTrue)] + pub asynchronous_metric_log: bool, + #[arg(long, default_value = "true")] + pub text_log: bool, + + #[arg(long, action = ArgAction::SetTrue)] + pub all: bool, + + #[arg(long, default_value = "true")] + pub track_per_server: bool, +} + +#[derive(Debug, Clone, Subcommand)] +pub enum ChDigCommand { + Queries, + LastQueries, + SlowQueries, + Merges, + S3Queue, + AzureQueue, + Mutations, + ReplicationQueue, + ReplicatedFetches, + Replicas, + Tables, + Errors, + Backups, + Dictionaries, + ServerLogs, + Loggers, + BackgroundSchedulePool, + BackgroundSchedulePoolLog, + TableParts, + AsynchronousInserts, + PartLog, + Client, + Perfetto(PerfettoCommand), +} + +impl ChDigCommand { + pub fn as_view(&self) -> Option { + match self { + ChDigCommand::Queries => Some(ChDigViews::Queries), + ChDigCommand::LastQueries => Some(ChDigViews::LastQueries), + ChDigCommand::SlowQueries => Some(ChDigViews::SlowQueries), + ChDigCommand::Merges => Some(ChDigViews::Merges), + ChDigCommand::S3Queue => Some(ChDigViews::S3Queue), + ChDigCommand::AzureQueue => Some(ChDigViews::AzureQueue), + ChDigCommand::Mutations => Some(ChDigViews::Mutations), + ChDigCommand::ReplicationQueue => Some(ChDigViews::ReplicationQueue), + ChDigCommand::ReplicatedFetches => Some(ChDigViews::ReplicatedFetches), + ChDigCommand::Replicas => Some(ChDigViews::Replicas), + ChDigCommand::Tables => Some(ChDigViews::Tables), + ChDigCommand::Errors => Some(ChDigViews::Errors), + ChDigCommand::Backups => Some(ChDigViews::Backups), + ChDigCommand::Dictionaries => Some(ChDigViews::Dictionaries), + ChDigCommand::ServerLogs => Some(ChDigViews::ServerLogs), + ChDigCommand::Loggers => Some(ChDigViews::Loggers), + ChDigCommand::BackgroundSchedulePool => Some(ChDigViews::BackgroundSchedulePool), + ChDigCommand::BackgroundSchedulePoolLog => Some(ChDigViews::BackgroundSchedulePoolLog), + ChDigCommand::TableParts => Some(ChDigViews::TableParts), + ChDigCommand::AsynchronousInserts => Some(ChDigViews::AsynchronousInserts), + ChDigCommand::PartLog => Some(ChDigViews::PartLog), + ChDigCommand::Client => Some(ChDigViews::Client), + ChDigCommand::Perfetto(_) => None, + } + } +} + #[derive(Parser, Clone)] #[command(name = "chdig")] #[command(author, version, about, long_about = None)] @@ -155,13 +230,26 @@ pub struct ChDigOptions { #[command(flatten)] pub view: ViewOptions, #[command(subcommand)] - pub start_view: Option, + pub command: Option, #[command(flatten)] pub service: ServiceOptions, #[clap(skip)] pub perfetto: ChDigPerfettoConfig, } +impl ChDigOptions { + pub fn start_view(&self) -> Option { + self.command.as_ref().and_then(ChDigCommand::as_view) + } + + pub fn perfetto_command(&self) -> Option<&PerfettoCommand> { + match &self.command { + Some(ChDigCommand::Perfetto(cmd)) => Some(cmd), + _ => None, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum, Deserialize)] #[serde(rename_all = "lowercase")] pub enum LogsOrder { @@ -277,6 +365,14 @@ pub struct ViewOptions { /// Limit for number of queries to render in queries views #[arg(long, default_value_t = 10000)] pub queries_limit: u64, + + /// Specified query_id for commands that support it + #[arg(long = "query-id", alias = "query_id", value_name = "QUERY_ID")] + pub query_id: Option, + + #[arg(long, short = 'o', value_name = "PATH", default_value = "./")] + /// Output path for CLI export + pub output: Option, // TODO: --mouse/--no-mouse (see EXIT_MOUSE_SEQUENCE in termion) } @@ -1413,4 +1509,32 @@ mod tests { time::Duration::from_millis(5000) ); } + + #[test] + fn test_perfetto_query_cli_options() { + let options = parse_from([ + "chdig", + "--query_id", + "query-123", + "--output", + "/tmp/query.pftrace", + "perfetto", + ]) + .unwrap(); + + let _cmd = options.perfetto_command().unwrap(); + assert_eq!(options.view.query_id.as_deref(), Some("query-123")); + assert_eq!(options.view.output.as_deref(), Some("/tmp/query.pftrace")); + } + + #[test] + fn test_perfetto_server_cli_options() { + let options = parse_from([ + "chdig", "--start", "10minute", "--end", "5minute", "perfetto", + ]) + .unwrap(); + + let _cmd = options.perfetto_command().unwrap(); + assert!(options.view.query_id.is_none()); + } } diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index 365bbcd..a50ae46 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -365,7 +365,7 @@ async fn render_or_share_flamegraph( use crate::interpreter::options::ChDigPerfettoConfig; -async fn fetch_and_populate_perfetto_trace( +pub(crate) async fn fetch_and_populate_perfetto_trace( clickhouse: &Arc, builder: &mut PerfettoTraceBuilder, cfg: &ChDigPerfettoConfig, @@ -490,7 +490,7 @@ async fn fetch_and_populate_perfetto_trace( } } -async fn fetch_server_perfetto_sources( +pub(crate) async fn fetch_server_perfetto_sources( clickhouse: &Arc, builder: &mut PerfettoTraceBuilder, cfg: &ChDigPerfettoConfig, @@ -1209,7 +1209,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } Event::ServerPerfettoExport(start, end) => { let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); - let query_block = clickhouse.get_queries_for_perfetto(start, end).await?; + let query_block = clickhouse.get_queries_for_perfetto(start, end, &None).await?; let mut queries = Vec::new(); for i in 0..query_block.row_count() { match Query::from_clickhouse_block(&query_block, i, false) { diff --git a/src/view/navigation.rs b/src/view/navigation.rs index 587924a..9a43ea6 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -261,7 +261,7 @@ impl Navigation for Cursive { .lock() .unwrap() .options - .start_view + .start_view() .unwrap_or(ChDigViews::Queries); let provider = context @@ -1204,7 +1204,7 @@ impl Navigation for Cursive { context .current_view - .or(context.options.start_view) + .or(context.options.start_view()) .unwrap_or(ChDigViews::Queries) };