Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ use std::sync::Arc;

use cursive::view::Resizable;

use crate::interpreter::clickhouse::PerfettoQueryScope;
use crate::{
interpreter::{ClickHouse, Context, ContextArc, options},
interpreter::{
ClickHouse, Context, ContextArc, Query, fetch_and_populate_perfetto_trace,
fetch_server_perfetto_sources, options, perfetto::PerfettoTraceBuilder,
},
view::Navigation,
};

Expand Down Expand Up @@ -39,6 +43,105 @@ fn panic_hook(info: &PanicHookInfo<'_>) {
);
}

fn write_perfetto_trace(path: &str, data: Vec<u8>) -> Result<()> {
std::fs::write(path, &data)?;
println!("Perfetto trace exported to {}", path);
Ok(())
}

async fn run_cli_perfetto_export(
options: &options::ChDigOptions,
clickhouse: &Arc<ClickHouse>,
) -> Result<bool> {
let mut perfetto_options = options.perfetto.clone();

match options.perfetto_command() {
Some(cmd) => {
perfetto_options.aggregated_zookeeper_log = cmd.aggregated_zookeeper_log;
perfetto_options.query_metric_log = cmd.query_metric_log;
perfetto_options.asynchronous_metric_log = cmd.asynchronous_metric_log;
perfetto_options.text_log_android = cmd.text_log;
perfetto_options.text_log = cmd.text_log;
perfetto_options.per_server = cmd.track_per_server;
if cmd.all {
perfetto_options.aggregated_zookeeper_log = true;
perfetto_options.query_metric_log = true;
perfetto_options.asynchronous_metric_log = true;
perfetto_options.text_log_android = true;
}
}
None => {}
};

let mut scope: PerfettoQueryScope = PerfettoQueryScope {
start: options.view.start.clone().into(),
end: options.view.end.clone().into(),
query_ids: None,
};
let mut is_server_scope = false;

match &options.view.query_id {
Some(query_id) => {
scope = clickhouse.get_perfetto_query_scope(&query_id).await?;
}
None => {
is_server_scope = true;
}
}

scope.end = scope.end + chrono::TimeDelta::seconds(1);

let query_block: clickhouse_rs::Block<clickhouse_rs::types::Complex> = clickhouse
.get_queries_for_perfetto(scope.start, scope.end, &scope.query_ids)
.await?;
let mut queries = Vec::new();
for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) => queries.push(q),
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
builder.add_queries(&queries);
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
scope.query_ids.as_ref().map(|v| v.as_slice()),
scope.start,
scope.end,
)
.await;

if is_server_scope {
fetch_server_perfetto_sources(
clickhouse,
&mut builder,
&perfetto_options,
scope.start,
scope.end,
)
.await;
}

let mut output = options.view.output.clone().unwrap();

if output == "./" {
if is_server_scope {
output += "server_perfetto_trace.pftrace"
} else {
output += &format!("{}.pftrace", options.view.query_id.as_ref().unwrap());
}
}
write_perfetto_trace(&output, builder.build())?;

Ok(true)
}

pub async fn chdig_main_async<I, T>(itr: I) -> Result<()>
where
I: IntoIterator<Item = T>,
Expand All @@ -61,6 +164,11 @@ where
// panic hook will clear the screen).
let clickhouse = Arc::new(ClickHouse::new(options.clickhouse.clone()).await?);

if options.perfetto_command().is_some() {
run_cli_perfetto_export(&options, &clickhouse).await?;
return Ok(());
}

