Changes from 4 commits
18 changes: 17 additions & 1 deletion etl-api/src/configs/pipeline.rs
@@ -1,4 +1,4 @@
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

@@ -31,6 +31,7 @@ pub struct FullApiPipelineConfig {
#[schema(example = "my_publication")]
#[serde(deserialize_with = "crate::utils::trim_string")]
pub publication_name: String,
pub temporary_replication_slot: bool,
Contributor:

Are there other kinds of replication slots? Asking since a boolean might give us less flexibility in the future to support other replication slot types.

Contributor (author):

Hmmm as per the Postgres docs (here) there are some other knobs that can be turned when creating the replication slots:

CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL | LOGICAL output_plugin } [ ( option [, ...] ) ] 
Create a physical or logical replication slot. See [Section 26.2.6](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS) for more about replication slots.

slot_name
The name of the slot to create. Must be a valid replication slot name (see [Section 26.2.6.1](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION)).

output_plugin
The name of the output plugin used for logical decoding (see [Section 47.6](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html)).

TEMPORARY
Specify that this replication slot is a temporary one. Temporary slots are not saved to disk and are automatically dropped on error or when the session has finished.

The following options are supported:

TWO_PHASE [ boolean ]
If true, this logical replication slot supports decoding of two-phase commit. With this option, commands related to two-phase commit such as PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED are decoded and transmitted. The transaction will be decoded and transmitted at PREPARE TRANSACTION time. The default is false.

RESERVE_WAL [ boolean ]
If true, this physical replication slot reserves WAL immediately. Otherwise, WAL is only reserved upon connection from a streaming replication client. The default is false.

SNAPSHOT { 'export' | 'use' | 'nothing' }
Decides what to do with the snapshot created during logical slot initialization. 'export', which is the default, will export the snapshot for use in other sessions. This option can't be used inside a transaction. 'use' will use the snapshot for the current transaction executing the command. This option must be used in a transaction, and CREATE_REPLICATION_SLOT must be the first command run in that transaction. Finally, 'nothing' will just use the snapshot for logical decoding as normal but won't do anything else with it.

FAILOVER [ boolean ]
If true, the slot is enabled to be synced to the standbys so that logical replication can be resumed after failover. The default is false.

I would guess that etl will limit itself to logical replication, so physical replication slots are irrelevant. Some of the others might be relevant, such as FAILOVER (to enable a clean failover if someone is running a high-availability Postgres cluster using e.g. cloudnative-pg). But IIRC etl currently couldn't support this without adding some kind of reconnect logic anyhow.
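For reference, the knobs quoted above compose into the wire command roughly as follows. This is only a sketch of the string assembly implied by the quoted `CREATE_REPLICATION_SLOT` grammar; the function name and shape are illustrative, not etl's actual code:

```rust
// Hypothetical helper rendering the CREATE_REPLICATION_SLOT command text
// from the options quoted from the Postgres docs above. Not etl's real code.
fn create_slot_command(slot_name: &str, temporary: bool, output_plugin: &str) -> String {
    let temp = if temporary { " TEMPORARY" } else { "" };
    // etl presumably only needs the LOGICAL form, so PHYSICAL is omitted here.
    format!("CREATE_REPLICATION_SLOT {slot_name}{temp} LOGICAL {output_plugin}")
}

fn main() {
    let cmd = create_slot_command("etl_slot", true, "pgoutput");
    // prints: CREATE_REPLICATION_SLOT etl_slot TEMPORARY LOGICAL pgoutput
    println!("{cmd}");
}
```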

But I agree with your point! It could make sense to make this more future-proof with respect to other existing (or future) replication slot options, by changing ReplicationSlotConfig to something like:

pub struct ReplicationSlotConfig {
    // Making this private has the benefit of forcing creation through the Default trait. If we combine it with some
    // builder methods we prevent future additions from being a breaking change. But could also just be pub
    temporary: bool,
    // extend with other attributes as needed in the future, e.g.,
    // failover: bool
}

impl Default for ReplicationSlotConfig { /* ... */ }

impl ReplicationSlotConfig { /* ... */ }
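One way to fill in the elided `Default` and builder impls from the sketch above — all names here are illustrative and not part of the PR:

```rust
// Hypothetical completion of the builder-style config sketched above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ReplicationSlotConfig {
    // Private field: creation goes through Default + builder methods, so
    // adding fields later is not a breaking change for callers.
    temporary: bool,
}

impl Default for ReplicationSlotConfig {
    fn default() -> Self {
        // A permanent slot matches Postgres' default behavior.
        Self { temporary: false }
    }
}

impl ReplicationSlotConfig {
    // Builder-style setter, consuming and returning Self for chaining.
    pub fn temporary(mut self, temporary: bool) -> Self {
        self.temporary = temporary;
        self
    }

    pub fn is_temporary(&self) -> bool {
        self.temporary
    }
}

fn main() {
    let slot = ReplicationSlotConfig::default().temporary(true);
    assert!(slot.is_temporary());
}
```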

#[serde(skip_serializing_if = "Option::is_none")]
pub batch: Option<ApiBatchConfig>,
#[schema(example = 1000)]
@@ -49,6 +50,10 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
fn from(value: StoredPipelineConfig) -> Self {
Self {
publication_name: value.publication_name,
temporary_replication_slot: match value.replication_slot {
ReplicationSlotConfig::Temporary => true,
ReplicationSlotConfig::Permanent => false,
},
batch: Some(ApiBatchConfig {
max_size: Some(value.batch.max_size),
max_fill_ms: Some(value.batch.max_fill_ms),
@@ -89,6 +94,7 @@ pub struct PartialApiPipelineConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredPipelineConfig {
pub publication_name: String,
pub replication_slot: ReplicationSlotConfig,
Contributor:

This will crash for existing stored configs.

Suggested change
pub replication_slot: ReplicationSlotConfig,
#[serde(default)]
pub replication_slot: ReplicationSlotConfig,

This has to be paired with the other comment that I left.

Contributor (author):

Added

pub batch: BatchConfig,
pub table_error_retry_delay_ms: u64,
#[serde(default = "default_table_error_retry_max_attempts")]
@@ -106,6 +112,7 @@ impl StoredPipelineConfig {
PipelineConfig {
id: pipeline_id,
publication_name: self.publication_name,
replication_slot: self.replication_slot,
pg_connection: pg_connection_config,
batch: self.batch,
table_error_retry_delay_ms: self.table_error_retry_delay_ms,
@@ -161,6 +168,11 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {

Self {
publication_name: value.publication_name,
replication_slot: if value.temporary_replication_slot {
ReplicationSlotConfig::Temporary
} else {
ReplicationSlotConfig::Permanent
},
batch,
table_error_retry_delay_ms: value
.table_error_retry_delay_ms
@@ -185,6 +197,7 @@ mod tests {
fn test_stored_pipeline_config_serialization() {
let config = StoredPipelineConfig {
publication_name: "test_publication".to_string(),
replication_slot: ReplicationSlotConfig::Permanent,
batch: BatchConfig {
max_size: 1000,
max_fill_ms: 5000,
@@ -223,6 +236,7 @@ mod tests {
table_error_retry_max_attempts: None,
max_table_sync_workers: None,
log_level: Some(LogLevel::Debug),
temporary_replication_slot: false,
};

let stored: StoredPipelineConfig = full_config.clone().into();
@@ -235,6 +249,7 @@
fn test_full_api_pipeline_config_defaults() {
let full_config = FullApiPipelineConfig {
publication_name: "test_publication".to_string(),
temporary_replication_slot: false,
batch: None,
table_error_retry_delay_ms: None,
table_error_retry_max_attempts: None,
@@ -264,6 +279,7 @@ mod tests {
fn test_partial_api_pipeline_config_merge() {
let mut stored = StoredPipelineConfig {
publication_name: "old_publication".to_string(),
replication_slot: ReplicationSlotConfig::Permanent,
batch: BatchConfig {
max_size: 500,
max_fill_ms: 2000,
5 changes: 3 additions & 2 deletions etl-api/src/k8s/http.rs
@@ -1052,8 +1052,8 @@ mod tests {
use super::*;

use etl_config::shared::{
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicatorConfig,
ReplicatorConfigWithoutSecrets, TlsConfig,
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig,
ReplicatorConfig, ReplicatorConfigWithoutSecrets, TlsConfig,
};
use insta::assert_json_snapshot;

@@ -1139,6 +1139,7 @@
pipeline: PipelineConfig {
id: 42,
publication_name: "all-pub".to_string(),
replication_slot: ReplicationSlotConfig::Permanent,
pg_connection: PgConnectionConfig {
host: "localhost".to_string(),
port: 5432,
@@ -6,7 +6,7 @@ expression: config_map_json
"apiVersion": "v1",
"data": {
"base.json": "",
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}"
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"replication_slot\":\"Permanent\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}"
},
"kind": "ConfigMap",
"metadata": {
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: pipeline.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "updated_publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
@@ -4,6 +4,7 @@ expression: response.config
---
FullApiPipelineConfig {
publication_name: "publication",
temporary_replication_slot: false,
batch: Some(
ApiBatchConfig {
max_size: Some(
2 changes: 2 additions & 0 deletions etl-api/tests/support/mocks.rs
@@ -238,6 +238,7 @@ pub mod pipelines {
table_error_retry_max_attempts: Some(5),
max_table_sync_workers: Some(2),
log_level: Some(LogLevel::Info),
temporary_replication_slot: false,
}
}

@@ -253,6 +254,7 @@
table_error_retry_max_attempts: Some(10),
max_table_sync_workers: Some(4),
log_level: Some(LogLevel::Info),
temporary_replication_slot: false,
}
}

2 changes: 2 additions & 0 deletions etl-benchmarks/benches/table_copies.rs
@@ -1,4 +1,5 @@
use clap::{Parser, Subcommand, ValueEnum};
use etl::config::ReplicationSlotConfig;
use etl::destination::Destination;
use etl::error::EtlResult;
use etl::pipeline::Pipeline;
@@ -327,6 +328,7 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
let pipeline_config = PipelineConfig {
id: 1,
publication_name: args.publication_name,
replication_slot: ReplicationSlotConfig::Permanent,
pg_connection: pg_connection_config,
batch: BatchConfig {
max_size: args.batch_max_size,
12 changes: 11 additions & 1 deletion etl-config/src/shared/pipeline.rs
@@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize};
use crate::shared::{
PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError, batch::BatchConfig,
};

/// Configuration for an ETL pipeline.
///
/// Contains all settings required to run a replication pipeline including
@@ -20,6 +19,8 @@ pub struct PipelineConfig {
pub id: u64,
/// Name of the Postgres publication to use for logical replication.
pub publication_name: String,
/// Whether to use a temporary replication slot
pub replication_slot: ReplicationSlotConfig,
Comment on lines +63 to +64
🧹 Nitpick | 🔵 Trivial

Add trailing period to documentation comment.

Per coding guidelines, documentation comments should be properly punctuated.

📝 Suggested fix
-    /// Configuration for the replication slot used
+    /// Configuration for the replication slot used.

⚠️ Potential issue | 🟠 Major

Add #[serde(default)] for backward compatibility.

The replication_slot field in PipelineConfig is missing #[serde(default)]. When deserializing existing configs that don't have this field, deserialization will fail. This was addressed in StoredPipelineConfig but not here.

🔧 Proposed fix
     /// Configuration for the replication slot used.
+    #[serde(default)]
     pub replication_slot: ReplicationSlotConfig,

/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfig,
@@ -64,6 +65,8 @@ pub struct PipelineConfigWithoutSecrets {
pub id: u64,
/// Name of the Postgres publication to use for logical replication.
pub publication_name: String,
/// Whether to use a temporary replication slot
pub replication_slot: ReplicationSlotConfig,
Comment on lines +143 to +144
⚠️ Potential issue | 🟠 Major

Add #[serde(default)] and trailing period for consistency.

Same issues as PipelineConfig: missing #[serde(default)] for backward compatibility and missing period in documentation.

🔧 Proposed fix
-    /// Configuration for the replication slot used
+    /// Configuration for the replication slot used.
+    #[serde(default)]
     pub replication_slot: ReplicationSlotConfig,

/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfigWithoutSecrets,
@@ -82,6 +85,7 @@ impl From<PipelineConfig> for PipelineConfigWithoutSecrets {
PipelineConfigWithoutSecrets {
id: value.id,
publication_name: value.publication_name,
replication_slot: value.replication_slot,
pg_connection: value.pg_connection.into(),
batch: value.batch,
table_error_retry_delay_ms: value.table_error_retry_delay_ms,
@@ -90,3 +94,9 @@
}
}
}

#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub enum ReplicationSlotConfig {
Contributor:

Suggested change
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub enum ReplicationSlotConfig {
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationSlotConfig {

Temporary,
Permanent,
Contributor:

To maintain backward compatibility we should set the default to Permanent, so that when we read a config from the db we default to Permanent.

}
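Combining the two review suggestions above (snake_case serialization and a Permanent default), the enum could end up looking roughly like the sketch below. The serde attributes (`#[serde(rename_all = "snake_case")]`, plus `#[serde(default)]` at the use sites) are omitted here so the sketch stays dependency-free; only the Default behavior is shown:

```rust
// Hypothetical shape of the enum with a Permanent default, so configs
// stored before this field existed fall back to a permanent slot.
// In the real code, serde derives and attributes would accompany this.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum ReplicationSlotConfig {
    Temporary,
    #[default]
    Permanent,
}

fn main() {
    // An old stored config that omits the field would get the default:
    assert_eq!(ReplicationSlotConfig::default(), ReplicationSlotConfig::Permanent);
}
```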
5 changes: 4 additions & 1 deletion etl-examples/src/main.rs
Original file line number Diff line number Diff line change
@@ -32,7 +32,9 @@ The pipeline will automatically:
*/

use clap::{Args, Parser};
use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
use etl::config::{
BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TlsConfig,
};
use etl::pipeline::Pipeline;
use etl::store::both::memory::MemoryStore;
use etl_destinations::bigquery::BigQueryDestination;
@@ -174,6 +176,7 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
table_error_retry_delay_ms: 10000,
table_error_retry_max_attempts: 5,
max_table_sync_workers: args.bq_args.max_table_sync_workers,
replication_slot: ReplicationSlotConfig::Permanent,
};

// Initialize BigQuery destination with service account authentication
3 changes: 2 additions & 1 deletion etl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@
//!
//! ```rust,no_run
//! use etl::{
//! config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
//! config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig, ReplicationSlotConfig},
//! destination::memory::MemoryDestination,
//! pipeline::Pipeline,
//! store::both::memory::MemoryStore,
@@ -76,6 +76,7 @@
//! let config = PipelineConfig {
//! id: 1,
//! publication_name: "my_publication".to_string(),
//! replication_slot: ReplicationSlotConfig::Permanent,
//! pg_connection: pg_config,
//! batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
//! table_error_retry_delay_ms: 10000,