Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use clickhouse_admin_types_versions::latest;
use clickhouse_admin_types_versions::{latest, v2};
use dropshot::{
HttpError, HttpResponseCreated, HttpResponseOk,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
Expand All @@ -27,6 +27,7 @@ api_versions!([
// | example for the next person.
// v
// (next_int, IDENT), // NOTE: read the note at the start of this macro!
(3, ADD_RETENTION_POLICY_FOR_ALL_TABLES),
(2, ADD_RETENTION_POLICY_AND_TABLE_USAGE),
(1, INITIAL),
]);
Expand Down Expand Up @@ -206,22 +207,62 @@ pub trait ClickhouseAdminServerApi {
#[endpoint {
method = PUT,
path = "/retention-policy",
versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..,
versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES..,
}]
async fn set_retention_policy(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<latest::retention::RetentionPolicy>,
policy: TypedBody<latest::retention::RetentionPolicyRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// Set the retention policy for timeseries data.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think it would be worth a note that this update is eventually consistent. IIUC, a user could set the retention policy, immediately read it back, and be surprised that it's not (yet) the expected value.

And a question: with SETTINGS mutations_sync=0, is it possible to queue an update that never succeeds? Speaking of which, have we tested what happens here when the disk is full, which is the issue that triggered this feature in the first place? I don't want to delay merging on this question, but we might want to figure it out sometime if we haven't already.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea about the note.

Yes, I think it's possible for a background mutation to fail. The description for the system.mutations table lists columns like last_fail_time, for example. I can't find on that page or any other whether mutations are retried. The phrasing "last fail time" makes me think they are, but that's not clear.

I'm not sure what happens if the disk is full, we haven't attempted it and it is hard to reproduce. From this page it appears that whole data parts (chunks of tables) are rewritten according to the mutation, and then the original is dropped. Depending on how the TTL is implemented, that could certainly lead to write amplification. It's a valid concern, and we should flag it for testing. I'm...fairly confident... that it will not cause more pain than we've already had because we're expecting to reduce the retention time which would lead to all or most of a data part being dropped, not rewritten with say, a different order. Or maybe I'm just hopeful.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there's this:

Unlike merges, mutations can't be rolled back once submitted and will continue to execute even after server restarts unless explicitly cancelled

From this page ominously named "Avoid mutations". I agree it's a risk, but this is the only way to actually set a TTL other than managing it ourselves, which I think is out of scope.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note added in ec2050b

#[endpoint {
method = PUT,
path = "/retention-policy",
versions =
VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES,
}]
async fn set_retention_policy_v2(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<v2::retention::RetentionPolicy>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
Self::set_retention_policy(rqctx, policy.map(Into::into)).await
}

/// Get the retention policy for timeseries data from the database
#[endpoint {
method = GET,
path = "/retention-policy",
versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..,
versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES..,
}]
async fn retention_policy(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<latest::retention::RetentionPolicy>, HttpError>;
) -> Result<
HttpResponseOk<latest::retention::DatabaseRetentionPolicy>,
HttpError,
>;

/// Get the retention policy for timeseries data from the database
Comment thread
bnaecker marked this conversation as resolved.
#[endpoint {
method = GET,
path = "/retention-policy",
versions =
VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES,
}]
async fn retention_policy_v2(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<v2::retention::RetentionPolicy>, HttpError> {
let HttpResponseOk(mut policy) = Self::retention_policy(rqctx).await?;
policy
.tables
.pop_first()
Comment thread
bnaecker marked this conversation as resolved.
.ok_or_else(|| {
HttpError::for_unavail(
None,
"Database is not yet populated".to_string(),
)
})
.map(|pol| HttpResponseOk(pol.into()))
}

/// Return the resource usage of database tables.
#[endpoint {
Expand Down Expand Up @@ -272,22 +313,62 @@ pub trait ClickhouseAdminSingleApi {
#[endpoint {
method = PUT,
path = "/retention-policy",
versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..,
versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES..,
}]
async fn set_retention_policy(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<latest::retention::RetentionPolicy>,
policy: TypedBody<latest::retention::RetentionPolicyRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// Set the retention policy for timeseries data.
#[endpoint {
method = PUT,
path = "/retention-policy",
versions =
VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES,
}]
async fn set_retention_policy_v2(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<v2::retention::RetentionPolicy>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
Self::set_retention_policy(rqctx, policy.map(Into::into)).await
}