let server_warnings = match clickhouse.get_warnings().await {
Ok(w) => w,
Err(e) => {
Expand Down
49 changes: 49 additions & 0 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub struct TextLogArguments {
pub end: RelativeDateTime,
}

#[derive(Debug, Clone)]
pub struct PerfettoQueryScope {
pub query_ids: Option<Vec<String>>,
pub start: DateTime<Local>,
pub end: DateTime<Local>,
}

#[derive(Default)]
pub struct ClickHouseServerCPU {
pub count: u64,
Expand Down Expand Up @@ -1571,6 +1578,7 @@ impl ClickHouse {
&self,
start: DateTime<Local>,
end: DateTime<Local>,
query_ids: &Option<Vec<String>>
) -> Result<Columns> {
let dbtable = self.get_log_table_name("system", "query_log");
return self
Expand Down Expand Up @@ -1602,6 +1610,7 @@ impl ClickHouse {
WHERE type != 'QueryStart'
AND event_date >= toDate(start_) AND event_time >= toDateTime(start_)
AND event_date <= toDate(end_) AND event_time <= toDateTime(end_)
{query_ids}
"#,
start = start
.timestamp_nanos_opt()
Expand All @@ -1616,12 +1625,52 @@ impl ClickHouse {
} else {
"length(thread_ids)"
},
query_ids = if let Some(query_id) = query_ids {
format!("AND query_id IN ('{}')", query_id.join("','"))
} else {
String::new()
},
)
.as_str(),
)
.await;
}

pub async fn get_perfetto_query_scope(&self, query_id: &str) -> Result<PerfettoQueryScope> {
let query_log = self.get_table_name("system", "query_log");
let query_id_escaped = query_id.replace('\'', "''");
let block = self
.execute(
format!(
r#"
WITH '{query_id}' AS root_query_id
SELECT
groupUniqArray(query_id) AS query_ids,
min(query_start_time_microseconds) AS query_start_time_microseconds,
max(event_time_microseconds) AS query_end_time_microseconds
FROM {query_log}
WHERE
type != 'QueryStart'
AND (query_id = root_query_id OR initial_query_id = root_query_id)
"#,
query_id = query_id_escaped,
query_log = query_log,
)
.as_str(),
)
.await?;

return Ok(PerfettoQueryScope {
query_ids: Some(block.get::<Vec<String>, _>(0, "query_ids")?),
start: block
.get::<DateTime<Tz>, _>(0, "query_start_time_microseconds")?
.with_timezone(&Local),
end: block
.get::<DateTime<Tz>, _>(0, "query_end_time_microseconds")?
.with_timezone(&Local),
});
}

pub async fn get_metric_log_for_perfetto(
&self,
start: DateTime<Local>,
Expand Down
1 change: 1 addition & 0 deletions src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use clickhouse_quirks::ClickHouseQuirks;
pub use context::Context;
pub use context::ContextArc;
pub use worker::Worker;
pub(crate) use worker::{fetch_and_populate_perfetto_trace, fetch_server_perfetto_sources};

pub type WorkerEvent = worker::Event;
pub type Query = query::Query;
Expand Down
126 changes: 125 additions & 1 deletion src/interpreter/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,81 @@ pub enum ChDigViews {
Client,
}

#[derive(Args, Debug, Clone)]
pub struct PerfettoCommand {
#[arg(long, action = ArgAction::SetTrue)]
pub aggregated_zookeeper_log: bool,
#[arg(long, action = ArgAction::SetTrue)]
pub query_metric_log: bool,
#[arg(long, action = ArgAction::SetTrue)]
pub asynchronous_metric_log: bool,
#[arg(long, default_value = "true")]
pub text_log: bool,

#[arg(long, action = ArgAction::SetTrue)]
pub all: bool,

#[arg(long, default_value = "true")]
pub track_per_server: bool,
}

#[derive(Debug, Clone, Subcommand)]
pub enum ChDigCommand {
Queries,
LastQueries,
SlowQueries,
Merges,
S3Queue,
AzureQueue,
Mutations,
ReplicationQueue,
ReplicatedFetches,
Replicas,
Tables,
Errors,
Backups,
Dictionaries,
ServerLogs,
Loggers,
BackgroundSchedulePool,
BackgroundSchedulePoolLog,
TableParts,
AsynchronousInserts,
PartLog,
Client,
Perfetto(PerfettoCommand),
}

impl ChDigCommand {
pub fn as_view(&self) -> Option<ChDigViews> {
match self {
ChDigCommand::Queries => Some(ChDigViews::Queries),
ChDigCommand::LastQueries => Some(ChDigViews::LastQueries),
ChDigCommand::SlowQueries => Some(ChDigViews::SlowQueries),
ChDigCommand::Merges => Some(ChDigViews::Merges),
ChDigCommand::S3Queue => Some(ChDigViews::S3Queue),
ChDigCommand::AzureQueue => Some(ChDigViews::AzureQueue),
ChDigCommand::Mutations => Some(ChDigViews::Mutations),
ChDigCommand::ReplicationQueue => Some(ChDigViews::ReplicationQueue),
ChDigCommand::ReplicatedFetches => Some(ChDigViews::ReplicatedFetches),
ChDigCommand::Replicas => Some(ChDigViews::Replicas),
ChDigCommand::Tables => Some(ChDigViews::Tables),
ChDigCommand::Errors => Some(ChDigViews::Errors),
ChDigCommand::Backups => Some(ChDigViews::Backups),
ChDigCommand::Dictionaries => Some(ChDigViews::Dictionaries),
ChDigCommand::ServerLogs => Some(ChDigViews::ServerLogs),
ChDigCommand::Loggers => Some(ChDigViews::Loggers),
ChDigCommand::BackgroundSchedulePool => Some(ChDigViews::BackgroundSchedulePool),
ChDigCommand::BackgroundSchedulePoolLog => Some(ChDigViews::BackgroundSchedulePoolLog),
ChDigCommand::TableParts => Some(ChDigViews::TableParts),
ChDigCommand::AsynchronousInserts => Some(ChDigViews::AsynchronousInserts),
ChDigCommand::PartLog => Some(ChDigViews::PartLog),
ChDigCommand::Client => Some(ChDigViews::Client),
ChDigCommand::Perfetto(_) => None,
}
}
}

#[derive(Parser, Clone)]
#[command(name = "chdig")]
#[command(author, version, about, long_about = None)]
Expand All @@ -155,13 +230,26 @@ pub struct ChDigOptions {
#[command(flatten)]
pub view: ViewOptions,
#[command(subcommand)]
pub start_view: Option<ChDigViews>,
pub command: Option<ChDigCommand>,
#[command(flatten)]
pub service: ServiceOptions,
#[clap(skip)]
pub perfetto: ChDigPerfettoConfig,
}

impl ChDigOptions {
pub fn start_view(&self) -> Option<ChDigViews> {
self.command.as_ref().and_then(ChDigCommand::as_view)
}

pub fn perfetto_command(&self) -> Option<&PerfettoCommand> {
match &self.command {
Some(ChDigCommand::Perfetto(cmd)) => Some(cmd),
_ => None,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogsOrder {
Expand Down Expand Up @@ -277,6 +365,14 @@ pub struct ViewOptions {
/// Limit for number of queries to render in queries views
#[arg(long, default_value_t = 10000)]
pub queries_limit: u64,

/// Specified query_id for commands that support it
#[arg(long = "query-id", alias = "query_id", value_name = "QUERY_ID")]
pub query_id: Option<String>,

#[arg(long, short = 'o', value_name = "PATH", default_value = "./")]
/// Output path for CLI export
pub output: Option<String>,
// TODO: --mouse/--no-mouse (see EXIT_MOUSE_SEQUENCE in termion)
}

Expand Down Expand Up @@ -1413,4 +1509,32 @@ mod tests {
time::Duration::from_millis(5000)
);
}

#[test]
fn test_perfetto_query_cli_options() {
let options = parse_from([
"chdig",
"--query_id",
"query-123",
"--output",
"/tmp/query.pftrace",
"perfetto",
])
.unwrap();

let _cmd = options.perfetto_command().unwrap();
assert_eq!(options.view.query_id.as_deref(), Some("query-123"));
assert_eq!(options.view.output.as_deref(), Some("/tmp/query.pftrace"));
}

#[test]
fn test_perfetto_server_cli_options() {
let options = parse_from([
"chdig", "--start", "10minute", "--end", "5minute", "perfetto",
])
.unwrap();

let _cmd = options.perfetto_command().unwrap();
assert!(options.view.query_id.is_none());
}
}
Loading
Loading