Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 49 additions & 18 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,15 @@ async fn get_retention_policy(
//
// This is a WAG, but the computations we do today to report table usage are
// pretty inexpensive.
//
// NOTE: We explicitly use a smaller value during testing. This is to avoid
// manipulating time with Tokio's test utils. That cannot work, because we run
// every query to ClickHouse with a timeout, and that causes the tokio timers to
// "auto-advance" to the end of that timeout when we pause in a test.
#[cfg(not(test))]
const USAGE_UPDATE_INTERVAL: Duration = Duration::from_mins(2);
#[cfg(test)]
const USAGE_UPDATE_INTERVAL: Duration = Duration::from_secs(1);

async fn long_running_usage_task(
tx: watch::Sender<DatabaseUsageResult>,
Expand Down Expand Up @@ -953,34 +961,53 @@ mod tests {
.is_empty()
);

// Jump forward until we actually do compute the usage again.
tokio::time::pause();
let now = tokio::time::Instant::now();
while now.elapsed() < 2 * USAGE_UPDATE_INTERVAL {
tokio::time::advance(std::time::Duration::from_millis(10)).await;
}
tokio::time::resume();
let usage = context.database_usage();
// Wait until we actually do compute the usage again.
let usage = dev::poll::wait_for_condition(
|| async {
let usage = context.database_usage();
match &usage.last_success {
Some(success) => {
if success.tables.is_empty() {
Err(dev::poll::CondCheckError::<()>::NotYet)
} else {
Ok(usage)
}
}
None => Err(dev::poll::CondCheckError::<()>::NotYet),
}
},
&std::time::Duration::from_millis(100),
&(2 * USAGE_UPDATE_INTERVAL),
)
.await
.unwrap();
println!("{usage:#?}");
let tables = &usage
.last_success
.as_ref()
.expect("Should have computed something")
.tables;
tables.contains_key(&String::from("oximeter.measurements_u64"));
tables.contains_key(&String::from("oximeter.measurements_u64"));
tables.contains_key(&String::from("oximeter.measurements_f64"));
let version = tables.get(&String::from("oximeter.version")).unwrap();
assert_eq!(version.n_rows, 1);

// Kill the database, and force another collection.
// Kill the database, and wait for another collection. This one should
// fail.
clickhouse.cleanup().await.unwrap();
tokio::time::pause();
let now = tokio::time::Instant::now();
while now.elapsed() < 2 * USAGE_UPDATE_INTERVAL {
tokio::time::advance(std::time::Duration::from_millis(10)).await;
}
tokio::time::resume();
let usage = context.database_usage();
let usage = dev::poll::wait_for_condition(
|| async {
let usage = context.database_usage();
match &usage.last_error {
Some(_) => Ok(usage),
None => Err(dev::poll::CondCheckError::<()>::NotYet),
}
},
&std::time::Duration::from_millis(100),
&(2 * USAGE_UPDATE_INTERVAL),
)
.await
.unwrap();
println!("{usage:#?}");
assert!(
usage.last_success.is_some(),
Expand All @@ -989,7 +1016,11 @@ mod tests {
let Some(err) = usage.last_error.as_ref() else {
panic!("expected an error to have occurred, but found None");
};
assert!(err.error.starts_with("Failed to check out"));
let is_network_err = |msg: &str| -> bool {
msg.starts_with("Failed to check out")
|| msg.contains("TCP connection to server")
};
assert!(is_network_err(&err.error), "Expected a network error error");

logctx.cleanup_successful();
}
Expand Down
Loading