Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Result, anyhow};
use backtrace::Backtrace;
use chrono::{DateTime, Local};
use flexi_logger::{FileSpec, LogSpecification, Logger};
use std::ffi::OsString;
use std::panic::{self, PanicHookInfo};
Expand All @@ -8,7 +9,10 @@ use std::sync::Arc;
use cursive::view::Resizable;

use crate::{
interpreter::{ClickHouse, Context, ContextArc, options},
interpreter::{
ClickHouse, Context, ContextArc, Query, fetch_and_populate_perfetto_trace,
fetch_server_perfetto_sources, options, perfetto::PerfettoTraceBuilder,
},
view::Navigation,
};

Expand Down Expand Up @@ -39,6 +43,101 @@ fn panic_hook(info: &PanicHookInfo<'_>) {
);
}

fn write_perfetto_trace(path: &str, data: Vec<u8>) -> Result<()> {
std::fs::write(path, &data)?;
println!("Perfetto trace exported to {}", path);
Ok(())
}

async fn run_cli_perfetto_export(
options: &options::ChDigOptions,
clickhouse: &Arc<ClickHouse>,
) -> Result<bool> {

let output = options
.view
.output
.clone()
.unwrap_or_else(|| "/tmp/chdig_perfetto.pftrace".to_string());
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is not the default for cli option?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's "generic" option now, so hardcoding perfetto naming seems wrong.
Anyway, it set to ./ as default.

If it ./
for server level -> trace saved as ./server_perfetto_trace.pftrace
for query level ./query_id.pftrace


let mut perfetto_options = options.perfetto.clone();

perfetto_options.aggregated_zookeeper_log = true;
perfetto_options.query_metric_log = true;
perfetto_options.asynchronous_metric_log = true;

if let Some(query_id) = options.view.query_id.as_deref() {
let scope = clickhouse.get_perfetto_query_scope(query_id).await?;
let start = scope.start;
let end = scope.end;

let end_time = end + chrono::TimeDelta::seconds(1);
let query_block = clickhouse.get_queries_for_perfetto(start, end_time, &Some(scope.query_ids.clone())).await?;
let mut queries = Vec::new();

for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) => queries.push(q),
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be passed from command line

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about those two.

IMO, android style logs should be enabled/used by default.

Per server track split also make sense, compared to not have it.
(Actually, in order to use TracePacketDefaults, there should/will be much more detailed sequences/tracks)

builder.add_queries(&queries);
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
Some(&scope.query_ids),
scope.start,
end_time,
)
.await;
write_perfetto_trace(&output, builder.build())?;
return Ok(true);
}

if options.view.server {
let start: DateTime<Local> = options.view.start.clone().into();
let end: DateTime<Local> = options.view.end.clone().into();

let end_time = end + chrono::TimeDelta::seconds(1);
let query_block = clickhouse.get_queries_for_perfetto(start, end_time, &None).await?;
let mut queries = Vec::new();

for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) => queries.push(q),
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
builder.add_queries(&queries);
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
None,
start,
end_time,
)
.await;
fetch_server_perfetto_sources(clickhouse, &mut builder, &perfetto_options, start, end_time)
.await;
write_perfetto_trace(&output, builder.build())?;
return Ok(true);
}

Ok(false)
}

pub async fn chdig_main_async<I, T>(itr: I) -> Result<()>
where
I: IntoIterator<Item = T>,
Expand All @@ -61,6 +160,11 @@ where
// panic hook will clear the screen).
let clickhouse = Arc::new(ClickHouse::new(options.clickhouse.clone()).await?);

if options.perfetto_command().is_some() {
run_cli_perfetto_export(&options, &clickhouse).await?;
return Ok(());
}

let server_warnings = match clickhouse.get_warnings().await {
Ok(w) => w,
Err(e) => {
Expand Down
56 changes: 56 additions & 0 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub struct TextLogArguments {
pub end: RelativeDateTime,
}

#[derive(Debug, Clone)]
pub struct PerfettoQueryScope {
pub query_ids: Vec<String>,
pub start: DateTime<Local>,
pub end: DateTime<Local>,
}

#[derive(Default)]
pub struct ClickHouseServerCPU {
pub count: u64,
Expand Down Expand Up @@ -1571,6 +1578,7 @@ impl ClickHouse {
&self,
start: DateTime<Local>,
end: DateTime<Local>,
query_ids: &Option<Vec<String>>
) -> Result<Columns> {
let dbtable = self.get_log_table_name("system", "query_log");
return self
Expand Down Expand Up @@ -1602,6 +1610,7 @@ impl ClickHouse {
WHERE type != 'QueryStart'
AND event_date >= toDate(start_) AND event_time >= toDateTime(start_)
AND event_date <= toDate(end_) AND event_time <= toDateTime(end_)
{query_ids}
"#,
start = start
.timestamp_nanos_opt()
Expand All @@ -1616,12 +1625,59 @@ impl ClickHouse {
} else {
"length(thread_ids)"
},
query_ids = if let Some(query_id) = query_ids {
format!("AND query_id IN ('{}')", query_id.join("','"))
} else {
String::new()
},
)
.as_str(),
)
.await;
}

pub async fn get_perfetto_query_scope(&self, query_id: &str) -> Result<PerfettoQueryScope> {
let query_log = self.get_table_name("system", "query_log");
let query_id_escaped = query_id.replace('\'', "''");
let block = self
.execute(
format!(
r#"
WITH '{query_id}' AS root_query_id
SELECT
groupUniqArray(query_id) AS query_ids,
min(query_start_time_microseconds) AS query_start_time_microseconds,
max(event_time_microseconds) AS query_end_time_microseconds
FROM {query_log}
WHERE
type != 'QueryStart'
AND (query_id = root_query_id OR initial_query_id = root_query_id)
"#,
query_id = query_id_escaped,
query_log = query_log,
)
.as_str(),
)
.await?;

if block.row_count() == 0 {
Comment thread
UnamedRus marked this conversation as resolved.
Outdated
return Err(Error::msg(format!(
"Query '{}' was not found in system.query_log",
query_id
)));
}

return Ok(PerfettoQueryScope {
query_ids: block.get(0, "query_ids")?,
start: block
.get::<DateTime<Tz>, _>(0, "query_start_time_microseconds")?
.with_timezone(&Local),
end: block
.get::<DateTime<Tz>, _>(0, "query_end_time_microseconds")?
.with_timezone(&Local),
});
}

pub async fn get_metric_log_for_perfetto(
&self,
start: DateTime<Local>,
Expand Down
1 change: 1 addition & 0 deletions src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use clickhouse_quirks::ClickHouseQuirks;
pub use context::Context;
pub use context::ContextArc;
pub use worker::Worker;
pub(crate) use worker::{fetch_and_populate_perfetto_trace, fetch_server_perfetto_sources};

pub type WorkerEvent = worker::Event;
pub type Query = query::Query;
Expand Down
Loading
Loading