65 changes: 64 additions & 1 deletion etl-api/src/configs/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, TableSyncCopyConfig};
use etl::config::ReplicationSlotPersistence;
use etl_config::shared::{
BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig,
};
Comment on lines +1 to +4

🧹 Nitpick | 🔵 Trivial

Consider importing ReplicationSlotPersistence from etl_config::shared for consistency.

ReplicationSlotPersistence is defined in etl_config::shared but imported here via etl::config. While this works due to re-exports, importing directly from the source crate would be more consistent with how ReplicationSlotConfig is imported.

📝 Suggested fix
-use etl::config::ReplicationSlotPersistence;
 use etl_config::shared::{
-    BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig,
+    BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig,
+    ReplicationSlotPersistence, TableSyncCopyConfig,
 };
🤖 Prompt for AI Agents
In `@etl-api/src/configs/pipeline.rs` around lines 1 - 4, The import of
ReplicationSlotPersistence is coming from etl::config but should be imported
directly from etl_config::shared for consistency with ReplicationSlotConfig;
update the use statements so ReplicationSlotPersistence is added to the
etl_config::shared import list (alongside BatchConfig, PgConnectionConfig,
PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig) and remove the
separate etl::config::ReplicationSlotPersistence import (and drop the
etl::config import entirely if it becomes unused).
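
Why both import paths compile can be shown with a small dependency-free sketch. The modules `config_crate` and `facade` are hypothetical stand-ins for `etl_config::shared` and `etl::config`; the actual re-export in the `etl` crate is not part of this diff, so the layout below is an assumption.

```rust
// Hypothetical stand-in for the crate that defines the type.
mod config_crate {
    pub mod shared {
        #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
        pub enum ReplicationSlotPersistence {
            #[default]
            Permanent,
            Temporary,
        }
    }
}

// Hypothetical stand-in for a crate that re-exports the type.
mod facade {
    pub mod config {
        // `pub use` makes the same type reachable under a second path.
        pub use super::super::config_crate::shared::ReplicationSlotPersistence;
    }
}

/// Accepts the type through the defining path.
pub fn describe(p: config_crate::shared::ReplicationSlotPersistence) -> &'static str {
    match p {
        config_crate::shared::ReplicationSlotPersistence::Permanent => "permanent",
        config_crate::shared::ReplicationSlotPersistence::Temporary => "temporary",
    }
}

/// Builds a value through the re-exported path.
pub fn temporary_via_facade() -> facade::config::ReplicationSlotPersistence {
    facade::config::ReplicationSlotPersistence::Temporary
}
```

A value built through the re-exported path is accepted by a function written against the defining path, because both names refer to one type. The nitpick is therefore about consistency of imports, not correctness.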

use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

Expand Down Expand Up @@ -26,6 +29,55 @@ pub struct ApiBatchConfig {
pub max_fill_ms: Option<u64>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub enum ApiReplicationSlotPersistence {
#[default]
Permanent,
Temporary,
}

impl From<ReplicationSlotPersistence> for ApiReplicationSlotPersistence {
fn from(value: ReplicationSlotPersistence) -> Self {
match value {
ReplicationSlotPersistence::Permanent => ApiReplicationSlotPersistence::Permanent,
ReplicationSlotPersistence::Temporary => ApiReplicationSlotPersistence::Temporary,
}
}
}

impl From<ApiReplicationSlotPersistence> for ReplicationSlotPersistence {
fn from(value: ApiReplicationSlotPersistence) -> Self {
match value {
ApiReplicationSlotPersistence::Permanent => ReplicationSlotPersistence::Permanent,
ApiReplicationSlotPersistence::Temporary => ReplicationSlotPersistence::Temporary,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub struct ApiReplicationSlotConfig {
#[schema(example = "permanent")]
pub persistence: ApiReplicationSlotPersistence,
}
Comment on lines +34 to +64

🧹 Nitpick | 🔵 Trivial

Add documentation to API types and consider adding Copy derive.

The new API types are missing documentation. Also, ApiReplicationSlotConfig could derive Copy since it only contains a Copy field, matching the internal ReplicationSlotConfig.

📝 Suggested improvements
+/// Determines the persistence behavior of a replication slot in API requests.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
 #[serde(rename_all = "snake_case")]
 pub enum ApiReplicationSlotPersistence {
+    /// Slot persists until explicitly dropped.
     #[default]
     Permanent,
+    /// Slot is automatically dropped when the connection ends.
     Temporary,
 }

 // ... conversions ...

+/// Configuration for the replication slot used by the pipeline in API requests.
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
 #[serde(rename_all = "snake_case")]
 pub struct ApiReplicationSlotConfig {
+    /// Controls whether the replication slot persists across connection sessions.
     pub persistence: ApiReplicationSlotPersistence,
 }
🤖 Prompt for AI Agents
In `@etl-api/src/configs/pipeline.rs` around lines 34 - 64, Add missing
documentation comments to the API types and derive Copy where appropriate: add
doc comments for the ApiReplicationSlotPersistence enum (and its variants
Permanent and Temporary) and for the ApiReplicationSlotConfig struct and its
persistence field to explain intent/usage, and update the derives on
ApiReplicationSlotConfig (and optionally ApiReplicationSlotPersistence) to
include Copy so the API types match the internal
ReplicationSlotConfig/ReplicationSlotPersistence semantics; ensure the
referenced symbols are ApiReplicationSlotPersistence, ApiReplicationSlotConfig,
and the persistence field.
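
To illustrate the `Copy` suggestion: a struct whose only field is `Copy` can derive `Copy` itself, so a value can be passed by value and still be used afterwards without `.clone()`. This is a minimal sketch with the serde and utoipa derives omitted to keep it dependency-free; `persistence_label` and `label_twice` are hypothetical helpers, not part of the PR.

```rust
/// Determines the persistence behavior of a replication slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ApiReplicationSlotPersistence {
    #[default]
    Permanent,
    Temporary,
}

/// Deriving `Copy` is allowed because the only field is itself `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ApiReplicationSlotConfig {
    pub persistence: ApiReplicationSlotPersistence,
}

/// Hypothetical helper that takes the config by value.
pub fn persistence_label(config: ApiReplicationSlotConfig) -> &'static str {
    match config.persistence {
        ApiReplicationSlotPersistence::Permanent => "permanent",
        ApiReplicationSlotPersistence::Temporary => "temporary",
    }
}

/// Passes the same value twice; this compiles only because the type is `Copy`.
pub fn label_twice(config: ApiReplicationSlotConfig) -> (&'static str, &'static str) {
    let first = persistence_label(config);
    let second = persistence_label(config); // `config` was copied, not moved
    (first, second)
}
```

Without `Copy` on the struct, the second call in `label_twice` would be rejected as a use of a moved value.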


impl From<ReplicationSlotConfig> for ApiReplicationSlotConfig {
fn from(value: ReplicationSlotConfig) -> Self {
ApiReplicationSlotConfig {
persistence: value.persistence.into(),
}
}
}

impl From<ApiReplicationSlotConfig> for ReplicationSlotConfig {
fn from(value: ApiReplicationSlotConfig) -> Self {
ReplicationSlotConfig {
persistence: value.persistence.into(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct FullApiPipelineConfig {
#[schema(example = "my_publication")]
Expand All @@ -44,6 +96,8 @@ pub struct FullApiPipelineConfig {
pub max_table_sync_workers: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub table_sync_copy: Option<TableSyncCopyConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub replication_slot: Option<ApiReplicationSlotConfig>,
pub log_level: Option<LogLevel>,
}

Expand All @@ -59,6 +113,7 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig {
table_error_retry_max_attempts: Some(value.table_error_retry_max_attempts),
max_table_sync_workers: Some(value.max_table_sync_workers),
table_sync_copy: Some(value.table_sync_copy),
replication_slot: Some(value.replication_slot.into()),
log_level: value.log_level,
}
}
Expand Down Expand Up @@ -94,6 +149,8 @@ pub struct PartialApiPipelineConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredPipelineConfig {
pub publication_name: String,
#[serde(default)]
pub replication_slot: ReplicationSlotConfig,
Contributor

Deserialization will fail for existing stored configs, which do not contain a `replication_slot` field.

Suggested change
-    pub replication_slot: ReplicationSlotConfig,
+    #[serde(default)]
+    pub replication_slot: ReplicationSlotConfig,

This has to be paired with the other comment that I left.

Contributor Author

Added
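
The backward-compatibility concern in this thread can be sketched without serde. The snippet below is a hand-written stand-in, not serde's implementation: it models a stored config as a key/value map, and `parse_required` and `parse_with_default` are hypothetical functions. The second falls back to `Default` when the key is absent, which is the behavior `#[serde(default)]` provides for a missing field.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplicationSlotPersistence {
    #[default]
    Permanent,
    Temporary,
}

/// Parses the textual form used in this sketch.
fn parse_value(raw: &str) -> Result<ReplicationSlotPersistence, String> {
    match raw {
        "permanent" => Ok(ReplicationSlotPersistence::Permanent),
        "temporary" => Ok(ReplicationSlotPersistence::Temporary),
        other => Err(format!("unknown persistence: {other}")),
    }
}

/// Models a required field: configs stored before the field existed are rejected.
pub fn parse_required(
    stored: &HashMap<&str, &str>,
) -> Result<ReplicationSlotPersistence, String> {
    let raw = stored
        .get("replication_slot")
        .ok_or_else(|| "missing field `replication_slot`".to_string())?;
    parse_value(raw)
}

/// Models a defaulted field: a missing key yields `Default::default()`.
pub fn parse_with_default(
    stored: &HashMap<&str, &str>,
) -> Result<ReplicationSlotPersistence, String> {
    match stored.get("replication_slot") {
        Some(raw) => parse_value(raw),
        None => Ok(ReplicationSlotPersistence::default()),
    }
}
```

A map that only carries `publication_name` stands for a config written before this PR: `parse_required` returns an error for it, while `parse_with_default` yields `Permanent`.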

pub batch: BatchConfig,
pub table_error_retry_delay_ms: u64,
#[serde(default = "default_table_error_retry_max_attempts")]
Expand All @@ -113,6 +170,7 @@ impl StoredPipelineConfig {
PipelineConfig {
id: pipeline_id,
publication_name: self.publication_name,
replication_slot: self.replication_slot,
pg_connection: pg_connection_config,
batch: self.batch,
table_error_retry_delay_ms: self.table_error_retry_delay_ms,
Expand Down Expand Up @@ -173,6 +231,7 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {

Self {
publication_name: value.publication_name,
replication_slot: value.replication_slot.unwrap_or_default().into(),
batch,
table_error_retry_delay_ms: value
.table_error_retry_delay_ms
Expand All @@ -198,6 +257,7 @@ mod tests {
fn test_stored_pipeline_config_serialization() {
let config = StoredPipelineConfig {
publication_name: "test_publication".to_string(),
replication_slot: ReplicationSlotConfig::default(),
batch: BatchConfig {
max_size: 1000,
max_fill_ms: 5000,
Expand Down Expand Up @@ -237,6 +297,7 @@ mod tests {
table_error_retry_max_attempts: None,
max_table_sync_workers: None,
table_sync_copy: None,
replication_slot: None,
log_level: Some(LogLevel::Debug),
};

Expand All @@ -255,6 +316,7 @@ mod tests {
table_error_retry_max_attempts: None,
max_table_sync_workers: None,
table_sync_copy: None,
replication_slot: None,
log_level: None,
};

Expand All @@ -280,6 +342,7 @@ mod tests {
fn test_partial_api_pipeline_config_merge() {
let mut stored = StoredPipelineConfig {
publication_name: "old_publication".to_string(),
replication_slot: ReplicationSlotConfig::default(),
batch: BatchConfig {
max_size: 500,
max_fill_ms: 2000,
Expand Down
5 changes: 3 additions & 2 deletions etl-api/src/k8s/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,8 @@ mod tests {
use super::*;

use etl_config::shared::{
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicatorConfig,
ReplicatorConfigWithoutSecrets, TableSyncCopyConfig, TlsConfig,
BatchConfig, DestinationConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig,
ReplicatorConfig, ReplicatorConfigWithoutSecrets, TableSyncCopyConfig, TlsConfig,
};
use insta::assert_json_snapshot;

Expand Down Expand Up @@ -1139,6 +1139,7 @@ mod tests {
pipeline: PipelineConfig {
id: 42,
publication_name: "all-pub".to_string(),
replication_slot: ReplicationSlotConfig::default(),
pg_connection: PgConnectionConfig {
host: "localhost".to_string(),
port: 5432,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ expression: config_map_json
"apiVersion": "v1",
"data": {
"base.json": "",
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4,\"table_sync_copy\":{\"type\":\"include_all_tables\"}}}"
"prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"replication_slot\":{\"persistence\":\"permanent\"},\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4,\"table_sync_copy\":{\"type\":\"include_all_tables\"}}}"
},
"kind": "ConfigMap",
"metadata": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Info,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: Some(
Debug,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ FullApiPipelineConfig {
table_sync_copy: Some(
IncludeAllTables,
),
replication_slot: Some(
ApiReplicationSlotConfig {
persistence: Permanent,
},
),
log_level: None,
}
7 changes: 6 additions & 1 deletion etl-api/tests/support/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ pub mod tenants {
/// Pipeline config helpers.
pub mod pipelines {
use super::*;
use etl_api::configs::{log::LogLevel, pipeline::ApiBatchConfig};
use etl_api::configs::{
log::LogLevel,
pipeline::{ApiBatchConfig, ApiReplicationSlotConfig},
};
use etl_config::shared::TableSyncCopyConfig;

/// Returns a default pipeline config.
Expand All @@ -240,6 +243,7 @@ pub mod pipelines {
max_table_sync_workers: Some(2),
table_sync_copy: Some(TableSyncCopyConfig::IncludeAllTables),
log_level: Some(LogLevel::Info),
replication_slot: Some(ApiReplicationSlotConfig::default()),
}
}

Expand All @@ -256,6 +260,7 @@ pub mod pipelines {
max_table_sync_workers: Some(4),
table_sync_copy: Some(TableSyncCopyConfig::IncludeAllTables),
log_level: Some(LogLevel::Info),
replication_slot: Some(ApiReplicationSlotConfig::default()),
}
}

Expand Down
1 change: 1 addition & 0 deletions etl-api/tests/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ fn create_pipeline_config(publication_name: &str) -> FullApiPipelineConfig {
table_error_retry_delay_ms: None,
table_error_retry_max_attempts: None,
table_sync_copy: None,
replication_slot: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions etl-benchmarks/benches/table_copies.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::{Parser, Subcommand, ValueEnum};
use etl::config::ReplicationSlotConfig;
use etl::destination::Destination;
use etl::error::EtlResult;
use etl::pipeline::Pipeline;
Expand Down Expand Up @@ -344,6 +345,7 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
let pipeline_config = PipelineConfig {
id: 1,
publication_name: args.publication_name,
replication_slot: ReplicationSlotConfig::default(),
pg_connection: pg_connection_config,
batch: BatchConfig {
max_size: args.batch_max_size,
Expand Down