Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Result, anyhow};
use backtrace::Backtrace;
use chrono::{DateTime, Local};
use flexi_logger::{FileSpec, LogSpecification, Logger};
use std::ffi::OsString;
use std::panic::{self, PanicHookInfo};
Expand All @@ -8,7 +9,10 @@ use std::sync::Arc;
use cursive::view::Resizable;

use crate::{
interpreter::{ClickHouse, Context, ContextArc, options},
interpreter::{
ClickHouse, Context, ContextArc, Query, fetch_and_populate_perfetto_trace,
fetch_server_perfetto_sources, options, perfetto::PerfettoTraceBuilder,
},
view::Navigation,
};

Expand Down Expand Up @@ -39,6 +43,104 @@ fn panic_hook(info: &PanicHookInfo<'_>) {
);
}

/// Writes a serialized Perfetto trace to `path` and reports the destination on stdout.
///
/// # Errors
///
/// Returns the underlying I/O error, annotated with `path`, when the file
/// cannot be created or written.
fn write_perfetto_trace(path: &str, data: Vec<u8>) -> Result<()> {
    // A bare `io::Error` ("No such file or directory") does not say which
    // file was involved, so attach the destination before propagating.
    std::fs::write(path, &data).map_err(|err| {
        std::io::Error::new(
            err.kind(),
            format!("failed to write Perfetto trace to {}: {}", path, err),
        )
    })?;
    println!("Perfetto trace exported to {}", path);
    Ok(())
}

/// Runs the `perfetto` CLI subcommand, if one was requested, and writes the
/// resulting trace to disk.
///
/// Returns `Ok(false)` when no export was performed (no `perfetto` subcommand,
/// or neither a query id nor `server` set on it) and `Ok(true)` after a trace
/// has been written.
///
/// # Errors
///
/// Propagates failures from resolving the query scope, from fetching the
/// query rows, and from writing the output file.
async fn maybe_run_cli_perfetto_export(
options: &options::ChDigOptions,
clickhouse: &Arc<ClickHouse>,
) -> Result<bool> {
let Some(cmd) = options.perfetto_command() else {
return Ok(false);
};
Comment thread
UnamedRus marked this conversation as resolved.
Outdated

// Fall back to a fixed path when no output path was given on the command line.
let output = cmd
.output
.clone()
.unwrap_or_else(|| "/tmp/chdig_perfetto.pftrace".to_string());
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is not the default for cli option?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's "generic" option now, so hardcoding perfetto naming seems wrong.
Anyway, it set to ./ as default.

If it ./
for server level -> trace saved as ./server_perfetto_trace.pftrace
for query level ./query_id.pftrace


// Work on a copy so the caller's options stay untouched.
let mut perfetto_options = options.perfetto.clone();

// The CLI export always enables these three sources, regardless of the
// values stored in `options.perfetto`.
perfetto_options.aggregated_zookeeper_log = true;
perfetto_options.query_metric_log = true;
perfetto_options.asynchronous_metric_log = true;

// Query-scoped export: restrict the trace to the resolved scope of one query.
if let Some(query_id) = cmd.query_id.as_deref() {
let scope = clickhouse.get_perfetto_query_scope(query_id).await?;
// Pad the window by one second past the scope's end
// (presumably so events at the boundary are not cut off; TODO confirm).
let end_time = scope.end + chrono::TimeDelta::seconds(1);
let query_block = clickhouse
.get_queries_for_perfetto(scope.start, end_time)
.await?;
let query_ids = scope.query_ids;
let query_ids_set = std::collections::HashSet::<&String>::from_iter(query_ids.iter());
let mut queries = Vec::new();

// Keep only rows whose query_id is part of the resolved scope; rows that
// fail to parse are logged and skipped rather than aborting the export.
for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) if query_ids_set.contains(&q.query_id) => queries.push(q),
Ok(_) => {}
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be passed from command line

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about those two.

IMO, android style logs should be enabled/used by default.

Per server track split also make sense, compared to not have it.
(Actually, in order to use TracePacketDefaults, there should/will be much more detailed sequences/tracks)

builder.add_queries(&queries);
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
Some(&query_ids),
scope.start,
end_time,
)
.await;
write_perfetto_trace(&output, builder.build())?;
return Ok(true);
}

