diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs index d985da71dae..81589fe2c7f 100644 --- a/clickhouse-admin/api/src/lib.rs +++ b/clickhouse-admin/api/src/lib.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use clickhouse_admin_types_versions::latest; +use clickhouse_admin_types_versions::{latest, v2}; use dropshot::{ HttpError, HttpResponseCreated, HttpResponseOk, HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody, @@ -27,6 +27,7 @@ api_versions!([ // | example for the next person. // v // (next_int, IDENT), // NOTE: read the note at the start of this macro! + (3, ADD_RETENTION_POLICY_FOR_ALL_TABLES), (2, ADD_RETENTION_POLICY_AND_TABLE_USAGE), (1, INITIAL), ]); @@ -206,22 +207,62 @@ pub trait ClickhouseAdminServerApi { #[endpoint { method = PUT, path = "/retention-policy", - versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE.., + versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES.., }] async fn set_retention_policy( rqctx: RequestContext, - policy: TypedBody, + policy: TypedBody, ) -> Result; + /// Set the retention policy for timeseries data. 
+ #[endpoint { + method = PUT, + path = "/retention-policy", + versions = + VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES, + }] + async fn set_retention_policy_v2( + rqctx: RequestContext, + policy: TypedBody, + ) -> Result { + Self::set_retention_policy(rqctx, policy.map(Into::into)).await + } + /// Get the retention policy for timeseries data from the database #[endpoint { method = GET, path = "/retention-policy", - versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE.., + versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES.., }] async fn retention_policy( rqctx: RequestContext, - ) -> Result, HttpError>; + ) -> Result< + HttpResponseOk, + HttpError, + >; + + /// Get the retention policy for timeseries data from the database + #[endpoint { + method = GET, + path = "/retention-policy", + versions = + VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES, + }] + async fn retention_policy_v2( + rqctx: RequestContext, + ) -> Result, HttpError> { + let HttpResponseOk(mut policy) = Self::retention_policy(rqctx).await?; + policy + .tables + .pop_first() + .ok_or_else(|| { + HttpError::for_unavail( + None, + "Database is not yet populated".to_string(), + ) + }) + .map(|pol| HttpResponseOk(pol.into())) + } /// Return the resource usage of database tables. #[endpoint { @@ -272,22 +313,62 @@ pub trait ClickhouseAdminSingleApi { #[endpoint { method = PUT, path = "/retention-policy", - versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE.., + versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES.., }] async fn set_retention_policy( rqctx: RequestContext, - policy: TypedBody, + policy: TypedBody, ) -> Result; + /// Set the retention policy for timeseries data. 
+ #[endpoint { + method = PUT, + path = "/retention-policy", + versions = + VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES, + }] + async fn set_retention_policy_v2( + rqctx: RequestContext, + policy: TypedBody, + ) -> Result { + Self::set_retention_policy(rqctx, policy.map(Into::into)).await + } + /// Get the retention policy for timeseries data from the database #[endpoint { method = GET, path = "/retention-policy", - versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE.., + versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES.., }] async fn retention_policy( rqctx: RequestContext, - ) -> Result, HttpError>; + ) -> Result< + HttpResponseOk, + HttpError, + >; + + /// Get the retention policy for timeseries data from the database + #[endpoint { + method = GET, + path = "/retention-policy", + versions = + VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES, + }] + async fn retention_policy_v2( + rqctx: RequestContext, + ) -> Result, HttpError> { + let HttpResponseOk(mut policy) = Self::retention_policy(rqctx).await?; + policy + .tables + .pop_first() + .ok_or_else(|| { + HttpError::for_unavail( + None, + "Database is not yet populated".to_string(), + ) + }) + .map(|pol| HttpResponseOk(pol.into())) + } /// Return the resource usage of database tables. 
#[endpoint { diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs index c55bd25dea0..8808965f8b3 100644 --- a/clickhouse-admin/src/context.rs +++ b/clickhouse-admin/src/context.rs @@ -9,7 +9,9 @@ use camino::Utf8PathBuf; use chrono::Utc; use clickhouse_admin_types::config::GenerateConfigResult; use clickhouse_admin_types::keeper::KeeperConfigurableSettings; -use clickhouse_admin_types::retention::RetentionPolicy; +use clickhouse_admin_types::retention::{ + DatabaseRetentionPolicy, RetentionPolicyRequest, +}; use clickhouse_admin_types::server::ServerConfigurableSettings; use clickhouse_admin_types::usage::{ DatabaseUsage, DatabaseUsageError, DatabaseUsageResult, @@ -273,7 +275,7 @@ impl ServerContext { /// Update the retention policy of the oximeter database tables. pub async fn set_retention_policy( &self, - policy: RetentionPolicy, + policy: RetentionPolicyRequest, ) -> Result<(), HttpError> { let (tx, rx) = oneshot::channel(); self.retention_tx @@ -294,7 +296,9 @@ impl ServerContext { } /// Request the retention policy from the database. 
- pub async fn retention_policy(&self) -> Result { + pub async fn retention_policy( + &self, + ) -> Result { let (tx, rx) = oneshot::channel(); self.retention_tx.try_send(RetentionRequest::Get { tx }).map_err( |_| { @@ -594,8 +598,13 @@ fn read_generation_from_file(path: Utf8PathBuf) -> Result> { #[derive(Debug)] enum RetentionRequest { - Set { policy: RetentionPolicy, tx: oneshot::Sender> }, - Get { tx: oneshot::Sender> }, + Set { + policy: RetentionPolicyRequest, + tx: oneshot::Sender>, + }, + Get { + tx: oneshot::Sender>, + }, } async fn long_running_retention_task( @@ -629,7 +638,7 @@ async fn long_running_retention_task( async fn set_retention_policy( log: &Logger, client: &OximeterDbClient, - policy: RetentionPolicy, + policy: RetentionPolicyRequest, replicated: bool, tx: oneshot::Sender>, ) { @@ -667,7 +676,7 @@ async fn set_retention_policy( async fn get_retention_policy( log: &Logger, client: &OximeterDbClient, - tx: oneshot::Sender>, + tx: oneshot::Sender>, ) { debug!(log, "fetching retention policy from database"); let result = match client.retention_policy().await { @@ -768,7 +777,7 @@ mod tests { use camino::Utf8PathBuf; use clickhouse_admin_types::CLICKHOUSE_SERVER_CONFIG_FILE; use clickhouse_admin_types::retention::Days; - use clickhouse_admin_types::retention::RetentionPolicy; + use clickhouse_admin_types::retention::RetentionPolicyRequest; use dropshot::ErrorStatusCode; use omicron_common::api::external::Generation; use omicron_test_utils::dev; @@ -882,15 +891,41 @@ mod tests { ErrorStatusCode::SERVICE_UNAVAILABLE )); + // Initialize the database, then check the retention policy again. + // + // We need to wait for the `system.query_log` table to exist, which can + // take a while. 
context.init_db(false).await.expect("failed to initialize database"); - let policy = context - .retention_policy() - .await - .expect("failed to get retention policy"); - assert_eq!(policy.days, Days::new(30).unwrap()); + let policy = dev::poll::wait_for_condition( + || async { + match context.retention_policy().await { + Ok(pol) => { + if pol.tables.contains_key("query_log") { + Ok(pol) + } else { + Err(dev::poll::CondCheckError::<()>::NotYet) + } + } + Err(_) => Err(dev::poll::CondCheckError::<()>::NotYet), + } + }, + &std::time::Duration::from_millis(100), + &std::time::Duration::from_secs(30), + ) + .await + .expect("failed to get retention policy"); + assert!(!policy.tables.is_empty()); + + // The query log defaults to 7 days TTL, everything else is 30. + assert!(policy.tables.iter().all(|pol| { + let expected = if pol.table == "query_log" { 7 } else { 30 }; + u8::from(pol.days) == expected + })); + + // Set everything to 3, and ensure we can read it back. let days = Days::new(3).unwrap(); - let new = RetentionPolicy { days }; + let new = RetentionPolicyRequest { days }; context .set_retention_policy(new) .await @@ -899,7 +934,8 @@ mod tests { .retention_policy() .await .expect("failed to get retention policy"); - assert_eq!(policy.days, days); + assert!(!policy.tables.is_empty()); + assert!(policy.tables.iter().all(|pol| pol.days == days)); clickhouse.cleanup().await.unwrap(); logctx.cleanup_successful(); diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index 61967f05efc..29bdde36bf4 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -9,7 +9,9 @@ use clickhouse_admin_types::keeper::{ ClickhouseKeeperClusterMembership, KeeperConf, KeeperConfigurableSettings, Lgif, RaftConfig, }; -use clickhouse_admin_types::retention::RetentionPolicy; +use clickhouse_admin_types::retention::{ + DatabaseRetentionPolicy, RetentionPolicyRequest, +}; use 
clickhouse_admin_types::server::{ DistributedDdlQueue, MetricInfoPath, ServerConfigurableSettings, SystemTimeSeries, SystemTimeSeriesSettings, TimeSeriesSettingsQuery, @@ -106,7 +108,7 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl { async fn set_retention_policy( rqctx: RequestContext, - policy: TypedBody, + policy: TypedBody, ) -> Result { rqctx .context() @@ -117,7 +119,7 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl { async fn retention_policy( rqctx: RequestContext, - ) -> Result, HttpError> { + ) -> Result, HttpError> { rqctx.context().retention_policy().await.map(HttpResponseOk) } @@ -226,7 +228,7 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { async fn set_retention_policy( rqctx: RequestContext, - policy: TypedBody, + policy: TypedBody, ) -> Result { rqctx .context() @@ -237,7 +239,7 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { async fn retention_policy( rqctx: RequestContext, - ) -> Result, HttpError> { + ) -> Result, HttpError> { rqctx.context().retention_policy().await.map(HttpResponseOk) } diff --git a/clickhouse-admin/types/versions/src/add_retention_policy_and_table_usage/retention.rs b/clickhouse-admin/types/versions/src/add_retention_policy_and_table_usage/retention.rs index b9c68a4428a..ae506a25cfa 100644 --- a/clickhouse-admin/types/versions/src/add_retention_policy_and_table_usage/retention.rs +++ b/clickhouse-admin/types/versions/src/add_retention_policy_and_table_usage/retention.rs @@ -67,4 +67,11 @@ impl Days { } } } + + /// Return a human-friendly string displaying the policy. 
+ pub fn as_human_str(&self) -> String { + let n_days = self.0.get(); + let suffix = if n_days > 1 { "days" } else { "day" }; + format!("{n_days} {suffix}") + } } diff --git a/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/mod.rs b/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/mod.rs new file mode 100644 index 00000000000..1ddf7a830b7 --- /dev/null +++ b/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/mod.rs @@ -0,0 +1,8 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Version `ADD_RETENTION_POLICY_FOR_ALL_TABLES` of the clickhouse-admin +//! server API. + +pub mod retention; diff --git a/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/retention.rs b/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/retention.rs new file mode 100644 index 00000000000..6ca3761224f --- /dev/null +++ b/clickhouse-admin/types/versions/src/add_retention_policy_for_all_tables/retention.rs @@ -0,0 +1,55 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::v2::retention::Days; +use iddqd::IdOrdItem; +use iddqd::IdOrdMap; +use iddqd::id_upcast; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; + +/// A request for setting a retention policy. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct RetentionPolicyRequest { + /// The requested retention period, in days. + pub days: Days, +} + +impl From for RetentionPolicyRequest { + fn from(value: crate::v2::retention::RetentionPolicy) -> Self { + Self { days: value.days } + } +} + +/// Policy for retaining telemetry data for a database table. 
+#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct RetentionPolicy { + /// The table the policy applies to. + pub table: String, + /// The retention period, in days. + pub days: Days, +} + +impl From for crate::v2::retention::RetentionPolicy { + fn from(value: RetentionPolicy) -> Self { + Self { days: value.days } + } +} + +impl IdOrdItem for RetentionPolicy { + type Key<'a> = &'a str; + + fn key(&self) -> Self::Key<'_> { + self.table.as_str() + } + + id_upcast!(); +} + +/// Policy for retaining telemetry data for all tables. +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct DatabaseRetentionPolicy { + pub tables: IdOrdMap, +} diff --git a/clickhouse-admin/types/versions/src/latest.rs b/clickhouse-admin/types/versions/src/latest.rs index 9170eb1f0a7..e2cc0487b34 100644 --- a/clickhouse-admin/types/versions/src/latest.rs +++ b/clickhouse-admin/types/versions/src/latest.rs @@ -52,7 +52,9 @@ pub mod config { pub mod retention { pub use crate::v2::retention::Days; - pub use crate::v2::retention::RetentionPolicy; + pub use crate::v3::retention::DatabaseRetentionPolicy; + pub use crate::v3::retention::RetentionPolicy; + pub use crate::v3::retention::RetentionPolicyRequest; } pub mod usage { diff --git a/clickhouse-admin/types/versions/src/lib.rs b/clickhouse-admin/types/versions/src/lib.rs index 39ca2394e11..71a46a6b6c3 100644 --- a/clickhouse-admin/types/versions/src/lib.rs +++ b/clickhouse-admin/types/versions/src/lib.rs @@ -35,3 +35,5 @@ pub mod latest; pub mod v1; #[path = "add_retention_policy_and_table_usage/mod.rs"] pub mod v2; +#[path = "add_retention_policy_for_all_tables/mod.rs"] +pub mod v3; diff --git a/clients/clickhouse-admin-single-client/src/lib.rs b/clients/clickhouse-admin-single-client/src/lib.rs index 8df01ac72f9..a225a0898bd 100644 --- a/clients/clickhouse-admin-single-client/src/lib.rs +++ b/clients/clickhouse-admin-single-client/src/lib.rs @@ -27,3 +27,11 @@ progenitor::generate_api!( 
ServerConfigurableSettings = clickhouse_admin_types::server::ServerConfigurableSettings, } ); + +impl crate::types::Days { + pub fn as_human_str(&self) -> String { + let n_days = self.0.get(); + let suffix = if n_days > 1 { "days" } else { "day" }; + format!("{n_days} {suffix}") + } +} diff --git a/dev-tools/omdb/src/bin/omdb/clickhouse_admin.rs b/dev-tools/omdb/src/bin/omdb/clickhouse_admin.rs index 5eea3e64c66..89f42161889 100644 --- a/dev-tools/omdb/src/bin/omdb/clickhouse_admin.rs +++ b/dev-tools/omdb/src/bin/omdb/clickhouse_admin.rs @@ -42,7 +42,7 @@ pub struct ClickHouseAdminArgs { } #[derive(Debug, Args)] -struct RetentionPolicy { +struct RetentionPolicyRequest { /// The number of days to retain telemetry data. #[arg(long)] days: std::num::NonZeroU8, @@ -54,7 +54,7 @@ enum ClickHouseAdminCommands { /// Fetch the current database retention policy RetentionPolicy, /// Set the current database retention policy - SetRetentionPolicy(RetentionPolicy), + SetRetentionPolicy(RetentionPolicyRequest), /// Fetch the current database table usage. DatabaseUsage, } @@ -115,12 +115,24 @@ impl ClickHouseAdminArgs { } async fn retention_policy(&self, client: &Client) -> anyhow::Result<()> { - let types::RetentionPolicy { days } = client + let types::DatabaseRetentionPolicy { tables } = client .retention_policy() .await .context("fetching retention policy")? 
.into_inner(); - println!("Retention: {:>2} days", days); + let rows = tables + .into_iter() + .map(|t| RetentionPolicy { + table_name: t.table, + retention: t.days.as_human_str(), + }) + .collect::>(); + let table = Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!(); + println!("{table}"); Ok(()) } @@ -178,11 +190,11 @@ impl ClickHouseAdminArgs { async fn set_retention_policy( &self, client: &Client, - retention: &RetentionPolicy, + retention: &RetentionPolicyRequest, ) -> anyhow::Result<()> { let days = types::Days(retention.days); client - .set_retention_policy(&types::RetentionPolicy { days }) + .set_retention_policy(&types::RetentionPolicyRequest { days }) .await .context("setting retention policy") .map(|_| ()) @@ -212,6 +224,13 @@ struct TableUsage { n_rows: u64, } +#[derive(Tabled)] +#[tabled(rename_all = "SCREAMING_SNAKE_CASE")] +struct RetentionPolicy { + table_name: String, + retention: String, +} + #[cfg(test)] #[test] fn test_format_bytes() { diff --git a/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json.gitstub b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json.gitstub new file mode 100644 index 00000000000..65d140f9788 --- /dev/null +++ b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json.gitstub @@ -0,0 +1 @@ +61a6d609313f90c7c5a87d5dfdfe70745526334e:openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json diff --git a/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-3.0.0-012c32.json similarity index 99% rename from openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json rename to openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-3.0.0-012c32.json index 27d485e22a3..35e41a2c831 100644 --- a/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-2.0.0-4177d1.json +++ 
b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-3.0.0-012c32.json @@ -7,7 +7,7 @@ "url": "https://oxide.computer", "email": "api@oxide.computer" }, - "version": "2.0.0" + "version": "3.0.0" }, "paths": { "/4lw-conf": { diff --git a/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-latest.json b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-latest.json index 66eb0536e62..295f48b4539 120000 --- a/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-latest.json +++ b/openapi/clickhouse-admin-keeper/clickhouse-admin-keeper-latest.json @@ -1 +1 @@ -clickhouse-admin-keeper-2.0.0-4177d1.json \ No newline at end of file +clickhouse-admin-keeper-3.0.0-012c32.json \ No newline at end of file diff --git a/openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json.gitstub b/openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json.gitstub new file mode 100644 index 00000000000..0d76a5eb4fc --- /dev/null +++ b/openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json.gitstub @@ -0,0 +1 @@ +61a6d609313f90c7c5a87d5dfdfe70745526334e:openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json diff --git a/openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json b/openapi/clickhouse-admin-server/clickhouse-admin-server-3.0.0-6a8d9b.json similarity index 95% rename from openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json rename to openapi/clickhouse-admin-server/clickhouse-admin-server-3.0.0-6a8d9b.json index 2c87a43f206..c6f0fe8e302 100644 --- a/openapi/clickhouse-admin-server/clickhouse-admin-server-2.0.0-0b00a8.json +++ b/openapi/clickhouse-admin-server/clickhouse-admin-server-3.0.0-6a8d9b.json @@ -7,7 +7,7 @@ "url": "https://oxide.computer", "email": "api@oxide.computer" }, - "version": "2.0.0" + "version": "3.0.0" }, "paths": { "/config": { @@ -125,7 +125,7 @@ "content": { "application/json": { "schema": { - "$ref": 
"#/components/schemas/RetentionPolicy" + "$ref": "#/components/schemas/DatabaseRetentionPolicy" } } } @@ -145,7 +145,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/RetentionPolicy" + "$ref": "#/components/schemas/RetentionPolicyRequest" } } }, @@ -310,6 +310,33 @@ } ] }, + "DatabaseRetentionPolicy": { + "description": "Policy for retaining telemetry data for all tables.", + "type": "object", + "properties": { + "tables": { + "title": "IdOrdMap", + "x-rust-type": { + "crate": "iddqd", + "parameters": [ + { + "$ref": "#/components/schemas/RetentionPolicy" + } + ], + "path": "iddqd::IdOrdMap", + "version": "*" + }, + "type": "array", + "items": { + "$ref": "#/components/schemas/RetentionPolicy" + }, + "uniqueItems": true + } + }, + "required": [ + "tables" + ] + }, "DatabaseUsage": { "description": "The resource usage of all database tables.", "type": "object", @@ -899,7 +926,7 @@ ] }, "RetentionPolicy": { - "description": "Policy for retaining telemetry data.", + "description": "Policy for retaining telemetry data for a database table.", "type": "object", "properties": { "days": { @@ -909,6 +936,28 @@ "$ref": "#/components/schemas/Days" } ] + }, + "table": { + "description": "The table the policy applies to.", + "type": "string" + } + }, + "required": [ + "days", + "table" + ] + }, + "RetentionPolicyRequest": { + "description": "A request for setting a retention policy.", + "type": "object", + "properties": { + "days": { + "description": "The requested retention period, in days.", + "allOf": [ + { + "$ref": "#/components/schemas/Days" + } + ] } }, "required": [ diff --git a/openapi/clickhouse-admin-server/clickhouse-admin-server-latest.json b/openapi/clickhouse-admin-server/clickhouse-admin-server-latest.json index 455ec7a7a77..b10970ca47c 120000 --- a/openapi/clickhouse-admin-server/clickhouse-admin-server-latest.json +++ b/openapi/clickhouse-admin-server/clickhouse-admin-server-latest.json @@ -1 +1 @@ 
-clickhouse-admin-server-2.0.0-0b00a8.json \ No newline at end of file +clickhouse-admin-server-3.0.0-6a8d9b.json \ No newline at end of file diff --git a/openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json.gitstub b/openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json.gitstub new file mode 100644 index 00000000000..b189554e2f4 --- /dev/null +++ b/openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json.gitstub @@ -0,0 +1 @@ +61a6d609313f90c7c5a87d5dfdfe70745526334e:openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json diff --git a/openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json b/openapi/clickhouse-admin-single/clickhouse-admin-single-3.0.0-0ff327.json similarity index 87% rename from openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json rename to openapi/clickhouse-admin-single/clickhouse-admin-single-3.0.0-0ff327.json index c282cda533e..7bb52dde000 100644 --- a/openapi/clickhouse-admin-single/clickhouse-admin-single-2.0.0-490c30.json +++ b/openapi/clickhouse-admin-single/clickhouse-admin-single-3.0.0-0ff327.json @@ -7,7 +7,7 @@ "url": "https://oxide.computer", "email": "api@oxide.computer" }, - "version": "2.0.0" + "version": "3.0.0" }, "paths": { "/init": { @@ -37,7 +37,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/RetentionPolicy" + "$ref": "#/components/schemas/DatabaseRetentionPolicy" } } } @@ -57,7 +57,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/RetentionPolicy" + "$ref": "#/components/schemas/RetentionPolicyRequest" } } }, @@ -180,6 +180,33 @@ }, "components": { "schemas": { + "DatabaseRetentionPolicy": { + "description": "Policy for retaining telemetry data for all tables.", + "type": "object", + "properties": { + "tables": { + "title": "IdOrdMap", + "x-rust-type": { + "crate": "iddqd", + "parameters": [ + { + "$ref": 
"#/components/schemas/RetentionPolicy" + } + ], + "path": "iddqd::IdOrdMap", + "version": "*" + }, + "type": "array", + "items": { + "$ref": "#/components/schemas/RetentionPolicy" + }, + "uniqueItems": true + } + }, + "required": [ + "tables" + ] + }, "DatabaseUsage": { "description": "The resource usage of all database tables.", "type": "object", @@ -287,7 +314,7 @@ ] }, "RetentionPolicy": { - "description": "Policy for retaining telemetry data.", + "description": "Policy for retaining telemetry data for a database table.", "type": "object", "properties": { "days": { @@ -297,6 +324,28 @@ "$ref": "#/components/schemas/Days" } ] + }, + "table": { + "description": "The table the policy applies to.", + "type": "string" + } + }, + "required": [ + "days", + "table" + ] + }, + "RetentionPolicyRequest": { + "description": "A request for setting a retention policy.", + "type": "object", + "properties": { + "days": { + "description": "The requested retention period, in days.", + "allOf": [ + { + "$ref": "#/components/schemas/Days" + } + ] } }, "required": [ diff --git a/openapi/clickhouse-admin-single/clickhouse-admin-single-latest.json b/openapi/clickhouse-admin-single/clickhouse-admin-single-latest.json index b23bcece5ac..d2209c9b14c 120000 --- a/openapi/clickhouse-admin-single/clickhouse-admin-single-latest.json +++ b/openapi/clickhouse-admin-single/clickhouse-admin-single-latest.json @@ -1 +1 @@ -clickhouse-admin-single-2.0.0-490c30.json \ No newline at end of file +clickhouse-admin-single-3.0.0-0ff327.json \ No newline at end of file diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index bc97373f36d..0b8acf7b304 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -31,8 +31,10 @@ use crate::native::block::Block; use crate::native::block::ValueArray; use crate::query; use chrono::Utc; +use clickhouse_admin_types::retention::DatabaseRetentionPolicy; use clickhouse_admin_types::retention::Days; use 
clickhouse_admin_types::retention::RetentionPolicy; +use clickhouse_admin_types::retention::RetentionPolicyRequest; use clickhouse_admin_types::usage::DatabaseUsage; use clickhouse_admin_types::usage::TableUsage; use debug_ignore::DebugIgnore; @@ -40,6 +42,7 @@ use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; use dropshot::WhichPage; +use iddqd::IdOrdMap; use omicron_common::backoff; use omicron_common::backoff::Backoff as _; use oximeter::Measurement; @@ -963,9 +966,24 @@ impl Client { } /// Set the retention policy for the oximeter database tables. + /// + /// This attempts to set the policy to the same value on every relevant + /// table. But because we set each table in a separate query, those + /// individual queries can fail, leaving each table with different retention + /// policies. + /// + /// The return value of `retention_policy()` includes the value for every + /// table. That should be checked to ensure the policy was correctly set on + /// every table. + /// + /// Also note that this sets the TTLs asynchronously. Fetching the TTL + /// itself immediately after should show the correct value (assuming it + /// succeeded), but dropping any data beyond the new TTL happens + /// asynchronously. Use the method `database_table_usage()` to monitor the + /// behavior. pub async fn set_retention_policy( &self, - policy: RetentionPolicy, + policy: RetentionPolicyRequest, replicated: bool, ) -> Result<(), Error> { let mut handle = self.claim_connection().await?; @@ -989,6 +1007,11 @@ "error" => InlineErrorChain::new(&e), ); res = Err(e); + + // Be cautious and claim a new connection if this attempt fails. + // We're not really sure what state the connection is in, so + // recycle it to the pool to be reset and claim a new one. 
+ handle = self.claim_connection().await?; } } res @@ -1001,7 +1024,7 @@ impl Client { async fn set_one_table_retention_policy( &self, handle: &mut Handle, - policy: &RetentionPolicy, + policy: &RetentionPolicyRequest, table: &str, replicated: bool, ) -> Result<(), Error> { @@ -1018,11 +1041,27 @@ impl Client { )) })?; let days = u8::from(policy.days); - let sql = format!( + + // We want to explicitly queue the changes to the TTL, rather than wait + // for the full modification. This is a bit complicated in our version + // of ClickHouse. We need to: + // + // - set the TTL, but ensure the DB does not "materialize" that TTL + // right away (the `materialize_ttl_after_modify` setting). + // - start materializing the TTL, but don't wait for it to finish (the + // `mutations_sync` setting). + let alter_ttl_sql = format!( "ALTER TABLE {table} {maybe_on_cluster}\ - MODIFY TTL {start_time} + toIntervalDay({days})" + MODIFY TTL {start_time} + toIntervalDay({days}) \ + SETTINGS materialize_ttl_after_modify=0;" ); - self.execute_native(handle, &sql).await + self.execute_native(handle, &alter_ttl_sql).await?; + let materialize_ttl_sql = format!( + "ALTER TABLE {table} {maybe_on_cluster} \ + MATERIALIZE TTL \ + SETTINGS mutations_sync=0;" + ); + self.execute_native(handle, &materialize_ttl_sql).await } fn ttl_start_time_expr_for_table(table: &str) -> Option<&'static str> { @@ -1031,7 +1070,7 @@ impl Client { } else if table.contains("fields") { Some("last_updated_at") } else if table.contains("query_log") { - Some("event_time") + Some("event_date") } else { None } @@ -1091,27 +1130,49 @@ impl Client { /// could not be extracted. `None` is returned if there is no retention /// policy set yet, which can happen if the database hasn't been initialized /// yet. + /// + /// This returns the retention policy for every relevant table in the + /// database. Because we set the policy individually on each of them, they + /// can differ, if that set request fails in the middle. 
That also implies + /// the returned values may be different for different tables. pub async fn retention_policy( &self, - ) -> Result, Error> { + ) -> Result, Error> { // It's possible today that different tables actually have different - // TTLs, because we short-circuit if we fail to set the TTL partway - // through the list of tables. We're hoping that they're all the same, - // and picking a measurements table for simplicity. + // TTLs, because we individual queries may fail. + // + // Let's fetch all of them. const SQL: &str = "\ - SELECT create_table_query \ + SELECT name, create_table_query \ FROM system.tables \ - WHERE database = 'oximeter' \ - AND startsWith(name, 'measurements') \ - AND position(engine, 'MergeTree') != 0 \ - LIMIT 1;"; + WHERE (\ + database = 'oximeter' \ + AND (\ + startsWith(name, 'measurements') OR \ + startsWith(name, 'fields') \ + ) \ + AND position(engine, 'MergeTree') != 0\ + ) OR (database = 'system' AND name = 'query_log');"; let result = self .execute_with_block(&mut self.claim_connection().await?, SQL) .await?; let block = result.data.ok_or_else(|| Error::QueryMissingData { query: SQL.to_string(), })?; - let ValueArray::String(rows) = &block + let names = block + .columns + .get("name") + .ok_or_else(|| { + Error::Database("Expected a column named `name`".to_string()) + })? + .values + .as_string() + .map_err(|dt| { + Error::Database(format!( + "Expected string datatype found '{dt}'" + )) + })?; + let queries = block .columns .get("create_table_query") .ok_or_else(|| { @@ -1120,35 +1181,44 @@ impl Client { ) })? 
.values - else { - return Err(Error::Database( - "Expected `create_table_query` to have type string".to_string(), - )); - }; - if rows.is_empty() { + .as_string() + .map_err(|dt| { + Error::Database(format!( + "Expected string datatype found '{dt}'" + )) + })?; + if queries.is_empty() { return Ok(None); } - if rows.len() != 1 { - return Err(Error::Database( - "Expected `create_table_query` to have exactly 1 row" - .to_string(), - )); - } - let days = - extract_ttl_in_days_from_measurements_create_table_query(&rows[0])?; - Ok(Some(RetentionPolicy { days })) + names + .iter() + .cloned() + .zip( + queries + .iter() + .map(|q| extract_ttl_in_days_from_create_table_query(q)), + ) + .map(|(table, ttl)| ttl.map(|days| RetentionPolicy { table, days })) + .collect::, _>>() + .map(|tables| Some(DatabaseRetentionPolicy { tables })) } /// Return the resource usage of tables in the database. pub async fn database_table_usage(&self) -> Result { let started_at = Utc::now(); - const SQL: &str = "\ + const SQL: &str = const_format::formatcp!( + "\ SELECT \ concat(database, '.', name) AS table_name, \ ifNull(total_bytes, 0) AS total_bytes, \ ifNull(total_rows, 0) AS total_rows \ FROM system.tables \ - WHERE has_own_data"; + WHERE (\ + database = '{}' OR \ + (database = 'system' AND name = 'query_log')\ + ) AND has_own_data", + crate::DATABASE_NAME, + ); let columns = self .execute_with_block(&mut self.claim_connection().await?, SQL) .await? @@ -1627,17 +1697,48 @@ impl Client { // SETTINGS ... // ``` // +// The TTL line could also look like: +// +// ``` +// TTL last_updated_at + toIntervalDay() +// ``` +// +// for the field tables. +// +// Lastly, the TTL line could look like this +// +// ``` +// TTL event_date + toIntervalDay() +// ``` +// +// for the system.query_log table. +// +// // We're picking out the number of days from the function argument as a string. 
-fn extract_ttl_in_days_from_measurements_create_table_query( +fn extract_ttl_in_days_from_create_table_query( row: &str, ) -> Result { - const NEEDLE: &str = "TTL toDateTime(timestamp) + toIntervalDay("; - let needle_start = row.find(NEEDLE).ok_or_else(|| { - Error::Database(format!( - "could not find TTL expression in query: '{row}'" - )) - })?; - let n_days_start = needle_start + NEEDLE.len(); + const NEEDLES: [&str; 3] = [ + // For measurement tables + "TTL toDateTime(timestamp) + toIntervalDay(", + // For field tables + "TTL last_updated_at + toIntervalDay(", + // For system.query_log + "TTL event_date + toIntervalDay(", + ]; + + // Find the first needle that matches, and error if none do. + let (needle_start, needle) = NEEDLES + .iter() + .find_map(|needle| row.find(needle).map(|ix| (ix, needle))) + .ok_or_else(|| { + Error::Database(format!( + "could not find TTL expression in query: '{row}'" + )) + })?; + + // Now the closing parentheses. + let n_days_start = needle_start + needle.len(); let close_paren_start = row[n_days_start..].find(")").ok_or_else(|| { Error::Database(format!( "could not find closing paren in TTL expression: '{row}'" @@ -5253,16 +5354,31 @@ mod tests { fn test_extract_ttl_in_days_from_create_table_query() { assert_eq!( 30u8, - u8::from(extract_ttl_in_days_from_measurements_create_table_query( + u8::from(extract_ttl_in_days_from_create_table_query( "some junk TTL toDateTime(timestamp) + toIntervalDay(30) other stuff" ).unwrap()), ); assert_eq!( 7u8, - u8::from(extract_ttl_in_days_from_measurements_create_table_query( + u8::from(extract_ttl_in_days_from_create_table_query( "some junk TTL toDateTime(timestamp) + toIntervalDay(7) other stuff" ).unwrap()), ); + assert_eq!( + 7u8, + u8::from(extract_ttl_in_days_from_create_table_query( + "some junk TTL last_updated_at + toIntervalDay(7) other stuff" + ).unwrap()), + ); + assert_eq!( + 7u8, + u8::from( + extract_ttl_in_days_from_create_table_query( + "some junk TTL event_date + 
toIntervalDay(7) other stuff" + ) + .unwrap() + ), + ); for invalid in [ "some junk TTL toDateTime(timestamp) + toIntervalDay(-1) other stuf", @@ -5271,12 +5387,10 @@ mod tests { "some junk TTL toDateTime(timestamp) + toIntervalDay(100)", "some junk TTL toDateTime(timestamp) + toIntervalDay(3000)", "some junk TTL toDateTime(timestamp) + toIntervalDay(3.0)", + "some junk TTL last_updated_at + toIntervalDay(3.0)", ] { assert!( - extract_ttl_in_days_from_measurements_create_table_query( - invalid - ) - .is_err() + extract_ttl_in_days_from_create_table_query(invalid).is_err() ) } }