/// Get the retention policy for timeseries data from the database
#[endpoint {
method = GET,
path = "/retention-policy",
versions = VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..,
versions = VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES..,
}]
async fn retention_policy(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<latest::retention::RetentionPolicy>, HttpError>;
) -> Result<
HttpResponseOk<latest::retention::DatabaseRetentionPolicy>,
HttpError,
>;

/// Get the retention policy for timeseries data from the database
#[endpoint {
method = GET,
path = "/retention-policy",
versions =
VERSION_ADD_RETENTION_POLICY_AND_TABLE_USAGE..VERSION_ADD_RETENTION_POLICY_FOR_ALL_TABLES,
}]
async fn retention_policy_v2(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<v2::retention::RetentionPolicy>, HttpError> {
let HttpResponseOk(mut policy) = Self::retention_policy(rqctx).await?;
policy
.tables
.pop_first()
Comment thread
bnaecker marked this conversation as resolved.
.ok_or_else(|| {
HttpError::for_unavail(
None,
"Database is not yet populated".to_string(),
)
})
.map(|pol| HttpResponseOk(pol.into()))
}

/// Return the resource usage of database tables.
#[endpoint {
Expand Down
35 changes: 24 additions & 11 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use camino::Utf8PathBuf;
use chrono::Utc;
use clickhouse_admin_types::config::GenerateConfigResult;
use clickhouse_admin_types::keeper::KeeperConfigurableSettings;
use clickhouse_admin_types::retention::RetentionPolicy;
use clickhouse_admin_types::retention::{
DatabaseRetentionPolicy, RetentionPolicyRequest,
};
use clickhouse_admin_types::server::ServerConfigurableSettings;
use clickhouse_admin_types::usage::{
DatabaseUsage, DatabaseUsageError, DatabaseUsageResult,
Expand Down Expand Up @@ -273,7 +275,7 @@ impl ServerContext {
/// Update the retention policy of the oximeter database tables.
pub async fn set_retention_policy(
&self,
policy: RetentionPolicy,
policy: RetentionPolicyRequest,
) -> Result<(), HttpError> {
let (tx, rx) = oneshot::channel();
self.retention_tx
Expand All @@ -294,7 +296,9 @@ impl ServerContext {
}

/// Request the retention policy from the database.
pub async fn retention_policy(&self) -> Result<RetentionPolicy, HttpError> {
pub async fn retention_policy(
&self,
) -> Result<DatabaseRetentionPolicy, HttpError> {
let (tx, rx) = oneshot::channel();
self.retention_tx.try_send(RetentionRequest::Get { tx }).map_err(
|_| {
Expand Down Expand Up @@ -594,8 +598,13 @@ fn read_generation_from_file(path: Utf8PathBuf) -> Result<Option<Generation>> {

#[derive(Debug)]
enum RetentionRequest {
Set { policy: RetentionPolicy, tx: oneshot::Sender<Result<(), HttpError>> },
Get { tx: oneshot::Sender<Result<RetentionPolicy, HttpError>> },
Set {
policy: RetentionPolicyRequest,
tx: oneshot::Sender<Result<(), HttpError>>,
},
Get {
tx: oneshot::Sender<Result<DatabaseRetentionPolicy, HttpError>>,
},
}

async fn long_running_retention_task(
Expand Down Expand Up @@ -629,7 +638,7 @@ async fn long_running_retention_task(
async fn set_retention_policy(
log: &Logger,
client: &OximeterDbClient,
policy: RetentionPolicy,
policy: RetentionPolicyRequest,
replicated: bool,
tx: oneshot::Sender<Result<(), HttpError>>,
) {
Expand Down Expand Up @@ -667,7 +676,7 @@ async fn set_retention_policy(
async fn get_retention_policy(
log: &Logger,
client: &OximeterDbClient,
tx: oneshot::Sender<Result<RetentionPolicy, HttpError>>,
tx: oneshot::Sender<Result<DatabaseRetentionPolicy, HttpError>>,
) {
debug!(log, "fetching retention policy from database");
let result = match client.retention_policy().await {
Expand Down Expand Up @@ -768,7 +777,7 @@ mod tests {
use camino::Utf8PathBuf;
use clickhouse_admin_types::CLICKHOUSE_SERVER_CONFIG_FILE;
use clickhouse_admin_types::retention::Days;
use clickhouse_admin_types::retention::RetentionPolicy;
use clickhouse_admin_types::retention::RetentionPolicyRequest;
use dropshot::ErrorStatusCode;
use omicron_common::api::external::Generation;
use omicron_test_utils::dev;
Expand Down Expand Up @@ -887,10 +896,13 @@ mod tests {
.retention_policy()
.await
.expect("failed to get retention policy");
assert_eq!(policy.days, Days::new(30).unwrap());
assert!(!policy.tables.is_empty());
assert!(
policy.tables.iter().all(|pol| pol.days == Days::new(30).unwrap())
);

let days = Days::new(3).unwrap();
let new = RetentionPolicy { days };
let new = RetentionPolicyRequest { days };
context
.set_retention_policy(new)
.await
Expand All @@ -899,7 +911,8 @@ mod tests {
.retention_policy()
.await
.expect("failed to get retention policy");
assert_eq!(policy.days, days);
assert!(!policy.tables.is_empty());
assert!(policy.tables.iter().all(|pol| pol.days == days));

clickhouse.cleanup().await.unwrap();
logctx.cleanup_successful();
Expand Down
12 changes: 7 additions & 5 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use clickhouse_admin_types::keeper::{
ClickhouseKeeperClusterMembership, KeeperConf, KeeperConfigurableSettings,
Lgif, RaftConfig,
};
use clickhouse_admin_types::retention::RetentionPolicy;
use clickhouse_admin_types::retention::{
DatabaseRetentionPolicy, RetentionPolicyRequest,
};
use clickhouse_admin_types::server::{
DistributedDdlQueue, MetricInfoPath, ServerConfigurableSettings,
SystemTimeSeries, SystemTimeSeriesSettings, TimeSeriesSettingsQuery,
Expand Down Expand Up @@ -106,7 +108,7 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {

async fn set_retention_policy(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<RetentionPolicy>,
policy: TypedBody<RetentionPolicyRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
rqctx
.context()
Expand All @@ -117,7 +119,7 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {

async fn retention_policy(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<RetentionPolicy>, HttpError> {
) -> Result<HttpResponseOk<DatabaseRetentionPolicy>, HttpError> {
rqctx.context().retention_policy().await.map(HttpResponseOk)
}

Expand Down Expand Up @@ -226,7 +228,7 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {

async fn set_retention_policy(
rqctx: RequestContext<Self::Context>,
policy: TypedBody<RetentionPolicy>,
policy: TypedBody<RetentionPolicyRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
rqctx
.context()
Expand All @@ -237,7 +239,7 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {

async fn retention_policy(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<RetentionPolicy>, HttpError> {
) -> Result<HttpResponseOk<DatabaseRetentionPolicy>, HttpError> {
rqctx.context().retention_policy().await.map(HttpResponseOk)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ impl Days {
}
}
}

impl std::fmt::Display for Days {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} days", self.0.get())
Comment thread
bnaecker marked this conversation as resolved.
Outdated
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Version `ADD_RETENTION_POLICY_FOR_ALL_TABLES` of the clickhouse-admin
//! server API.
pub mod retention;
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::v2::retention::Days;
use iddqd::IdOrdItem;
use iddqd::IdOrdMap;
use iddqd::id_upcast;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;

/// A request for setting a retention policy.
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
pub struct RetentionPolicyRequest {
/// The requested retention period, in days.
pub days: Days,
}

impl From<crate::v2::retention::RetentionPolicy> for RetentionPolicyRequest {
fn from(value: crate::v2::retention::RetentionPolicy) -> Self {
Self { days: value.days }
}
}

/// Policy for retaining telemetry data for a database table.
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
pub struct RetentionPolicy {
/// The table the policy applies to.
pub table: String,
/// The retention period, in days.
pub days: Days,
}

impl From<RetentionPolicy> for crate::v2::retention::RetentionPolicy {
fn from(value: RetentionPolicy) -> Self {
Self { days: value.days }
}
}

impl IdOrdItem for RetentionPolicy {
type Key<'a> = &'a str;

fn key(&self) -> Self::Key<'_> {
self.table.as_str()
}

id_upcast!();
}

/// Policy for retaining telemetry data for all tables.
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
pub struct DatabaseRetentionPolicy {
pub tables: IdOrdMap<RetentionPolicy>,
}
4 changes: 3 additions & 1 deletion clickhouse-admin/types/versions/src/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub mod config {

pub mod retention {
pub use crate::v2::retention::Days;
pub use crate::v2::retention::RetentionPolicy;
pub use crate::v3::retention::DatabaseRetentionPolicy;
pub use crate::v3::retention::RetentionPolicy;
pub use crate::v3::retention::RetentionPolicyRequest;
}

pub mod usage {
Expand Down
2 changes: 2 additions & 0 deletions clickhouse-admin/types/versions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ pub mod latest;
pub mod v1;
#[path = "add_retention_policy_and_table_usage/mod.rs"]
pub mod v2;
#[path = "add_retention_policy_for_all_tables/mod.rs"]
pub mod v3;
Loading
Loading