// Server-wide export: every query in the requested window is included.
if cmd.server {
let start: DateTime<Local> = cmd.start.clone().into();
let end: DateTime<Local> = cmd.end.clone().into();
// NOTE(review): the query rows are fetched for [start, end], while the
// trace sources below use the padded `end_time`; the query-scoped branch
// uses the padded bound for both. Confirm the difference is intended.
let query_block = clickhouse.get_queries_for_perfetto(start, end).await?;
let mut queries = Vec::new();

// Unparseable rows are logged and skipped.
for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) => queries.push(q),
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let end_time = end + chrono::TimeDelta::seconds(1);
let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
builder.add_queries(&queries);
// `None` for the query-id argument: no restriction to specific queries.
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
None,
start,
end_time,
)
.await;
// Only the server-wide export calls this additional fetch.
fetch_server_perfetto_sources(clickhouse, &mut builder, &perfetto_options, start, end_time)
.await;
write_perfetto_trace(&output, builder.build())?;
return Ok(true);
}

// Neither export mode was selected.
Ok(false)
}

pub async fn chdig_main_async<I, T>(itr: I) -> Result<()>
where
I: IntoIterator<Item = T>,
Expand All @@ -61,6 +163,10 @@ where
// panic hook will clear the screen).
let clickhouse = Arc::new(ClickHouse::new(options.clickhouse.clone()).await?);

if maybe_run_cli_perfetto_export(&options, &clickhouse).await? {
return Ok(());
}

