-
-
Notifications
You must be signed in to change notification settings - Fork 154
feat(replication): Add support for TEMPORARY replication slots
#515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9be5ab4
fc583aa
4d04b0a
8ffbf21
ee3533b
22990a4
3138108
548b6a1
dc6f1f2
8fabadb
dd7c2ad
1b1f8aa
69cbbd5
c9d0326
9b7d2a3
c4d696c
c70cf69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,7 @@ | ||||||||
| use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, TableSyncCopyConfig}; | ||||||||
| use etl::config::ReplicationSlotPersistence; | ||||||||
| use etl_config::shared::{ | ||||||||
| BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig, | ||||||||
| }; | ||||||||
| use serde::{Deserialize, Serialize}; | ||||||||
| use utoipa::ToSchema; | ||||||||
|
|
||||||||
|
|
@@ -28,6 +31,54 @@ pub struct ApiBatchConfig { | |||||||
| pub max_fill_ms: Option<u64>, | ||||||||
| } | ||||||||
|
|
||||||||
| #[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)] | ||||||||
| #[serde(rename_all = "snake_case")] | ||||||||
| pub enum ApiReplicationSlotPersistence { | ||||||||
| #[default] | ||||||||
| Permanent, | ||||||||
| Temporary, | ||||||||
| } | ||||||||
|
|
||||||||
| impl From<ReplicationSlotPersistence> for ApiReplicationSlotPersistence { | ||||||||
| fn from(value: ReplicationSlotPersistence) -> Self { | ||||||||
| match value { | ||||||||
| ReplicationSlotPersistence::Permanent => ApiReplicationSlotPersistence::Permanent, | ||||||||
| ReplicationSlotPersistence::Temporary => ApiReplicationSlotPersistence::Temporary, | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| impl From<ApiReplicationSlotPersistence> for ReplicationSlotPersistence { | ||||||||
| fn from(value: ApiReplicationSlotPersistence) -> Self { | ||||||||
| match value { | ||||||||
| ApiReplicationSlotPersistence::Permanent => ReplicationSlotPersistence::Permanent, | ||||||||
| ApiReplicationSlotPersistence::Temporary => ReplicationSlotPersistence::Temporary, | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)] | ||||||||
| #[serde(rename_all = "snake_case")] | ||||||||
| pub struct ApiReplicationSlotConfig { | ||||||||
| pub persistence: ApiReplicationSlotPersistence, | ||||||||
| } | ||||||||
|
Comment on lines
+34
to
+64
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Add documentation to API types and consider adding The new API types are missing documentation. Also, 📝 Suggested improvements+/// Determines the persistence behavior of a replication slot in API requests.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub enum ApiReplicationSlotPersistence {
+ /// Slot persists until explicitly dropped.
#[default]
Permanent,
+ /// Slot is automatically dropped when the connection ends.
Temporary,
}
// ... conversions ...
+/// Configuration for the replication slot used by the pipeline in API requests.
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub struct ApiReplicationSlotConfig {
+ /// Controls whether the replication slot persists across connection sessions.
pub persistence: ApiReplicationSlotPersistence,
}🤖 Prompt for AI Agents |
||||||||
|
|
||||||||
| impl From<ReplicationSlotConfig> for ApiReplicationSlotConfig { | ||||||||
| fn from(value: ReplicationSlotConfig) -> Self { | ||||||||
| ApiReplicationSlotConfig { | ||||||||
| persistence: value.persistence.into(), | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| impl From<ApiReplicationSlotConfig> for ReplicationSlotConfig { | ||||||||
| fn from(value: ApiReplicationSlotConfig) -> Self { | ||||||||
| ReplicationSlotConfig { | ||||||||
| persistence: value.persistence.into(), | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] | ||||||||
| pub struct FullApiPipelineConfig { | ||||||||
| #[schema(example = "my_publication")] | ||||||||
|
|
@@ -46,6 +97,8 @@ pub struct FullApiPipelineConfig { | |||||||
| pub max_table_sync_workers: Option<u16>, | ||||||||
| #[serde(skip_serializing_if = "Option::is_none")] | ||||||||
| pub table_sync_copy: Option<TableSyncCopyConfig>, | ||||||||
| #[serde(skip_serializing_if = "Option::is_none")] | ||||||||
| pub replication_slot: Option<ApiReplicationSlotConfig>, | ||||||||
| pub log_level: Option<LogLevel>, | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -61,6 +114,7 @@ impl From<StoredPipelineConfig> for FullApiPipelineConfig { | |||||||
| table_error_retry_max_attempts: Some(value.table_error_retry_max_attempts), | ||||||||
| max_table_sync_workers: Some(value.max_table_sync_workers), | ||||||||
| table_sync_copy: Some(value.table_sync_copy), | ||||||||
| replication_slot: Some(value.replication_slot.into()), | ||||||||
| log_level: value.log_level, | ||||||||
| } | ||||||||
| } | ||||||||
|
|
@@ -97,6 +151,7 @@ pub struct PartialApiPipelineConfig { | |||||||
| pub struct StoredPipelineConfig { | ||||||||
| pub publication_name: String, | ||||||||
| #[serde(default)] | ||||||||
| pub replication_slot: ReplicationSlotConfig, | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will crash for existing stored configs.
Suggested change
This has to be paired with the other comment that I left.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||||||||
| pub batch: BatchConfig, | ||||||||
| #[serde(default = "default_table_error_retry_delay_ms")] | ||||||||
| pub table_error_retry_delay_ms: u64, | ||||||||
|
|
@@ -118,6 +173,7 @@ impl StoredPipelineConfig { | |||||||
| PipelineConfig { | ||||||||
| id: pipeline_id, | ||||||||
| publication_name: self.publication_name, | ||||||||
| replication_slot: self.replication_slot, | ||||||||
| pg_connection: pg_connection_config, | ||||||||
| batch: self.batch, | ||||||||
| table_error_retry_delay_ms: self.table_error_retry_delay_ms, | ||||||||
|
|
@@ -178,6 +234,7 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig { | |||||||
|
|
||||||||
| Self { | ||||||||
| publication_name: value.publication_name, | ||||||||
| replication_slot: value.replication_slot.unwrap_or_default().into(), | ||||||||
| batch, | ||||||||
| table_error_retry_delay_ms: value | ||||||||
| .table_error_retry_delay_ms | ||||||||
|
|
@@ -203,6 +260,7 @@ mod tests { | |||||||
| fn test_stored_pipeline_config_serialization() { | ||||||||
| let config = StoredPipelineConfig { | ||||||||
| publication_name: "test_publication".to_string(), | ||||||||
| replication_slot: ReplicationSlotConfig::default(), | ||||||||
| batch: BatchConfig { | ||||||||
| max_size: 1000, | ||||||||
| max_fill_ms: 5000, | ||||||||
|
|
@@ -242,6 +300,7 @@ mod tests { | |||||||
| table_error_retry_max_attempts: None, | ||||||||
| max_table_sync_workers: None, | ||||||||
| table_sync_copy: None, | ||||||||
| replication_slot: None, | ||||||||
| log_level: Some(LogLevel::Debug), | ||||||||
| }; | ||||||||
|
|
||||||||
|
|
@@ -260,6 +319,7 @@ mod tests { | |||||||
| table_error_retry_max_attempts: None, | ||||||||
| max_table_sync_workers: None, | ||||||||
| table_sync_copy: None, | ||||||||
| replication_slot: None, | ||||||||
| log_level: None, | ||||||||
| }; | ||||||||
|
|
||||||||
|
|
@@ -285,6 +345,7 @@ mod tests { | |||||||
| fn test_partial_api_pipeline_config_merge() { | ||||||||
| let mut stored = StoredPipelineConfig { | ||||||||
| publication_name: "old_publication".to_string(), | ||||||||
| replication_slot: ReplicationSlotConfig::default(), | ||||||||
| batch: BatchConfig { | ||||||||
| max_size: 500, | ||||||||
| max_fill_ms: 2000, | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider importing
ReplicationSlotPersistencefrometl_config::sharedfor consistency.ReplicationSlotPersistenceis defined inetl_config::sharedbut imported here viaetl::config. While this works due to re-exports, importing directly from the source crate would be more consistent with howReplicationSlotConfigis imported.📝 Suggested fix
📝 Committable suggestion
🤖 Prompt for AI Agents