34 changes: 33 additions & 1 deletion etl-api/src/configs/pipeline.rs
@@ -1,4 +1,4 @@
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

Expand Down Expand Up @@ -26,11 +26,34 @@ pub struct ApiBatchConfig {
pub max_fill_ms: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
pub struct ApiReplicationSlotConfig {
#[schema(example = false)]
pub temporary: bool,
Contributor

For the API, it's fine to use this interface for now.

}
Comment on lines +34 to +64

🧹 Nitpick | 🔵 Trivial

Add documentation to API types and consider adding Copy derive.

The new API types are missing documentation. Also, ApiReplicationSlotConfig could derive Copy since it only contains a Copy field, matching the internal ReplicationSlotConfig.

📝 Suggested improvements
+/// Determines the persistence behavior of a replication slot in API requests.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
 #[serde(rename_all = "snake_case")]
 pub enum ApiReplicationSlotPersistence {
+    /// Slot persists until explicitly dropped.
     #[default]
     Permanent,
+    /// Slot is automatically dropped when the connection ends.
     Temporary,
 }

 // ... conversions ...

+/// Configuration for the replication slot used by the pipeline in API requests.
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
 #[serde(rename_all = "snake_case")]
 pub struct ApiReplicationSlotConfig {
+    /// Controls whether the replication slot persists across connection sessions.
     pub persistence: ApiReplicationSlotPersistence,
 }
🤖 Prompt for AI Agents
In `@etl-api/src/configs/pipeline.rs` around lines 34 - 64, Add missing
documentation comments to the API types and derive Copy where appropriate: add
doc comments for the ApiReplicationSlotPersistence enum (and its variants
Permanent and Temporary) and for the ApiReplicationSlotConfig struct and its
persistence field to explain intent/usage, and update the derives on
ApiReplicationSlotConfig (and optionally ApiReplicationSlotPersistence) to
include Copy so the API types match the internal
ReplicationSlotConfig/ReplicationSlotPersistence semantics; ensure the
referenced symbols are ApiReplicationSlotPersistence, ApiReplicationSlotConfig,
and the persistence field.
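
The "// ... conversions ..." placeholder in the suggestion above could be filled in roughly as follows. This is a minimal, dependency-free sketch: the serde and utoipa derives are left out, and the internal `ReplicationSlotPersistence` enum and the `persistence` field on `ReplicationSlotConfig` are assumptions taken from the review suggestions, not from the code currently in this diff (which still uses `temporary: bool`).

```rust
// Assumed shape of the internal types (hypothetical; the real ones live in etl-config).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplicationSlotPersistence {
    #[default]
    Permanent,
    Temporary,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ReplicationSlotConfig {
    pub persistence: ReplicationSlotPersistence,
}

// API-facing mirrors, as proposed in the review comment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ApiReplicationSlotPersistence {
    #[default]
    Permanent,
    Temporary,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ApiReplicationSlotConfig {
    pub persistence: ApiReplicationSlotPersistence,
}

// Variant-by-variant mapping: adding a variant on one side becomes a compile error
// here until the other side is updated.
impl From<ReplicationSlotPersistence> for ApiReplicationSlotPersistence {
    fn from(value: ReplicationSlotPersistence) -> Self {
        match value {
            ReplicationSlotPersistence::Permanent => Self::Permanent,
            ReplicationSlotPersistence::Temporary => Self::Temporary,
        }
    }
}

impl From<ApiReplicationSlotPersistence> for ReplicationSlotPersistence {
    fn from(value: ApiReplicationSlotPersistence) -> Self {
        match value {
            ApiReplicationSlotPersistence::Permanent => Self::Permanent,
            ApiReplicationSlotPersistence::Temporary => Self::Temporary,
        }
    }
}

impl From<ReplicationSlotConfig> for ApiReplicationSlotConfig {
    fn from(value: ReplicationSlotConfig) -> Self {
        Self { persistence: value.persistence.into() }
    }
}

impl From<ApiReplicationSlotConfig> for ReplicationSlotConfig {
    fn from(value: ApiReplicationSlotConfig) -> Self {
        Self { persistence: value.persistence.into() }
    }
}
```

Because both config structs derive `Copy` in this sketch, a value can be converted with `.into()` and still be used afterwards, which is the practical benefit the nitpick points at.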


impl From<ReplicationSlotConfig> for ApiReplicationSlotConfig {
fn from(value: ReplicationSlotConfig) -> Self {
ApiReplicationSlotConfig {
temporary: value.temporary,
}
}
}

impl From<ApiReplicationSlotConfig> for ReplicationSlotConfig {
fn from(value: ApiReplicationSlotConfig) -> Self {
ReplicationSlotConfig {
temporary: value.temporary,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct FullApiPipelineConfig {
#[schema(example = "my_publication")]
#[serde(deserialize_with = "crate::utils::trim_string")]
pub publication_name: String,
pub replication_slot: ApiReplicationSlotConfig,
Contributor

Same here:

Suggested change
pub replication_slot: ApiReplicationSlotConfig,
#[serde(default)]
pub replication_slot: ApiReplicationSlotConfig,

Otherwise, if the field is missing from a request, deserialization fails and existing API consumers break.
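
To make the failure mode concrete, here is a small dependency-free model of the difference between a required field and one marked `#[serde(default)]`. It does not use serde itself: the flat `HashMap` payload, the key name `replication_slot.temporary`, and the two parse functions are hypothetical stand-ins chosen only to illustrate the behavior.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ReplicationSlotConfig {
    pub temporary: bool,
}

// Hypothetical flat key used by this model; real payloads are nested JSON.
const KEY: &str = "replication_slot.temporary";

/// Models a required field: a payload written before the field existed
/// has no such key, so parsing fails.
pub fn parse_required(payload: &HashMap<&str, &str>) -> Result<ReplicationSlotConfig, String> {
    let raw = payload
        .get(KEY)
        .ok_or_else(|| "missing field `replication_slot`".to_string())?;
    let temporary = raw.parse::<bool>().map_err(|e| e.to_string())?;
    Ok(ReplicationSlotConfig { temporary })
}

/// Models `#[serde(default)]`: an absent key falls back to `Default::default()`,
/// while a present but malformed value is still an error.
pub fn parse_with_default(payload: &HashMap<&str, &str>) -> Result<ReplicationSlotConfig, String> {
    match payload.get(KEY) {
        None => Ok(ReplicationSlotConfig::default()),
        Some(raw) => raw
            .parse::<bool>()
            .map(|temporary| ReplicationSlotConfig { temporary })
            .map_err(|e| e.to_string()),
    }
}
```

With an old payload that only carries `publication_name`, `parse_required` returns an error while `parse_with_default` yields a non-temporary slot, which mirrors what the attribute is meant to achieve for existing consumers.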

#[serde(skip_serializing_if = "Option::is_none")]
pub batch: Option<ApiBatchConfig>,
#[schema(example = 1000)]
Expand All @@ -49,6 +72,7 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
fn from(value: StoredPipelineConfig) -> Self {
Self {
publication_name: value.publication_name,
replication_slot: value.replication_slot.into(),
batch: Some(ApiBatchConfig {
max_size: Some(value.batch.max_size),
max_fill_ms: Some(value.batch.max_fill_ms),
Expand Down Expand Up @@ -89,6 +113,8 @@ pub struct PartialApiPipelineConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredPipelineConfig {
pub publication_name: String,
#[serde(default)]
pub replication_slot: ReplicationSlotConfig,
Contributor

Deserialization will fail for existing stored configs, which do not contain this field.

Suggested change
pub replication_slot: ReplicationSlotConfig,
#[serde(default)]
pub replication_slot: ReplicationSlotConfig,

This has to be paired with the other comment that I left.

Contributor Author

Added

pub batch: BatchConfig,
pub table_error_retry_delay_ms: u64,
#[serde(default = "default_table_error_retry_max_attempts")]
Expand All @@ -106,6 +132,7 @@ impl StoredPipelineConfig {
PipelineConfig {
id: pipeline_id,
publication_name: self.publication_name,
replication_slot: self.replication_slot,
pg_connection: pg_connection_config,
batch: self.batch,
table_error_retry_delay_ms: self.table_error_retry_delay_ms,
Expand Down Expand Up @@ -161,6 +188,7 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {

Self {
publication_name: value.publication_name,
replication_slot: value.replication_slot.into(),
batch,
table_error_retry_delay_ms: value
.table_error_retry_delay_ms
Expand All @@ -185,6 +213,7 @@ mod tests {
fn test_stored_pipeline_config_serialization() {
let config = StoredPipelineConfig {
publication_name: "test_publication".to_string(),
replication_slot: ReplicationSlotConfig::default(),
batch: BatchConfig {
max_size: 1000,
max_fill_ms: 5000,
Expand Down Expand Up @@ -223,6 +252,7 @@ mod tests {
table_error_retry_max_attempts: None,
max_table_sync_workers: None,
log_level: Some(LogLevel::Debug),
replication_slot: ApiReplicationSlotConfig::default(),
};

let stored: StoredPipelineConfig = full_config.clone().into();
Expand All @@ -235,6 +265,7 @@ mod tests {
fn test_full_api_pipeline_config_defaults() {
let full_config = FullApiPipelineConfig {
publication_name: "test_publication".to_string(),
replication_slot: ApiReplicationSlotConfig::default(),
batch: None,
table_error_retry_delay_ms: None,
table_error_retry_max_attempts: None,
Expand Down Expand Up @@ -264,6 +295,7 @@ mod tests {
fn test_partial_api_pipeline_config_merge() {
let mut stored = StoredPipelineConfig {
publication_name: "old_publication".to_string(),
replication_slot: ReplicationSlotConfig::default(),
batch: BatchConfig {
max_size: 500,
max_fill_ms: 2000,
Expand Down
5 changes: 3 additions & 2 deletions etl-api/src/k8s/http.rs
Expand Up @@ -1052,8 +1052,8 @@ mod tests {
use super::*;

use etl_config::shared::{
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicatorConfig,
ReplicatorConfigWithoutSecrets, TlsConfig,
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig,
ReplicatorConfig, ReplicatorConfigWithoutSecrets, TlsConfig,
};
use insta::assert_json_snapshot;

Expand Down Expand Up @@ -1139,6 +1139,7 @@ mod tests {
pipeline: PipelineConfig {
id: 42,
publication_name: "all-pub".to_string(),
replication_slot: ReplicationSlotConfig::default(),
pg_connection: PgConnectionConfig {
host: "localhost".to_string(),
port: 5432,
Expand Down
Expand Up @@ -6,7 +6,7 @@ expression: config_map_json
"apiVersion": "v1",
"data": {
"base.json": "",
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}"
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"replication_slot\":{\"temporary\":false},\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}"
},
"kind": "ConfigMap",
"metadata": {
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: pipeline.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
Expand Up @@ -4,6 +4,9 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
replication_slot: ApiReplicationSlotConfig {
temporary: false,
},
batch: Some(
ApiBatchConfig {
max_size: Some(
Expand Down
7 changes: 6 additions & 1 deletion etl-api/tests/support/mocks.rs
Expand Up @@ -224,7 +224,10 @@ pub mod tenants {
/// Pipeline config helpers.
pub mod pipelines {
use super::*;
use etl_api::configs::{log::LogLevel, pipeline::ApiBatchConfig};
use etl_api::configs::{
log::LogLevel,
pipeline::{ApiBatchConfig, ApiReplicationSlotConfig},
};

/// Returns a default pipeline config.
pub fn new_pipeline_config() -> FullApiPipelineConfig {
Expand All @@ -238,6 +241,7 @@ pub mod pipelines {
table_error_retry_max_attempts: Some(5),
max_table_sync_workers: Some(2),
log_level: Some(LogLevel::Info),
replication_slot: ApiReplicationSlotConfig::default(),
}
}

Expand All @@ -253,6 +257,7 @@ pub mod pipelines {
table_error_retry_max_attempts: Some(10),
max_table_sync_workers: Some(4),
log_level: Some(LogLevel::Info),
replication_slot: ApiReplicationSlotConfig::default(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions etl-benchmarks/benches/table_copies.rs
@@ -1,4 +1,5 @@
use clap::{Parser, Subcommand, ValueEnum};
use etl::config::ReplicationSlotConfig;
use etl::destination::Destination;
use etl::error::EtlResult;
use etl::pipeline::Pipeline;
Expand Down Expand Up @@ -327,6 +328,7 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
let pipeline_config = PipelineConfig {
id: 1,
publication_name: args.publication_name,
replication_slot: ReplicationSlotConfig::default(),
pg_connection: pg_connection_config,
batch: BatchConfig {
max_size: args.batch_max_size,
Expand Down
12 changes: 11 additions & 1 deletion etl-config/src/shared/pipeline.rs
Expand Up @@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize};
use crate::shared::{
PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError, batch::BatchConfig,
};

/// Configuration for an ETL pipeline.
///
/// Contains all settings required to run a replication pipeline including
Expand All @@ -20,6 +19,8 @@ pub struct PipelineConfig {
pub id: u64,
/// Name of the Postgres publication to use for logical replication.
pub publication_name: String,
/// Whether to use a temporary replication slot
pub replication_slot: ReplicationSlotConfig,
Comment on lines +63 to +64

🧹 Nitpick | 🔵 Trivial

Add trailing period to documentation comment.

Per coding guidelines, documentation comments should be properly punctuated.

📝 Suggested fix
-    /// Configuration for the replication slot used
+    /// Configuration for the replication slot used.

Suggested change
/// Configuration for the replication slot used
pub replication_slot: ReplicationSlotConfig,
/// Configuration for the replication slot used.
pub replication_slot: ReplicationSlotConfig,
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 63 - 64, The documentation
comment for the struct field replication_slot should end with a period; update
the doc comment above pub replication_slot: ReplicationSlotConfig to read
"Configuration for the replication slot used." ensuring the trailing period is
added to satisfy punctuation guidelines and reference the ReplicationSlotConfig
type in the same comment.

⚠️ Potential issue | 🟠 Major

Add #[serde(default)] for backward compatibility.

The replication_slot field in PipelineConfig is missing #[serde(default)]. When deserializing existing configs that don't have this field, deserialization will fail. This was addressed in StoredPipelineConfig but not here.

🔧 Proposed fix
     /// Configuration for the replication slot used.
+    #[serde(default)]
     pub replication_slot: ReplicationSlotConfig,

Suggested change
/// Configuration for the replication slot used
pub replication_slot: ReplicationSlotConfig,
/// Configuration for the replication slot used.
#[serde(default)]
pub replication_slot: ReplicationSlotConfig,
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 63 - 64, Add serde default
for backward compatibility: annotate the PipelineConfig struct's
replication_slot field with #[serde(default)] so deserializing older configs
that omit replication_slot succeeds; update the replication_slot declaration
(type ReplicationSlotConfig) in PipelineConfig to mirror StoredPipelineConfig's
handling and ensure the default implementation for ReplicationSlotConfig exists
or derive/implement Default if needed.

/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfig,
Expand Down Expand Up @@ -64,6 +65,8 @@ pub struct PipelineConfigWithoutSecrets {
pub id: u64,
/// Name of the Postgres publication to use for logical replication.
pub publication_name: String,
/// Whether to use a temporary replication slot
pub replication_slot: ReplicationSlotConfig,
Comment on lines +143 to +144

⚠️ Potential issue | 🟠 Major

Add #[serde(default)] and trailing period for consistency.

Same issues as PipelineConfig: missing #[serde(default)] for backward compatibility and missing period in documentation.

🔧 Proposed fix
-    /// Configuration for the replication slot used
+    /// Configuration for the replication slot used.
+    #[serde(default)]
     pub replication_slot: ReplicationSlotConfig,
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 143 - 144, The doc comment
for the field replication_slot is missing a trailing period and the field lacks
#[serde(default)] for backward compatibility; update the
PipelineConfigWithoutSecrets struct so the replication_slot field has the
#[serde(default)] attribute and terminate its documentation sentence with a
period, referencing the ReplicationSlotConfig type and the replication_slot
field to locate the change.

/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfigWithoutSecrets,
Expand All @@ -82,6 +85,7 @@ impl From<PipelineConfig> for PipelineConfigWithoutSecrets {
PipelineConfigWithoutSecrets {
id: value.id,
publication_name: value.publication_name,
replication_slot: value.replication_slot,
pg_connection: value.pg_connection.into(),
batch: value.batch,
table_error_retry_delay_ms: value.table_error_retry_delay_ms,
Expand All @@ -90,3 +94,9 @@ impl From<PipelineConfig> for PipelineConfigWithoutSecrets {
}
}
}

#[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub struct ReplicationSlotConfig {
pub temporary: bool,
Contributor

I searched online for other variants. To keep our format open for the future, we could do something like:

Suggested change
pub temporary: bool,
pub persistence: ReplicationSlotPersistence,

And then ReplicationSlotPersistence could be something like:

enum ReplicationSlotPersistence {
    Permanent,
    Temporary,
}

That could be serialized as an internally tagged enum in snake case, with the tag field named something like type.

I am suggesting this because a boolean is painful to migrate away from if the persistence semantics ever change or we want a slight variation. I am being strict here because we store this config in the middleware db, and the migration paths are very tricky.
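
A rough, dependency-free sketch of the enum-based shape described above. The tag strings, the `as_tag`/`from_tag` helpers, and the `From<bool>` migration path are illustrative assumptions, not code from this PR. With serde, the same intent is commonly written as `#[serde(tag = "type", rename_all = "snake_case")]` on the enum, which is left out here so the snippet compiles on its own.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplicationSlotPersistence {
    /// Slot persists until explicitly dropped.
    #[default]
    Permanent,
    /// Slot is dropped when the connection ends.
    Temporary,
}

impl ReplicationSlotPersistence {
    /// snake_case identifier that would be stored under the `type` key.
    pub fn as_tag(self) -> &'static str {
        match self {
            Self::Permanent => "permanent",
            Self::Temporary => "temporary",
        }
    }

    /// Parses a stored tag; unknown tags are rejected rather than guessed.
    pub fn from_tag(tag: &str) -> Option<Self> {
        match tag {
            "permanent" => Some(Self::Permanent),
            "temporary" => Some(Self::Temporary),
            _ => None,
        }
    }
}

/// Hypothetical migration path from the boolean field used in the current diff.
impl From<bool> for ReplicationSlotPersistence {
    fn from(temporary: bool) -> Self {
        if temporary { Self::Temporary } else { Self::Permanent }
    }
}
```

A later variant would only need a new arm in `as_tag` and `from_tag`, while already stored `permanent` and `temporary` values keep parsing unchanged. A boolean offers no such room.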

}
5 changes: 4 additions & 1 deletion etl-examples/src/main.rs
Expand Up @@ -32,7 +32,9 @@ The pipeline will automatically:
*/

use clap::{Args, Parser};
use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
use etl::config::{
BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TlsConfig,
};
use etl::pipeline::Pipeline;
use etl::store::both::memory::MemoryStore;
use etl_destinations::bigquery::BigQueryDestination;
Expand Down Expand Up @@ -174,6 +176,7 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
table_error_retry_delay_ms: 10000,
table_error_retry_max_attempts: 5,
max_table_sync_workers: args.bq_args.max_table_sync_workers,
replication_slot: ReplicationSlotConfig::default(),
};

// Initialize BigQuery destination with service account authentication
Expand Down
3 changes: 2 additions & 1 deletion etl/src/lib.rs
Expand Up @@ -49,7 +49,7 @@
//!
//! ```rust,no_run
//! use etl::{
//! config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
//! config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig, ReplicationSlotConfig},
//! destination::memory::MemoryDestination,
//! pipeline::Pipeline,
//! store::both::memory::MemoryStore,
Expand All @@ -76,6 +76,7 @@
//! let config = PipelineConfig {
//! id: 1,
//! publication_name: "my_publication".to_string(),
//! replication_slot: ReplicationSlotConfig::default(),
//! pg_connection: pg_config,
//! batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
//! table_error_retry_delay_ms: 10000,
Expand Down