let server_warnings = match clickhouse.get_warnings().await {
Ok(w) => w,
Err(e) => {
Expand Down
49 changes: 49 additions & 0 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub struct TextLogArguments {
pub end: RelativeDateTime,
}

/// Set of query ids and the time window that make up one query-scoped
/// Perfetto export.
///
/// Produced by `ClickHouse::get_perfetto_query_scope`.
#[derive(Debug, Clone)]
pub struct PerfettoQueryScope {
/// Ids of the queries whose `query_id` or `initial_query_id` equals the
/// requested query id.
pub query_ids: Vec<String>,
/// Earliest `query_start_time_microseconds` among those queries, converted
/// to the local time zone.
pub start: DateTime<Local>,
/// Latest `event_time_microseconds` among those queries, converted to the
/// local time zone.
pub end: DateTime<Local>,
}

#[derive(Default)]
pub struct ClickHouseServerCPU {
pub count: u64,
Expand Down Expand Up @@ -1622,6 +1629,48 @@ impl ClickHouse {
.await;
}

pub async fn get_perfetto_query_scope(&self, query_id: &str) -> Result<PerfettoQueryScope> {
let query_log = self.get_table_name("system", "query_log");
let query_id_escaped = query_id.replace('\'', "''");
let block = self
.execute(
format!(
r#"
WITH '{query_id}' AS root_query_id
SELECT
groupUniqArray(query_id) AS query_ids,
min(query_start_time_microseconds) AS query_start_time_microseconds,
max(event_time_microseconds) AS query_end_time_microseconds
FROM {query_log}
WHERE
type != 'QueryStart'
AND (query_id = root_query_id OR initial_query_id = root_query_id)
"#,
query_id = query_id_escaped,
query_log = query_log,
)
.as_str(),
)
.await?;

if block.row_count() == 0 {
Comment thread
UnamedRus marked this conversation as resolved.
Outdated
return Err(Error::msg(format!(
"Query '{}' was not found in system.query_log",
query_id
)));
}

return Ok(PerfettoQueryScope {
query_ids: block.get(0, "query_ids")?,
start: block
.get::<DateTime<Tz>, _>(0, "query_start_time_microseconds")?
.with_timezone(&Local),
end: block
.get::<DateTime<Tz>, _>(0, "query_end_time_microseconds")?
.with_timezone(&Local),
});
}

pub async fn get_metric_log_for_perfetto(
&self,
start: DateTime<Local>,
Expand Down
1 change: 1 addition & 0 deletions src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use clickhouse_quirks::ClickHouseQuirks;
pub use context::Context;
pub use context::ContextArc;
pub use worker::Worker;
pub(crate) use worker::{fetch_and_populate_perfetto_trace, fetch_server_perfetto_sources};

pub type WorkerEvent = worker::Event;
pub type Query = query::Query;
Expand Down
146 changes: 145 additions & 1 deletion src/interpreter/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,87 @@ pub enum ChDigViews {
Client,
}

// Arguments of the `perfetto` subcommand.
//
// `server` and `query_id` exclude each other (see the `conflicts_with`
// attributes); option parsing additionally rejects the subcommand when
// neither of them is given.
//
// Plain `//` comments are used for these notes because clap derives help text
// from `///` doc comments, so the existing `///` lines are left untouched.
#[derive(Args, Debug, Clone)]
pub struct PerfettoCommand {
#[arg(long, action = ArgAction::SetTrue, conflicts_with = "query_id")]
/// Export server-wide Perfetto trace for the specified time window
pub server: bool,
#[arg(
long = "query-id",
Comment thread
UnamedRus marked this conversation as resolved.
Outdated
alias = "query_id",
value_name = "QUERY_ID",
conflicts_with = "server"
)]
/// Export query-scoped Perfetto trace for the specified query_id
pub query_id: Option<String>,
// When omitted, the export code chooses the destination path itself.
#[arg(long, value_name = "PATH")]
/// Output path for CLI Perfetto export
pub output: Option<String>,
#[arg(long, short('b'), default_value = "1hour")]
/// Begin of the time interval to look at (used with --server)
pub start: RelativeDateTime,
// Empty default: presumably parsed by `RelativeDateTime` as "now" (TODO confirm).
#[arg(long, short('e'), default_value = "")]
/// End of the time interval (used with --server)
pub end: RelativeDateTime,
Comment thread
UnamedRus marked this conversation as resolved.
Outdated
}

// Top-level subcommands of the `chdig` binary.
//
// Every variant except `Perfetto` names a view and is translated to the
// `ChDigViews` value of the same name by `ChDigCommand::as_view`; `Perfetto`
// carries its own arguments and has no corresponding view.
//
// Plain `//` comments on purpose: clap derives help text from `///` doc
// comments, so adding those here would change the `--help` output.
#[derive(Debug, Clone, Subcommand)]
pub enum ChDigCommand {
Queries,
LastQueries,
SlowQueries,
Merges,
S3Queue,
AzureQueue,
Mutations,
ReplicationQueue,
ReplicatedFetches,
Replicas,
Tables,
Errors,
Backups,
Dictionaries,
ServerLogs,
Loggers,
BackgroundSchedulePool,
BackgroundSchedulePoolLog,
TableParts,
AsynchronousInserts,
PartLog,
Client,
// Export command; its arguments live in `PerfettoCommand`.
Perfetto(PerfettoCommand),
}

impl ChDigCommand {
pub fn as_view(&self) -> Option<ChDigViews> {
match self {
ChDigCommand::Queries => Some(ChDigViews::Queries),
ChDigCommand::LastQueries => Some(ChDigViews::LastQueries),
ChDigCommand::SlowQueries => Some(ChDigViews::SlowQueries),
ChDigCommand::Merges => Some(ChDigViews::Merges),
ChDigCommand::S3Queue => Some(ChDigViews::S3Queue),
ChDigCommand::AzureQueue => Some(ChDigViews::AzureQueue),
ChDigCommand::Mutations => Some(ChDigViews::Mutations),
ChDigCommand::ReplicationQueue => Some(ChDigViews::ReplicationQueue),
ChDigCommand::ReplicatedFetches => Some(ChDigViews::ReplicatedFetches),
ChDigCommand::Replicas => Some(ChDigViews::Replicas),
ChDigCommand::Tables => Some(ChDigViews::Tables),
ChDigCommand::Errors => Some(ChDigViews::Errors),
ChDigCommand::Backups => Some(ChDigViews::Backups),
ChDigCommand::Dictionaries => Some(ChDigViews::Dictionaries),
ChDigCommand::ServerLogs => Some(ChDigViews::ServerLogs),
ChDigCommand::Loggers => Some(ChDigViews::Loggers),
ChDigCommand::BackgroundSchedulePool => Some(ChDigViews::BackgroundSchedulePool),
ChDigCommand::BackgroundSchedulePoolLog => Some(ChDigViews::BackgroundSchedulePoolLog),
ChDigCommand::TableParts => Some(ChDigViews::TableParts),
ChDigCommand::AsynchronousInserts => Some(ChDigViews::AsynchronousInserts),
ChDigCommand::PartLog => Some(ChDigViews::PartLog),
ChDigCommand::Client => Some(ChDigViews::Client),
ChDigCommand::Perfetto(_) => None,
}
}
}

#[derive(Parser, Clone)]
#[command(name = "chdig")]
#[command(author, version, about, long_about = None)]
Expand All @@ -155,13 +236,25 @@ pub struct ChDigOptions {
#[command(flatten)]
pub view: ViewOptions,
#[command(subcommand)]
pub start_view: Option<ChDigViews>,
pub command: Option<ChDigCommand>,
#[command(flatten)]
pub service: ServiceOptions,
#[clap(skip)]
pub perfetto: ChDigPerfettoConfig,
}

impl ChDigOptions {
pub fn start_view(&self) -> Option<ChDigViews> {
self.command.as_ref().and_then(ChDigCommand::as_view)
}

/// Returns the arguments of the `perfetto` subcommand, or `None` when a
/// different subcommand (or none at all) was given.
pub fn perfetto_command(&self) -> Option<&PerfettoCommand> {
    if let Some(ChDigCommand::Perfetto(args)) = &self.command {
        Some(args)
    } else {
        None
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogsOrder {
Expand Down Expand Up @@ -979,6 +1072,15 @@ where

adjust_defaults(&mut options)?;

if let Some(cmd) = options.perfetto_command()
&& cmd.query_id.is_none()
&& !cmd.server
{
return Err(anyhow!(
"perfetto command requires --query-id/--query_id or --server"
));
}

return Ok(options);
}

Expand Down Expand Up @@ -1413,4 +1515,46 @@ mod tests {
time::Duration::from_millis(5000)
);
}

#[test]
fn test_perfetto_query_cli_options() {
    // Goes through the underscore alias `--query_id` and an explicit output path.
    let argv = [
        "chdig",
        "perfetto",
        "--query_id",
        "query-123",
        "--output",
        "/tmp/query.pftrace",
    ];
    let parsed = parse_from(argv).unwrap();
    let perfetto = parsed.perfetto_command().unwrap();

    assert_eq!(perfetto.query_id, Some("query-123".to_string()));
    assert!(!perfetto.server);
    assert_eq!(perfetto.output, Some("/tmp/query.pftrace".to_string()));
}

#[test]
fn test_perfetto_server_cli_options() {
    // Server-wide mode with an explicit time window.
    let argv = [
        "chdig", "perfetto", "--server", "--start", "10minute", "--end", "5minute",
    ];
    let parsed = parse_from(argv).unwrap();
    let perfetto = parsed.perfetto_command().unwrap();

    assert!(perfetto.server);
    assert!(perfetto.query_id.is_none());
}

#[test]
fn test_perfetto_output_requires_export_mode() {
    // `--output` alone selects no export mode, so parsing has to be rejected.
    let Err(err) = parse_from(["chdig", "perfetto", "--output", "/tmp/out.pftrace"]) else {
        panic!("expected parse_from() to fail");
    };

    assert_eq!(
        err.to_string(),
        "perfetto command requires --query-id/--query_id or --server"
    );
}
}
Loading
Loading