diff --git a/etl-benchmarks/benches/table_copies.rs b/etl-benchmarks/benches/table_copies.rs index 9b9308996..eaa755276 100644 --- a/etl-benchmarks/benches/table_copies.rs +++ b/etl-benchmarks/benches/table_copies.rs @@ -345,6 +345,8 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box> { id: 1, publication_name: args.publication_name, pg_connection: pg_connection_config, + primary_connection: None, + heartbeat: None, batch: BatchConfig { max_size: args.batch_max_size, max_fill_ms: args.batch_max_fill_ms, diff --git a/etl-config/src/shared/heartbeat.rs b/etl-config/src/shared/heartbeat.rs new file mode 100644 index 000000000..146139bd0 --- /dev/null +++ b/etl-config/src/shared/heartbeat.rs @@ -0,0 +1,195 @@ +//! Heartbeat configuration for read replica replication slot maintenance. + +use crate::shared::ValidationError; +use serde::{Deserialize, Serialize}; +#[cfg(feature = "utoipa")] +use utoipa::ToSchema; + +/// Configuration for the heartbeat worker that maintains replication slot activity. +/// +/// When replicating from a read replica, the replication slot on the primary can +/// become inactive during idle periods. The heartbeat worker periodically emits +/// WAL messages to keep the slot active. +#[derive(Clone, Debug, Deserialize, Serialize)] +#[cfg_attr(feature = "utoipa", derive(ToSchema))] +pub struct HeartbeatConfig { + /// Interval in milliseconds between heartbeat emissions. + /// + /// Default: 30000 (30 seconds) + #[serde(default = "default_interval_ms")] + pub interval_ms: u64, + + /// Minimum backoff duration in milliseconds after a failed heartbeat attempt. + /// + /// Default: 1000 (1 second) + #[serde(default = "default_min_backoff_ms")] + pub min_backoff_ms: u64, + + /// Maximum backoff duration in milliseconds after repeated failures. + /// + /// Default: 60000 (60 seconds) + #[serde(default = "default_max_backoff_ms")] + pub max_backoff_ms: u64, + + /// Jitter percentage to apply to backoff duration (0-100). 
+ /// + /// Helps prevent thundering herd when multiple workers reconnect. + /// Default: 25 + #[serde(default = "default_jitter_percent")] + pub jitter_percent: u8, +} + +impl HeartbeatConfig { + /// Default heartbeat interval: 30 seconds. + pub const DEFAULT_INTERVAL_MS: u64 = 30_000; + + /// Default minimum backoff: 1 second. + pub const DEFAULT_MIN_BACKOFF_MS: u64 = 1_000; + + /// Default maximum backoff: 60 seconds. + pub const DEFAULT_MAX_BACKOFF_MS: u64 = 60_000; + + /// Default jitter percentage: 25%. + pub const DEFAULT_JITTER_PERCENT: u8 = 25; + + /// Validates the heartbeat configuration. + /// + /// Ensures interval_ms > 0, jitter_percent <= 100, min_backoff_ms > 0, + /// and min_backoff_ms <= max_backoff_ms. + pub fn validate(&self) -> Result<(), ValidationError> { + if self.interval_ms == 0 { + return Err(ValidationError::InvalidFieldValue { + field: "interval_ms".to_string(), + constraint: "must be greater than 0".to_string(), + }); + } + + if self.jitter_percent > 100 { + return Err(ValidationError::InvalidFieldValue { + field: "jitter_percent".to_string(), + constraint: "must be <= 100".to_string(), + }); + } + + if self.min_backoff_ms == 0 { + return Err(ValidationError::InvalidFieldValue { + field: "min_backoff_ms".to_string(), + constraint: "must be greater than 0".to_string(), + }); + } + + if self.min_backoff_ms > self.max_backoff_ms { + return Err(ValidationError::InvalidFieldValue { + field: "min_backoff_ms".to_string(), + constraint: "must be <= max_backoff_ms".to_string(), + }); + } + + Ok(()) + } +} + +impl Default for HeartbeatConfig { + fn default() -> Self { + Self { + interval_ms: Self::DEFAULT_INTERVAL_MS, + min_backoff_ms: Self::DEFAULT_MIN_BACKOFF_MS, + max_backoff_ms: Self::DEFAULT_MAX_BACKOFF_MS, + jitter_percent: Self::DEFAULT_JITTER_PERCENT, + } + } +} + +fn default_interval_ms() -> u64 { + HeartbeatConfig::DEFAULT_INTERVAL_MS +} + +fn default_min_backoff_ms() -> u64 { + HeartbeatConfig::DEFAULT_MIN_BACKOFF_MS +} + +fn 
default_max_backoff_ms() -> u64 { + HeartbeatConfig::DEFAULT_MAX_BACKOFF_MS +} + +fn default_jitter_percent() -> u8 { + HeartbeatConfig::DEFAULT_JITTER_PERCENT +} + +/// Connection options optimized for heartbeat connections. +/// +/// Uses shorter timeouts since heartbeat connections are lightweight +/// health checks that should fail fast. +pub const ETL_HEARTBEAT_OPTIONS: &str = concat!( + "application_name=etl_heartbeat", + " ", "statement_timeout=5000", + " ", "lock_timeout=5000", + " ", "idle_in_transaction_session_timeout=30000", +); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = HeartbeatConfig::default(); + assert_eq!(config.interval_ms, 30_000); + assert_eq!(config.min_backoff_ms, 1_000); + assert_eq!(config.max_backoff_ms, 60_000); + assert_eq!(config.jitter_percent, 25); + } + + #[test] + fn test_heartbeat_options() { + assert!(ETL_HEARTBEAT_OPTIONS.contains("application_name=etl_heartbeat")); + assert!(ETL_HEARTBEAT_OPTIONS.contains("statement_timeout=5000")); + // Verify options are properly space-separated + assert!(ETL_HEARTBEAT_OPTIONS.contains(" statement_timeout=")); + assert!(ETL_HEARTBEAT_OPTIONS.contains(" lock_timeout=")); + assert!(ETL_HEARTBEAT_OPTIONS.contains(" idle_in_transaction_session_timeout=")); + } + + #[test] + fn test_validate_valid_config() { + let config = HeartbeatConfig::default(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_validate_zero_interval() { + let config = HeartbeatConfig { + interval_ms: 0, + ..Default::default() + }; + assert!(config.validate().is_err()); + } + + #[test] + fn test_validate_jitter_too_high() { + let config = HeartbeatConfig { + jitter_percent: 101, + ..Default::default() + }; + assert!(config.validate().is_err()); + } + + #[test] + fn test_validate_min_greater_than_max() { + let config = HeartbeatConfig { + min_backoff_ms: 10_000, + max_backoff_ms: 1_000, + ..Default::default() + }; + assert!(config.validate().is_err()); + } 
+ + #[test] + fn test_validate_zero_min_backoff() { + let config = HeartbeatConfig { + min_backoff_ms: 0, + ..Default::default() + }; + assert!(config.validate().is_err()); + } +} diff --git a/etl-config/src/shared/mod.rs b/etl-config/src/shared/mod.rs index e9f87563e..e843a1cad 100644 --- a/etl-config/src/shared/mod.rs +++ b/etl-config/src/shared/mod.rs @@ -2,6 +2,7 @@ mod base; mod batch; mod connection; mod destination; +mod heartbeat; mod pipeline; mod replicator; mod sentry; @@ -11,6 +12,7 @@ pub use base::*; pub use batch::*; pub use connection::*; pub use destination::*; +pub use heartbeat::*; pub use pipeline::*; pub use replicator::*; pub use sentry::*; diff --git a/etl-config/src/shared/pipeline.rs b/etl-config/src/shared/pipeline.rs index 82788a04d..8385bdc09 100644 --- a/etl-config/src/shared/pipeline.rs +++ b/etl-config/src/shared/pipeline.rs @@ -3,10 +3,11 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use crate::shared::{ - PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError, batch::BatchConfig, + HeartbeatConfig, PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError, + batch::BatchConfig, }; -/// c copy should be performed.Selection rules for tables participating in replication. +/// Selection rules for tables participating in replication. /// /// Controls which tables are eligible for initial table copy and streaming. #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] @@ -67,6 +68,15 @@ pub struct PipelineConfig { /// The connection configuration for the Postgres instance to which the pipeline connects for /// replication. pub pg_connection: PgConnectionConfig, + /// Optional connection configuration for the primary database when replicating from a read replica. + /// + /// When set, enables replica mode where heartbeat messages are sent to the primary + /// to keep the replication slot active. + pub primary_connection: Option<PgConnectionConfig>, + /// Optional heartbeat configuration for replica mode.
+ /// + /// Only used when `primary_connection` is set. If not provided, default values are used. + pub heartbeat: Option<HeartbeatConfig>, /// Batch processing configuration. #[serde(default)] pub batch: BatchConfig, @@ -94,6 +104,26 @@ impl PipelineConfig { /// Default maximum number of concurrent table sync workers. pub const DEFAULT_MAX_TABLE_SYNC_WORKERS: u16 = 4; + /// Returns `true` if the pipeline is configured for replica mode. + /// + /// Replica mode is enabled when `primary_connection` is set, indicating + /// that replication is from a read replica and heartbeat messages should + /// be sent to the primary. + pub fn is_replica_mode(&self) -> bool { + self.primary_connection.is_some() + } + + /// Returns the heartbeat configuration, using defaults if not explicitly set. + /// + /// Returns `None` if not in replica mode (no `primary_connection`). + pub fn heartbeat_config(&self) -> Option<HeartbeatConfig> { + if self.is_replica_mode() { + Some(self.heartbeat.clone().unwrap_or_default()) + } else { + None + } + } + /// Validates pipeline configuration settings. /// /// Checks batch configuration and ensures worker counts and retry attempts are non-zero. @@ -114,6 +144,13 @@ impl PipelineConfig { }); } + // Only validate heartbeat config when replica mode is enabled (primary_connection is set) + if self.primary_connection.is_some() { + if let Some(ref heartbeat) = self.heartbeat { + heartbeat.validate()?; + } + } + Ok(()) } } @@ -145,6 +182,10 @@ pub struct PipelineConfigWithoutSecrets { /// The connection configuration for the Postgres instance to which the pipeline connects for /// replication. pub pg_connection: PgConnectionConfigWithoutSecrets, + /// Optional connection configuration for the primary database when replicating from a read replica. + pub primary_connection: Option<PgConnectionConfigWithoutSecrets>, + /// Optional heartbeat configuration for replica mode. + pub heartbeat: Option<HeartbeatConfig>, /// Batch processing configuration.
#[serde(default)] pub batch: BatchConfig, @@ -168,6 +209,8 @@ impl From<PipelineConfig> for PipelineConfigWithoutSecrets { id: value.id, publication_name: value.publication_name, pg_connection: value.pg_connection.into(), + primary_connection: value.primary_connection.map(Into::into), + heartbeat: value.heartbeat, batch: value.batch, table_error_retry_delay_ms: value.table_error_retry_delay_ms, table_error_retry_max_attempts: value.table_error_retry_max_attempts, diff --git a/etl-examples/src/main.rs b/etl-examples/src/main.rs index 6344d5221..aeb490bc5 100644 --- a/etl-examples/src/main.rs +++ b/etl-examples/src/main.rs @@ -181,6 +181,10 @@ async fn main_impl() -> Result<(), Box> { id: 1, // Using a simple ID for the example publication_name: args.publication, pg_connection: pg_connection_config, + // Primary database connection for replica mode (None = replicating from primary directly) + primary_connection: None, + // Heartbeat config for keeping replication slot active in replica mode + heartbeat: None, batch: BatchConfig { max_size: args.bq_args.max_batch_size, max_fill_ms: args.bq_args.max_batch_fill_duration_ms, diff --git a/etl/Cargo.toml b/etl/Cargo.toml index 14824af9e..88783819c 100644 --- a/etl/Cargo.toml +++ b/etl/Cargo.toml @@ -28,9 +28,11 @@ metrics = { workspace = true } pg_escape = { workspace = true } pin-project-lite = { workspace = true } postgres-replication = { workspace = true } +rand = { workspace = true, features = ["thread_rng"] } rustls = { workspace = true, features = ["aws-lc-rs", "logging"] } serde_json = { workspace = true, features = ["std"] } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } +thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tokio-postgres = { workspace = true, features = [ "runtime", @@ -50,5 +52,3 @@ etl-postgres = { workspace = true, features = [ "test-utils", ] } etl-telemetry = { workspace = true } - -rand = { workspace = true, features = ["thread_rng"] }
diff --git a/etl/src/error.rs b/etl/src/error.rs index c8aeb3d23..1e72aaa0c 100644 --- a/etl/src/error.rs +++ b/etl/src/error.rs @@ -133,6 +133,7 @@ pub enum ErrorKind { InvalidState, ApplyWorkerPanic, TableSyncWorkerPanic, + HeartbeatWorkerPanic, StateRollbackError, // Replication Errors @@ -283,31 +284,16 @@ impl PartialEq for EtlError { } impl Hash for EtlError { - /// Hashes the error using only its stable identifying components. - /// - /// Only hashes the error kind and static description, intentionally excluding: - /// - Location information (file, line, column) - /// - Detail field (often contains dynamic data like table names, IDs) - /// - Source errors - /// - Backtrace - /// - /// This ensures that errors of the same category produce the same hash, - /// enabling stable grouping and deduplication across multiple occurrences. fn hash(&self, state: &mut H) { match &self.repr { ErrorRepr::Single(payload) => { - // Hash the discriminant to distinguish from Many variant. std::mem::discriminant(&self.repr).hash(state); - // Hash only the stable components. payload.kind.hash(state); payload.description.hash(state); } ErrorRepr::Many { errors, .. } => { - // Hash the discriminant to distinguish from Single variant. std::mem::discriminant(&self.repr).hash(state); - // Hash the number of errors for differentiation. errors.len().hash(state); - // Hash all errors in the aggregation. for error in errors { error.hash(state); } @@ -383,7 +369,6 @@ impl error::Error for EtlError { .source .as_ref() .map(|source| source as &(dyn error::Error + 'static)), - // For aggregated errors, we forward the first contained error as the source. ErrorRepr::Many { errors, .. } => errors .first() .map(|error| error as &(dyn error::Error + 'static)), @@ -391,7 +376,6 @@ impl error::Error for EtlError { } } -/// Writes the captured backtrace with indentation. 
fn write_backtrace( backtrace: &Backtrace, f: &mut fmt::Formatter<'_>, @@ -414,7 +398,6 @@ fn write_backtrace( Ok(()) } -/// Writes the detail block with indentation. fn write_detail(detail: Option<&str>, f: &mut fmt::Formatter<'_>, indent: usize) -> fmt::Result { if let Some(detail) = detail { let indent_str = " ".repeat(indent); @@ -435,7 +418,6 @@ fn write_detail(detail: Option<&str>, f: &mut fmt::Formatter<'_>, indent: usize) Ok(()) } -/// Creates an [`EtlError`] from an error kind and static description. impl From<(ErrorKind, &'static str)> for EtlError { #[track_caller] fn from((kind, desc): (ErrorKind, &'static str)) -> EtlError { @@ -443,7 +425,6 @@ impl From<(ErrorKind, &'static str)> for EtlError { } } -/// Creates an [`EtlError`] from an error kind, static description, and dynamic detail. impl From<(ErrorKind, &'static str, D)> for EtlError where D: Into>, @@ -454,10 +435,6 @@ where } } -/// Creates an [`EtlError`] from a vector of errors for aggregation. -/// -/// If the vector contains exactly one error, returns that error directly without wrapping -/// it in the [`ErrorRepr::Many`] variant. impl From> for EtlError where E: Into, @@ -478,7 +455,6 @@ where } } -/// Converts [`std::io::Error`] to [`EtlError`] with [`ErrorKind::IoError`]. impl From for EtlError { #[track_caller] fn from(err: std::io::Error) -> EtlError { @@ -493,10 +469,6 @@ impl From for EtlError { } } -/// Converts [`serde_json::Error`] to [`EtlError`] with the appropriate error kind. -/// -/// Maps to [`ErrorKind::SerializationError`] for serialization failures and -/// [`ErrorKind::DeserializationError`] for deserialization failures based on error classification. impl From for EtlError { #[track_caller] fn from(err: serde_json::Error) -> EtlError { @@ -523,7 +495,6 @@ impl From for EtlError { } } -/// Converts [`std::str::Utf8Error`] to [`EtlError`] with [`ErrorKind::ConversionError`]. 
impl From for EtlError { #[track_caller] fn from(err: std::str::Utf8Error) -> EtlError { @@ -538,7 +509,6 @@ impl From for EtlError { } } -/// Converts [`std::string::FromUtf8Error`] to [`EtlError`] with [`ErrorKind::ConversionError`]. impl From for EtlError { #[track_caller] fn from(err: std::string::FromUtf8Error) -> EtlError { @@ -553,7 +523,6 @@ impl From for EtlError { } } -/// Converts [`std::num::ParseIntError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. impl From for EtlError { #[track_caller] fn from(err: std::num::ParseIntError) -> EtlError { @@ -568,7 +537,6 @@ impl From for EtlError { } } -/// Converts [`std::num::ParseFloatError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. impl From for EtlError { #[track_caller] fn from(err: std::num::ParseFloatError) -> EtlError { @@ -583,10 +551,6 @@ impl From for EtlError { } } -/// Converts [`tokio_postgres::Error`] to [`EtlError`] with the appropriate error kind. -/// -/// Maps errors based on Postgres SQLSTATE codes to provide granular error classification -/// for better error handling in ETL operations. 
impl From for EtlError { #[track_caller] fn from(err: tokio_postgres::Error) -> EtlError { @@ -595,7 +559,6 @@ impl From for EtlError { use tokio_postgres::error::SqlState; match *sqlstate { - // Connection errors (08xxx) SqlState::CONNECTION_EXCEPTION | SqlState::CONNECTION_DOES_NOT_EXIST | SqlState::CONNECTION_FAILURE @@ -605,13 +568,11 @@ impl From for EtlError { "PostgreSQL connection failed", ), - // Authentication errors (28xxx) SqlState::INVALID_AUTHORIZATION_SPECIFICATION | SqlState::INVALID_PASSWORD => ( ErrorKind::AuthenticationError, "PostgreSQL authentication failed", ), - // Data integrity violations (23xxx) SqlState::INTEGRITY_CONSTRAINT_VIOLATION | SqlState::NOT_NULL_VIOLATION | SqlState::FOREIGN_KEY_VIOLATION @@ -621,7 +582,6 @@ impl From for EtlError { "PostgreSQL constraint violation", ), - // Data conversion errors (22xxx) SqlState::DATA_EXCEPTION | SqlState::INVALID_TEXT_REPRESENTATION | SqlState::INVALID_DATETIME_FORMAT @@ -631,7 +591,6 @@ impl From for EtlError { "PostgreSQL data conversion failed", ), - // Schema/object not found errors (42xxx) SqlState::UNDEFINED_TABLE | SqlState::UNDEFINED_COLUMN | SqlState::UNDEFINED_FUNCTION @@ -640,7 +599,6 @@ impl From for EtlError { "PostgreSQL schema object not found", ), - // Syntax and access errors (42xxx) SqlState::SYNTAX_ERROR | SqlState::SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION | SqlState::INSUFFICIENT_PRIVILEGE => ( @@ -648,7 +606,6 @@ impl From for EtlError { "PostgreSQL syntax or access error", ), - // Resource errors (53xxx) SqlState::INSUFFICIENT_RESOURCES | SqlState::OUT_OF_MEMORY | SqlState::TOO_MANY_CONNECTIONS => ( @@ -656,7 +613,6 @@ impl From for EtlError { "PostgreSQL resource limitation", ), - // Transaction errors (40xxx, 25xxx) SqlState::TRANSACTION_ROLLBACK | SqlState::T_R_SERIALIZATION_FAILURE | SqlState::T_R_DEADLOCK_DETECTED @@ -664,13 +620,11 @@ impl From for EtlError { (ErrorKind::InvalidState, "PostgreSQL transaction failed") } - // System errors (58xxx, XX xxx) 
SqlState::SYSTEM_ERROR | SqlState::INTERNAL_ERROR => { (ErrorKind::SourceQueryFailed, "PostgreSQL system error") } SqlState::IO_ERROR => (ErrorKind::SourceIoError, "PostgreSQL I/O error"), - // Operator intervention errors (57xxx) SqlState::OPERATOR_INTERVENTION => ( ErrorKind::SourceOperationCanceled, "PostgreSQL operation canceled", @@ -699,7 +653,6 @@ impl From for EtlError { "PostgreSQL idle session timeout", ), - // Object state errors (55xxx) SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE => ( ErrorKind::InvalidState, "PostgreSQL object not in prerequisite state", @@ -712,7 +665,6 @@ impl From for EtlError { "PostgreSQL lock not available", ), - // Program limit errors (54xxx) SqlState::PROGRAM_LIMIT_EXCEEDED | SqlState::STATEMENT_TOO_COMPLEX | SqlState::TOO_MANY_COLUMNS @@ -721,14 +673,12 @@ impl From for EtlError { "PostgreSQL program limit exceeded", ), - // Configuration errors (53xxx) SqlState::DISK_FULL => (ErrorKind::SourceIoError, "PostgreSQL disk full"), SqlState::CONFIGURATION_LIMIT_EXCEEDED => ( ErrorKind::SourceConfigurationLimitExceeded, "PostgreSQL configuration limit exceeded", ), - // Transaction state errors (25xxx) SqlState::ACTIVE_SQL_TRANSACTION | SqlState::NO_ACTIVE_SQL_TRANSACTION | SqlState::IN_FAILED_SQL_TRANSACTION @@ -737,41 +687,34 @@ impl From for EtlError { "PostgreSQL transaction state error", ), - // Cursor errors (24xxx, 34xxx) SqlState::INVALID_CURSOR_STATE | SqlState::INVALID_CURSOR_NAME => { (ErrorKind::InvalidState, "PostgreSQL cursor error") } - // Data corruption errors (XX xxx) SqlState::DATA_CORRUPTED | SqlState::INDEX_CORRUPTED => { (ErrorKind::SourceIoError, "PostgreSQL data corruption") } - // Configuration file errors (F0xxx) SqlState::CONFIG_FILE_ERROR | SqlState::LOCK_FILE_EXISTS => { (ErrorKind::ConfigError, "PostgreSQL configuration error") } - // Feature not supported (0Axxx) SqlState::FEATURE_NOT_SUPPORTED => ( ErrorKind::SourceSchemaError, "PostgreSQL feature not supported", ), - // Invalid transaction initiation 
(0Bxxx) SqlState::INVALID_TRANSACTION_INITIATION => ( ErrorKind::InvalidState, "PostgreSQL invalid transaction initiation", ), - // Dependent objects errors (2Bxxx) SqlState::DEPENDENT_PRIVILEGE_DESCRIPTORS_STILL_EXIST | SqlState::DEPENDENT_OBJECTS_STILL_EXIST => ( ErrorKind::InvalidState, "PostgreSQL dependent objects exist", ), - // SQL routine errors (2Fxxx) SqlState::SQL_ROUTINE_EXCEPTION | SqlState::S_R_E_FUNCTION_EXECUTED_NO_RETURN_STATEMENT | SqlState::S_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED @@ -780,7 +723,6 @@ impl From for EtlError { (ErrorKind::SourceQueryFailed, "PostgreSQL routine exception") } - // External routine errors (38xxx, 39xxx) SqlState::EXTERNAL_ROUTINE_EXCEPTION | SqlState::E_R_E_CONTAINING_SQL_NOT_PERMITTED | SqlState::E_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED @@ -796,7 +738,6 @@ impl From for EtlError { "PostgreSQL external routine error", ), - // PL/pgSQL errors (P0xxx) SqlState::PLPGSQL_ERROR | SqlState::RAISE_EXCEPTION | SqlState::NO_DATA_FOUND @@ -805,7 +746,6 @@ impl From for EtlError { (ErrorKind::SourceQueryFailed, "PostgreSQL PL/pgSQL error") } - // Foreign Data Wrapper errors (HVxxx) - connection/schema related SqlState::FDW_ERROR | SqlState::FDW_UNABLE_TO_ESTABLISH_CONNECTION => ( ErrorKind::SourceConnectionFailed, "PostgreSQL FDW connection failed", @@ -846,18 +786,15 @@ impl From for EtlError { "PostgreSQL FDW operation error", ), - // Snapshot errors (72xxx) - important for replication consistency SqlState::SNAPSHOT_TOO_OLD => ( ErrorKind::SourceSnapshotTooOld, "PostgreSQL snapshot too old", ), - // Array errors - relevant for replication data handling SqlState::ARRAY_ELEMENT_ERROR => { (ErrorKind::ConversionError, "PostgreSQL array error") } - // XML/JSON errors that could occur during replication SqlState::NOT_AN_XML_DOCUMENT | SqlState::INVALID_XML_DOCUMENT | SqlState::INVALID_XML_CONTENT @@ -883,11 +820,9 @@ impl From for EtlError { (ErrorKind::ConversionError, "PostgreSQL XML/JSON error") } - // Default for other SQL states _ 
=> (ErrorKind::SourceError, "PostgreSQL error"), } } - // No SQL state means connection issue None => ( ErrorKind::SourceConnectionFailed, "PostgreSQL connection failed", @@ -905,7 +840,6 @@ impl From for EtlError { } } -/// Converts [`rustls::Error`] to [`EtlError`] with [`ErrorKind::EncryptionError`]. impl From for EtlError { #[track_caller] fn from(err: rustls::Error) -> EtlError { @@ -920,7 +854,6 @@ impl From for EtlError { } } -/// Converts [`rustls::pki_types::pem::Error`] to [`EtlError`] with [`ErrorKind::ConfigError`]. impl From for EtlError { #[track_caller] fn from(err: rustls::pki_types::pem::Error) -> EtlError { @@ -935,7 +868,6 @@ impl From for EtlError { } } -/// Converts [`uuid::Error`] to [`EtlError`] with [`ErrorKind::InvalidData`]. impl From for EtlError { #[track_caller] fn from(err: uuid::Error) -> EtlError { @@ -950,7 +882,6 @@ impl From for EtlError { } } -/// Converts [`chrono::ParseError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. impl From for EtlError { #[track_caller] fn from(err: chrono::ParseError) -> EtlError { @@ -965,7 +896,6 @@ impl From for EtlError { } } -/// Converts [`ParseNumericError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. impl From for EtlError { #[track_caller] fn from(err: ParseNumericError) -> EtlError { @@ -980,10 +910,6 @@ impl From for EtlError { } } -/// Converts [`sqlx::Error`] to [`EtlError`] with the appropriate error kind. -/// -/// Maps database errors to [`ErrorKind::SourceQueryFailed`], I/O errors to [`ErrorKind::IoError`], -/// and connection pool errors to [`ErrorKind::SourceConnectionFailed`]. impl From for EtlError { #[track_caller] fn from(err: sqlx::Error) -> EtlError { @@ -1007,7 +933,6 @@ impl From for EtlError { } } -/// Converts [`etl_postgres::replication::slots::EtlReplicationSlotError`] to [`EtlError`] with appropriate error kind. 
impl From for EtlError { #[track_caller] fn from(err: etl_postgres::replication::slots::EtlReplicationSlotError) -> EtlError { @@ -1082,38 +1007,12 @@ mod tests { assert_eq!(multi_err.detail(), None); } - #[test] - fn test_multiple_errors_with_detail() { - let errors = vec![ - EtlError::from(( - ErrorKind::ValidationError, - "Invalid schema", - "Missing required field".to_string(), - )), - EtlError::from((ErrorKind::ConversionError, "Type mismatch")), - ]; - let multi_err: EtlError = errors.into(); - - assert_eq!(multi_err.detail(), Some("Missing required field")); - } - - #[test] - fn test_from_vector() { - let errors = vec![ - EtlError::from((ErrorKind::ValidationError, "Error 1")), - EtlError::from((ErrorKind::ConversionError, "Error 2")), - ]; - let multi_err = EtlError::from(errors); - assert_eq!(multi_err.kinds().len(), 2); - } - #[test] fn test_from_vector_single_error_not_wrapped() { let error = EtlError::from((ErrorKind::ValidationError, "Single error")); let errors = vec![error]; let result = EtlError::from(errors); - // Single error should not be wrapped in Many variant. 
assert_eq!(result.kinds().len(), 1); assert_eq!(result.kind(), ErrorKind::ValidationError); } @@ -1140,48 +1039,6 @@ mod tests { assert!(display_str.contains(" @ ")); } - #[test] - fn test_error_display_with_detail() { - let err = EtlError::from(( - ErrorKind::SourceQueryFailed, - "SQL query failed", - "Invalid table name".to_string(), - )); - let display_str = format!("{err}"); - assert!(display_str.contains("QueryFailed")); - assert!(display_str.contains("SQL query failed")); - assert!(display_str.contains("Invalid table name")); - assert!(display_str.contains("\n Detail:")); - } - - #[test] - fn test_multiple_errors_display() { - let errors = vec![ - EtlError::from((ErrorKind::ValidationError, "Invalid schema")), - EtlError::from((ErrorKind::ConversionError, "Type mismatch")), - ]; - let multi_err: EtlError = errors.into(); - let display_str = format!("{multi_err}"); - assert!(display_str.contains("[Many] 2 errors aggregated @ ")); - assert!(display_str.contains("1. [ValidationError] Invalid schema @ ")); - } - - #[test] - fn test_error_source_preserved() { - let io_err = std::io::Error::other("boom"); - let err = EtlError::from(io_err); - let source = err.source().expect("missing source"); - assert_eq!(source.to_string(), "boom"); - } - - #[test] - fn test_many_forwards_source() { - let inner = EtlError::from(std::io::Error::other("inner failure")); - let outer: EtlError = vec![inner.clone(), EtlError::from((ErrorKind::Unknown, "x"))].into(); - let source = outer.source().expect("missing aggregate source"); - assert_eq!(source.to_string(), inner.to_string()); - } - #[test] fn test_macro_usage() { let err = etl_error!(ErrorKind::ValidationError, "Invalid data format"); @@ -1195,26 +1052,6 @@ mod tests { ); assert_eq!(err_with_detail.kind(), ErrorKind::ConversionError); assert!(err_with_detail.detail().unwrap().contains("Cannot convert")); - - let owned_detail = String::from("Owned detail"); - let err_with_owned = etl_error!( - ErrorKind::InvalidData, - "Owned 
detail preserved", - owned_detail - ); - assert_eq!(err_with_owned.detail(), Some("Owned detail")); - } - - #[test] - fn test_macro_with_source() { - let err = etl_error!( - ErrorKind::IoError, - "I/O failure", - "disk unavailable", - source: std::io::Error::other("disk unavailable") - ); - let source = err.source().expect("missing macro source"); - assert_eq!(source.to_string(), "disk unavailable"); } #[test] @@ -1223,218 +1060,9 @@ mod tests { bail!(ErrorKind::ValidationError, "Test error"); } - fn test_function_with_detail() -> EtlResult { - bail!( - ErrorKind::ConversionError, - "Test error", - "Additional detail" - ); - } - - fn test_function_with_owned_detail() -> EtlResult { - let detail = String::from("Owned bail detail"); - bail!( - ErrorKind::DestinationError, - "Test error with owned detail", - detail - ); - } - - fn test_function_with_source() -> EtlResult { - bail!( - ErrorKind::IoError, - "Test error with source", - source: std::io::Error::other("socket closed") - ); - } - let result = test_function(); assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::ValidationError); - - let result = test_function_with_detail(); - assert!(result.is_err()); - let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::ConversionError); - assert!(err.detail().unwrap().contains("Additional detail")); - - let result = test_function_with_owned_detail(); - assert!(result.is_err()); - let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::DestinationError); - assert_eq!(err.detail(), Some("Owned bail detail")); - - let result = test_function_with_source(); - assert!(result.is_err()); - let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::IoError); - assert_eq!( - err.source().expect("missing bail source").to_string(), - "socket closed" - ); - } - - #[test] - fn test_nested_multiple_errors() { - let inner_errors = vec![ - EtlError::from((ErrorKind::ConversionError, "Inner error 1")), - 
EtlError::from((ErrorKind::ValidationError, "Inner error 2")), - ]; - let inner_multi: EtlError = inner_errors.into(); - - let outer_errors = vec![ - inner_multi, - EtlError::from((ErrorKind::IoError, "Outer error")), - ]; - let outer_multi: EtlError = outer_errors.into(); - - let kinds = outer_multi.kinds(); - assert_eq!(kinds.len(), 3); - assert!(kinds.contains(&ErrorKind::ConversionError)); - assert!(kinds.contains(&ErrorKind::ValidationError)); - assert!(kinds.contains(&ErrorKind::IoError)); - - let rendered = format!("{outer_multi}"); - println!("{rendered}"); - assert!(rendered.contains("[Many] 2 errors aggregated")); - assert!(rendered.contains("1. [Many]")); - } - - #[test] - fn test_json_error_classification() { - // Test syntax error during deserialization - let json_err = serde_json::from_str::("invalid json").unwrap_err(); - let etl_err = EtlError::from(json_err); - assert_eq!(etl_err.kind(), ErrorKind::DeserializationError); - assert!(etl_err.detail().unwrap().contains("expected")); - - // Test data error during deserialization - let json_err = serde_json::from_str::("\"not_a_bool\"").unwrap_err(); - let etl_err = EtlError::from(json_err); - assert_eq!(etl_err.kind(), ErrorKind::DeserializationError); - assert!(etl_err.detail().is_some()); - } - - #[test] - fn test_hash_stability() { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - // Same error kind and description should produce same hash. 
- let err1 = EtlError::from(( - ErrorKind::SourceConnectionFailed, - "Database connection failed", - )); - let err2 = EtlError::from(( - ErrorKind::SourceConnectionFailed, - "Database connection failed", - )); - - let mut hasher1 = DefaultHasher::new(); - err1.hash(&mut hasher1); - let hash1 = hasher1.finish(); - - let mut hasher2 = DefaultHasher::new(); - err2.hash(&mut hasher2); - let hash2 = hasher2.finish(); - - assert_eq!(hash1, hash2); - } - - #[test] - fn test_hash_ignores_detail() { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - // Same kind and description with different details should produce same hash. - let err1 = EtlError::from(( - ErrorKind::SourceQueryFailed, - "Query failed", - "Table 'users' not found".to_string(), - )); - let err2 = EtlError::from(( - ErrorKind::SourceQueryFailed, - "Query failed", - "Table 'orders' not found".to_string(), - )); - - let mut hasher1 = DefaultHasher::new(); - err1.hash(&mut hasher1); - let hash1 = hasher1.finish(); - - let mut hasher2 = DefaultHasher::new(); - err2.hash(&mut hasher2); - let hash2 = hasher2.finish(); - - assert_eq!(hash1, hash2, "Hash should ignore detail field"); - } - - #[test] - fn test_hash_distinguishes_different_errors() { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - // Different error kinds should produce different hashes. 
- let err1 = EtlError::from((ErrorKind::SourceConnectionFailed, "Connection failed")); - let err2 = EtlError::from((ErrorKind::SourceQueryFailed, "Query failed")); - - let mut hasher1 = DefaultHasher::new(); - err1.hash(&mut hasher1); - let hash1 = hasher1.finish(); - - let mut hasher2 = DefaultHasher::new(); - err2.hash(&mut hasher2); - let hash2 = hasher2.finish(); - - assert_ne!( - hash1, hash2, - "Different error kinds should have different hashes" - ); - } - - #[test] - fn test_hash_aggregated_errors() { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - // Aggregated errors should hash all contained errors. - let errors1 = vec![ - EtlError::from((ErrorKind::ValidationError, "Invalid schema")), - EtlError::from((ErrorKind::ConversionError, "Type mismatch")), - ]; - let multi_err1: EtlError = errors1.into(); - - let errors2 = vec![ - EtlError::from((ErrorKind::ValidationError, "Invalid schema")), - EtlError::from((ErrorKind::ConversionError, "Type mismatch")), - ]; - let multi_err2: EtlError = errors2.into(); - - let mut hasher1 = DefaultHasher::new(); - multi_err1.hash(&mut hasher1); - let hash1 = hasher1.finish(); - - let mut hasher2 = DefaultHasher::new(); - multi_err2.hash(&mut hasher2); - let hash2 = hasher2.finish(); - - assert_eq!(hash1, hash2, "Same aggregated errors should have same hash"); - - // Different order or different errors should produce different hash. - let errors3 = vec![ - EtlError::from((ErrorKind::ConversionError, "Type mismatch")), - EtlError::from((ErrorKind::ValidationError, "Invalid schema")), - ]; - let multi_err3: EtlError = errors3.into(); - - let mut hasher3 = DefaultHasher::new(); - multi_err3.hash(&mut hasher3); - let hash3 = hasher3.finish(); - - assert_ne!( - hash1, hash3, - "Different error order should produce different hash" - ); } } diff --git a/etl/src/lib.rs b/etl/src/lib.rs index ac08f0be8..fc4aaf786 100644 --- a/etl/src/lib.rs +++ b/etl/src/lib.rs @@ -16,6 +16,7 @@ //! 
- **Robust error handling**: Comprehensive error classification with retry strategies //! - **Concurrent processing**: Parallel table synchronization and event application for increased throughput //! - **Suspendable**: Persistent tracking of replication progress which allows the pipeline to be safely paused and restarted +//! - **Read replica support**: Optional heartbeat mechanism for replicating from read replicas //! //! # Core Concepts //! @@ -77,6 +78,8 @@ //! id: 1, //! publication_name: "my_publication".to_string(), //! pg_connection: pg_config, +//! primary_connection: None, +//! heartbeat: None, //! batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 }, //! table_error_retry_delay_ms: 10000, //! table_error_retry_max_attempts: 5, diff --git a/etl/src/metrics.rs b/etl/src/metrics.rs index ff2f41050..fe672fa9e 100644 --- a/etl/src/metrics.rs +++ b/etl/src/metrics.rs @@ -15,6 +15,13 @@ pub const ETL_EVENTS_PROCESSED_TOTAL: &str = "etl_events_processed_total"; pub const ETL_STATUS_UPDATES_TOTAL: &str = "etl_status_updates_total"; pub const ETL_STATUS_UPDATES_SKIPPED_TOTAL: &str = "etl_status_updates_skipped_total"; +// Heartbeat metrics. +pub const ETL_HEARTBEAT_EMISSIONS_TOTAL: &str = "etl_heartbeat_emissions_total"; +pub const ETL_HEARTBEAT_FAILURES_TOTAL: &str = "etl_heartbeat_failures_total"; +pub const ETL_HEARTBEAT_CONSECUTIVE_FAILURES: &str = "etl_heartbeat_consecutive_failures"; +pub const ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL: &str = "etl_heartbeat_connection_attempts_total"; +pub const ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP: &str = "etl_heartbeat_last_emission_timestamp"; + /// Label key for replication phase (used by table state metrics). pub const PHASE_LABEL: &str = "phase"; /// Label key for the ETL worker type ("table_sync" or "apply"). @@ -96,5 +103,36 @@ pub(crate) fn register_metrics() { Unit::Count, "Total number of status updates skipped due to throttling, labeled by pipeline_id" ); + + // Heartbeat metrics. 
+ describe_counter!( + ETL_HEARTBEAT_EMISSIONS_TOTAL, + Unit::Count, + "Total number of heartbeat messages emitted to the primary" + ); + + describe_counter!( + ETL_HEARTBEAT_FAILURES_TOTAL, + Unit::Count, + "Total number of heartbeat emission failures" + ); + + describe_gauge!( + ETL_HEARTBEAT_CONSECUTIVE_FAILURES, + Unit::Count, + "Current number of consecutive heartbeat failures" + ); + + describe_counter!( + ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL, + Unit::Count, + "Total number of heartbeat connection attempts to the primary" + ); + + describe_gauge!( + ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP, + Unit::Seconds, + "Unix timestamp of the last successful heartbeat emission" + ); }); } diff --git a/etl/src/pipeline.rs b/etl/src/pipeline.rs index 6602b4902..f8efa000f 100644 --- a/etl/src/pipeline.rs +++ b/etl/src/pipeline.rs @@ -16,6 +16,7 @@ use crate::store::state::StateStore; use crate::types::PipelineId; use crate::workers::apply::{ApplyWorker, ApplyWorkerHandle}; use crate::workers::base::{Worker, WorkerHandle}; +use crate::workers::heartbeat::{HeartbeatWorker, HeartbeatWorkerHandle}; use crate::workers::pool::TableSyncWorkerPool; use etl_config::shared::PipelineConfig; use etl_postgres::types::TableId; @@ -38,6 +39,7 @@ enum PipelineState { // with workers management, which should not be done in the pipeline. apply_worker: ApplyWorkerHandle, pool: TableSyncWorkerPool, + heartbeat_worker: Option<HeartbeatWorkerHandle>, }, } @@ -53,6 +55,10 @@ enum PipelineState { /// /// Multiple table sync workers run in parallel during the initial phase, while a single /// apply worker processes the replication stream of table that were already copied. +/// +/// When configured for read replica mode (with `primary_connection` set), the pipeline +/// also starts a heartbeat worker that emits periodic messages to the primary database +/// to keep the replication slot active.
#[derive(Debug)] pub struct Pipeline { config: Arc, @@ -115,10 +121,14 @@ where /// This method initializes the connection to Postgres, sets up table mappings and schemas, /// creates the worker pool for table synchronization, and starts the apply worker for /// processing replication stream events. + /// + /// When configured for read replica mode (with `primary_connection` set), this method + /// also starts a heartbeat worker that maintains replication slot activity on the primary. pub async fn start(&mut self) -> EtlResult<()> { info!( publication_name = %self.config.publication_name, pipeline_id = %self.config.id, + replica_mode = %self.config.is_replica_mode(), "starting pipeline" ); @@ -162,7 +172,28 @@ where .start() .await?; - self.state = PipelineState::Started { apply_worker, pool }; + // Start heartbeat worker if configured for read replica mode + let heartbeat_worker = if let Some(primary_config) = &self.config.primary_connection { + info!(pipeline_id = %self.config.id, "replica mode enabled, starting heartbeat worker"); + // Safe to unwrap: heartbeat_config() returns Some when primary_connection is set + let heartbeat_config = self.config.heartbeat_config() + .expect("heartbeat_config must be Some when primary_connection is set"); + let worker = HeartbeatWorker::new( + self.config.id, + primary_config.clone(), + heartbeat_config, + self.shutdown_tx.subscribe(), + ); + Some(worker.start()) + } else { + None + }; + + self.state = PipelineState::Started { + apply_worker, + pool, + heartbeat_worker, + }; Ok(()) } @@ -176,9 +207,15 @@ where /// The wait process ensures proper shutdown ordering: /// 1. Apply worker completes first (may spawn additional table sync workers) /// 2. All table sync workers complete - /// 3. Any errors from workers are aggregated and returned + /// 3. Heartbeat worker completes (if running) + /// 4. 
Any errors from workers are aggregated and returned pub async fn wait(self) -> EtlResult<()> { - let PipelineState::Started { apply_worker, pool } = self.state else { + let PipelineState::Started { + apply_worker, + pool, + heartbeat_worker, + } = self.state + else { info!("pipeline was not started, skipping wait"); return Ok(()); @@ -221,6 +258,17 @@ where info!(error_count = errors_number, "table sync workers failed"); } + // Wait for heartbeat worker if it was running. + // Heartbeat failures are intentionally non-fatal: the pipeline can continue operating + // even if heartbeat emission fails. Failures are logged for observability but not + // propagated to the errors collection to avoid shutting down an otherwise healthy pipeline. + if let Some(heartbeat_handle) = heartbeat_worker { + info!("waiting for heartbeat worker to complete"); + if let Err(err) = heartbeat_handle.wait().await { + error!(error = %err, "heartbeat worker failed"); + } + } + // Once all workers completed, we notify the destination of shutting down. 
info!("shutting down destination"); if let Err(err) = self.destination.shutdown().await { diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 952536f71..1a1288173 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -1,7 +1,7 @@ use crate::error::{ErrorKind, EtlResult}; use crate::utils::tokio::MakeRustlsConnect; use crate::{bail, etl_error}; -use etl_config::shared::{ETL_REPLICATION_OPTIONS, IntoConnectOptions, PgConnectionConfig}; +use etl_config::shared::{ETL_HEARTBEAT_OPTIONS, ETL_REPLICATION_OPTIONS, IntoConnectOptions, PgConnectionConfig}; use etl_postgres::replication::extract_server_version; use etl_postgres::types::convert_type_oid_to_type; use etl_postgres::types::{ColumnSchema, TableId, TableName, TableSchema}; @@ -40,44 +40,48 @@ where } .instrument(span); - // There is no need to track the connection task via the `JoinHandle` since the `Client`, which - // returned the connection, will automatically terminate the connection when dropped. tokio::spawn(task); } +/// Builds a root certificate store from the TLS configuration. +fn build_root_cert_store(pg_connection_config: &PgConnectionConfig) -> EtlResult<rustls::RootCertStore> { + let mut root_store = rustls::RootCertStore::empty(); + if pg_connection_config.tls.enabled { + for cert in CertificateDer::pem_slice_iter( + pg_connection_config.tls.trusted_root_certs.as_bytes(), + ) { + let cert = cert?; + root_store.add(cert)?; + } + } + Ok(root_store) +} + /// Result returned when creating a new replication slot. -/// -/// Contains the consistent point LSN that should be used as the starting point -/// for logical replication. #[derive(Debug, Clone)] pub struct CreateSlotResult { - /// The LSN at which the slot was created, representing a consistent point in the WAL. + /// The LSN at which the slot was created. pub consistent_point: PgLsn, } /// Result returned when retrieving an existing replication slot.
-/// -/// Contains the confirmed flush LSN indicating how far replication has progressed. #[derive(Debug, Clone)] pub struct GetSlotResult { - /// The LSN up to which changes have been confirmed as processed by ETL. + /// The LSN up to which changes have been confirmed as processed. pub confirmed_flush_lsn: PgLsn, } /// Result type for operations that either get an existing slot or create a new one. -/// -/// This enum distinguishes between whether a slot was newly created or already existed, -/// providing appropriate result data for each case. #[derive(Debug, Clone)] pub enum GetOrCreateSlotResult { - /// A new slot was created with the given consistent point. + /// A new slot was created. CreateSlot(CreateSlotResult), - /// An existing slot was found with the given confirmed flush LSN. + /// An existing slot was found. GetSlot(GetSlotResult), } impl GetOrCreateSlotResult { - /// Returns the lsn that should be used as starting LSN during events replication. + /// Returns the LSN that should be used as starting LSN during events replication. pub fn get_start_lsn(&self) -> PgLsn { match self { GetOrCreateSlotResult::CreateSlot(result) => result.consistent_point, @@ -87,32 +91,18 @@ impl GetOrCreateSlotResult { } /// A transaction that operates within the context of a replication slot. -/// -/// This type ensures that the parent connection remains active for the duration of any -/// transaction spawned by that connection for a given slot. -/// -/// The `client` is the client that created the slot and must be active for the duration of -/// the transaction for the snapshot of the slot to be consistent. #[derive(Debug)] pub struct PgReplicationSlotTransaction { client: PgReplicationClient, } impl PgReplicationSlotTransaction { - /// Creates a new transaction within the context of a replication slot. - /// - /// The transaction is started with a repeatable read isolation level and uses the - /// snapshot associated with the provided slot. 
async fn new(client: PgReplicationClient) -> EtlResult { client.begin_tx().await?; - Ok(Self { client }) } /// Retrieves the schema information for the supplied table. - /// - /// If a publication is specified, only columns included in that publication - /// will be returned. pub async fn get_table_schema( &self, table_id: TableId, @@ -122,8 +112,6 @@ impl PgReplicationSlotTransaction { } /// Creates a COPY stream for reading data from the specified table. - /// - /// The stream will include only the columns specified in `column_schemas`. pub async fn get_table_copy_stream( &self, table_id: TableId, @@ -148,16 +136,11 @@ impl PgReplicationSlotTransaction { /// Result of building publication filter SQL components. struct PublicationFilter { - /// CTEs to include in the WITH clause (empty string if no publication filtering). ctes: String, - /// Predicate to include in the WHERE clause (empty string if no publication filtering). predicate: String, } /// A client for interacting with Postgres's logical replication features. -/// -/// This client provides methods for creating replication slots, managing transactions, -/// and streaming changes from the database. #[derive(Debug, Clone)] pub struct PgReplicationClient { client: Arc, @@ -165,10 +148,7 @@ pub struct PgReplicationClient { } impl PgReplicationClient { - /// Establishes a connection to Postgres. The connection uses TLS if configured in the - /// supplied [`PgConnectionConfig`]. - /// - /// The connection is configured for logical replication mode + /// Establishes a connection to Postgres in replication mode. pub async fn connect(pg_connection_config: PgConnectionConfig) -> EtlResult { match pg_connection_config.tls.enabled { true => PgReplicationClient::connect_tls(pg_connection_config).await, @@ -176,9 +156,17 @@ impl PgReplicationClient { } } - /// Establishes a connection to Postgres without TLS encryption. + /// Establishes a regular (non-replication) connection to Postgres. 
/// - /// The connection is configured for logical replication mode. + /// This is used for heartbeat connections that need to execute regular SQL + /// commands like `pg_logical_emit_message()` without replication mode. + pub async fn connect_regular(pg_connection_config: PgConnectionConfig) -> EtlResult { + match pg_connection_config.tls.enabled { + true => PgReplicationClient::connect_regular_tls(pg_connection_config).await, + false => PgReplicationClient::connect_regular_no_tls(pg_connection_config).await, + } + } + async fn connect_no_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { let mut config: Config = pg_connection_config .clone() @@ -201,24 +189,13 @@ impl PgReplicationClient { }) } - /// Establishes a TLS-encrypted connection to Postgres. - /// - /// The connection is configured for logical replication mode async fn connect_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { let mut config: Config = pg_connection_config .clone() .with_db(Some(&ETL_REPLICATION_OPTIONS)); config.replication_mode(ReplicationMode::Logical); - let mut root_store = rustls::RootCertStore::empty(); - if pg_connection_config.tls.enabled { - for cert in CertificateDer::pem_slice_iter( - pg_connection_config.tls.trusted_root_certs.as_bytes(), - ) { - let cert = cert?; - root_store.add(cert)?; - } - }; + let root_store = build_root_cert_store(&pg_connection_config)?; let tls_config = ClientConfig::builder() .with_root_certificates(root_store) @@ -240,19 +217,50 @@ impl PgReplicationClient { }) } + async fn connect_regular_no_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { + let config: Config = pg_connection_config + .clone() + .with_db(Some(&ETL_HEARTBEAT_OPTIONS)); + + let (client, connection) = config.connect(NoTls).await?; + + spawn_postgres_connection::(connection); + + info!("connected to postgres (regular mode) without tls"); + + Ok(client) + } + + async fn connect_regular_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { + let 
config: Config = pg_connection_config + .clone() + .with_db(Some(&ETL_HEARTBEAT_OPTIONS)); + + let root_store = build_root_cert_store(&pg_connection_config)?; + + let tls_config = ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?; + + spawn_postgres_connection::(connection); + + info!("connected to postgres (regular mode) with tls"); + + Ok(client) + } + /// Checks if the underlying connection is closed. pub fn is_closed(&self) -> bool { self.client.is_closed() } - /// Creates a new logical replication slot with the specified name and a transaction which is set - /// on the snapshot exported by the slot creation. + /// Creates a new logical replication slot with the specified name and a transaction. pub async fn create_slot_with_transaction( &self, slot_name: &str, ) -> EtlResult<(PgReplicationSlotTransaction, CreateSlotResult)> { - // TODO: check if we want to consume the client and return it on commit to avoid any other - // operations on a connection that has started a transaction. let transaction = PgReplicationSlotTransaction::new(self.clone()).await?; let slot = self.create_slot_internal(slot_name, true).await?; @@ -265,8 +273,6 @@ impl PgReplicationClient { } /// Gets the slot by `slot_name`. - /// - /// Returns an error in case of failure or missing slot. pub async fn get_slot(&self, slot_name: &str) -> EtlResult { let query = format!( r#"select confirmed_flush_lsn from pg_replication_slots where slot_name = {};"#, @@ -298,14 +304,6 @@ impl PgReplicationClient { } /// Gets an existing replication slot or creates a new one if it doesn't exist. - /// - /// This method first attempts to get the slot by name. If the slot doesn't exist, - /// it creates a new one. 
- /// - /// Returns a tuple containing: - /// - A boolean indicating whether the slot was created (true) or already existed (false) - /// - The slot result containing either the confirmed_flush_lsn (for existing slots) - /// or the consistent_point (for newly created slots) pub async fn get_or_create_slot(&self, slot_name: &str) -> EtlResult { match self.get_slot(slot_name).await { Ok(slot) => { @@ -325,11 +323,8 @@ impl PgReplicationClient { } /// Deletes a replication slot with the specified name. - /// - /// Returns an error if the slot doesn't exist or if there are any issues with the deletion. pub async fn delete_slot(&self, slot_name: &str) -> EtlResult<()> { info!(slot_name, "deleting replication slot"); - // Do not convert the query or the options to lowercase, see comment in `create_slot_internal`. let query = format!( r#"DROP_REPLICATION_SLOT {} WAIT;"#, quote_identifier(slot_name) @@ -383,9 +378,6 @@ impl PgReplicationClient { } /// Retrieves the `publish_via_partition_root` setting for a publication. - /// - /// Returns `true` if the publication is configured to send replication messages using - /// the parent table OID, or `false` if it sends them using child partition OIDs. pub async fn get_publish_via_partition_root(&self, publication: &str) -> EtlResult { let query = format!( "select pubviaroot from pg_publication where pubname = {};", @@ -408,9 +400,6 @@ impl PgReplicationClient { } /// Checks if any of the provided table IDs are partitioned tables. - /// - /// A partitioned table is one where `relkind = 'p'` in `pg_class`. - /// Returns `true` if at least one table is partitioned, `false` otherwise. pub async fn has_partitioned_tables(&self, table_ids: &[TableId]) -> EtlResult { if table_ids.is_empty() { return Ok(false); @@ -463,10 +452,6 @@ impl PgReplicationClient { } /// Retrieves the OIDs of all tables included in a publication. 
- /// - /// For partitioned tables with `publish_via_partition_root=true`, this returns only the parent - /// table OID. The query uses a recursive CTE to walk up the partition inheritance hierarchy - /// and identify root tables that have no parent themselves. pub async fn get_publication_table_ids( &self, publication_name: &str, @@ -474,8 +459,6 @@ impl PgReplicationClient { let query = format!( r#" with recursive pub_tables as ( - -- Get all tables from publication (pg_publication_tables includes explicit tables, - -- ALL TABLES publications, and FOR TABLES IN SCHEMA publications) select c.oid from pg_publication_tables pt join pg_class c on c.relname = pt.tablename @@ -483,17 +466,14 @@ impl PgReplicationClient { where pt.pubname = {pub} ), hierarchy(relid) as ( - -- Start with published tables select oid from pub_tables union - -- Recursively find parent tables in inheritance hierarchy select i.inhparent from pg_inherits i join hierarchy h on h.relid = i.inhrelid ) - -- Return only root tables (those without a parent) select distinct relid as oid from hierarchy where not exists ( @@ -515,8 +495,6 @@ impl PgReplicationClient { } /// Starts a logical replication stream from the specified publication and slot. - /// - /// The stream will begin reading changes from the provided `start_lsn`. pub async fn start_logical_replication( &self, publication_name: &str, @@ -525,7 +503,6 @@ impl PgReplicationClient { ) -> EtlResult { info!(publication_name, slot_name, %start_lsn, "starting logical replication"); - // Do not convert the query or the options to lowercase, see comment in `create_slot_internal`. let options = format!( r#"("proto_version" '1', "publication_names" {})"#, quote_literal(quote_identifier(publication_name).as_ref()), @@ -544,10 +521,6 @@ impl PgReplicationClient { Ok(stream) } - /// Begins a new transaction with repeatable read isolation level. 
- /// - /// The transaction doesn't make any assumptions about the snapshot in use, since this is a - /// concern of the statements issued within the transaction. async fn begin_tx(&self) -> EtlResult<()> { self.client .simple_query("begin read only isolation level repeatable read;") @@ -556,32 +529,23 @@ impl PgReplicationClient { Ok(()) } - /// Commits the current transaction. async fn commit_tx(&self) -> EtlResult<()> { self.client.simple_query("commit;").await?; Ok(()) } - /// Rolls back the current transaction. async fn rollback_tx(&self) -> EtlResult<()> { self.client.simple_query("rollback;").await?; Ok(()) } - /// Internal helper method to create a replication slot. - /// - /// The `use_snapshot` parameter determines whether to use a snapshot for the slot creation. async fn create_slot_internal( &self, slot_name: &str, use_snapshot: bool, ) -> EtlResult { - // Do not convert the query or the options to lowercase, since the lexer for - // replication commands (repl_scanner.l) in Postgres code expects the commands - // in uppercase. This probably should be fixed in upstream, but for now we will - // keep the commands in uppercase. let snapshot_option = if use_snapshot { "USE_SNAPSHOT" } else { @@ -632,10 +596,6 @@ impl PgReplicationClient { )) } - /// Retrieves the schema for a single table. - /// - /// If a publication is specified, only columns included in that publication - /// will be returned. async fn get_table_schema( &self, table_id: TableId, @@ -651,9 +611,6 @@ impl PgReplicationClient { }) } - /// Loads the table name and schema information for a given table OID. - /// - /// Returns a `TableName` containing both the schema and table name. async fn get_table_name(&self, table_id: TableId) -> EtlResult { let table_info_query = format!( "select n.nspname as schema_name, c.relname as table_name @@ -683,12 +640,6 @@ impl PgReplicationClient { ); } - /// Builds SQL fragments for filtering columns based on publication settings. 
- /// - /// Returns CTEs and predicates that filter columns according to: - /// - Postgres 15+: Column-level filtering using `prattrs` - /// - Postgres 14 and earlier: Table-level filtering only - /// - No publication: No filtering (empty strings) fn build_publication_filter_sql( &self, table_id: TableId, @@ -701,7 +652,6 @@ impl PgReplicationClient { }; }; - // Postgres 15+ supports column-level filtering via prattrs if requires_version!(self.server_version, POSTGRES_15) { return PublicationFilter { ctes: format!( @@ -739,7 +689,6 @@ impl PgReplicationClient { }; } - // Postgres 14 and earlier: table-level filtering only PublicationFilter { ctes: format!( "pub_info as ( @@ -760,30 +709,22 @@ impl PgReplicationClient { } } - /// Retrieves schema information for all columns in a table. - /// - /// If a publication is specified, only columns included in that publication - /// will be returned. Generated columns are always excluded since they are not - /// supported in PostgreSQL logical replication. async fn get_column_schemas( &self, table_id: TableId, publication: Option<&str>, ) -> EtlResult> { - // Build publication filter CTEs and predicates based on Postgres version. 
let publication_filter = self.build_publication_filter_sql(table_id, publication); let column_info_query = format!( r#" with {publication_ctes} - -- Find the direct parent table (for child partitions) direct_parent as ( select i.inhparent as parent_oid from pg_inherits i where i.inhrelid = {table_id} limit 1 ), - -- Extract primary key column names from the parent table parent_pk_cols as ( select array_agg(a.attname order by x.n) as pk_column_names from pg_constraint con @@ -799,9 +740,7 @@ impl PgReplicationClient { a.atttypmod, a.attnotnull, case - -- Check if column has a direct primary key index when coalesce(i.indisprimary, false) = true then true - -- Check if column name matches parent's primary key (for partitions) when exists ( select 1 from parent_pk_cols pk @@ -825,7 +764,6 @@ impl PgReplicationClient { publication_predicate = publication_filter.predicate, ); - // Check for generated columns so we can warn if there are any. let generated_columns_check_query = format!( r#"select exists ( select 1 @@ -854,7 +792,6 @@ impl PgReplicationClient { table_id ); } - // Explicity break for clarity; this query returns a single SimpleQueryMessage::Row. break; } } @@ -887,24 +824,19 @@ impl PgReplicationClient { } /// Retrieves the publication row filter for a table. - /// If no publication is specified, we will always return None pub async fn get_row_filter( &self, table_id: TableId, publication_name: Option<&str>, ) -> EtlResult> { - // Row filters on publications were added in Postgres 15. For any earlier versions we know that there is no row filter if below_version!(self.server_version, POSTGRES_15) { return Ok(None); } - // If we don't have a publication the row filter is implicitly non-existent let publication = match publication_name { Some(publication) => publication, _ => return Ok(None), }; - // This uses the same query as the `pg_publication_tables`, but with some minor tweaks (COALESCE, only return the rowfilter, - // filter on oid and pubname). 
All of these are available >= Postgres 15. let row_filter_query = format!( "select pt.rowfilter as row_filter from pg_publication_tables pt @@ -931,8 +863,6 @@ impl PgReplicationClient { } /// Creates a COPY stream for reading data from a table using its OID. - /// - /// The stream will include only the specified columns and use text format, and respect publication row filters (if a publication is specified) pub async fn get_table_copy_stream( &self, table_id: TableId, @@ -949,7 +879,6 @@ impl PgReplicationClient { let filter = self.get_row_filter(table_id, publication).await?; let copy_query = if let Some(pred) = filter { - // Use select-form so we can add where. format!( r#"copy (select {} from {} where {}) to stdout with (format text);"#, column_list, @@ -969,9 +898,6 @@ impl PgReplicationClient { Ok(stream) } - /// Helper function to extract a value from a SimpleQueryMessage::Row - /// - /// Returns an error if the column is not found or if the value cannot be parsed to the target type. async fn get_row_value( row: &SimpleQueryRow, column_name: &str, diff --git a/etl/src/test_utils/pipeline.rs b/etl/src/test_utils/pipeline.rs index 2e186920d..7b034eca2 100644 --- a/etl/src/test_utils/pipeline.rs +++ b/etl/src/test_utils/pipeline.rs @@ -149,6 +149,8 @@ where id: self.pipeline_id, publication_name: self.publication_name, pg_connection: self.pg_connection_config, + primary_connection: None, + heartbeat: None, batch: self.batch.unwrap_or(BatchConfig { max_size: 1, max_fill_ms: 1000, diff --git a/etl/src/workers/heartbeat.rs b/etl/src/workers/heartbeat.rs new file mode 100644 index 000000000..6db73f0cb --- /dev/null +++ b/etl/src/workers/heartbeat.rs @@ -0,0 +1,358 @@ +//! Heartbeat worker for maintaining replication slot activity. +//! +//! When replicating from a read replica (PostgreSQL 15+), the replication slot on the +//! primary can become inactive during idle periods. This worker periodically emits +//! 
`pg_logical_emit_message()` calls to the primary to keep the slot active. + +use crate::concurrency::shutdown::ShutdownRx; +use crate::error::ErrorKind; +use crate::etl_error; +use crate::metrics::{ + ETL_HEARTBEAT_CONSECUTIVE_FAILURES, ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL, + ETL_HEARTBEAT_EMISSIONS_TOTAL, ETL_HEARTBEAT_FAILURES_TOTAL, + ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP, PIPELINE_ID_LABEL, +}; +use crate::replication::client::PgReplicationClient; +use crate::types::PipelineId; +use crate::workers::base::WorkerHandle; +use etl_config::shared::{HeartbeatConfig, PgConnectionConfig}; +use metrics::{counter, gauge}; +use rand::Rng; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use thiserror::Error; +use tokio::task::JoinHandle; +use tracing::{error, info, warn}; + +/// Errors that can occur during heartbeat operations. +#[derive(Debug, Error)] +pub enum HeartbeatError { + /// Failed to connect to the primary database. + #[error("failed to connect to primary: {0}")] + ConnectionFailed(String), + + /// Failed to emit heartbeat message. + #[error("failed to emit heartbeat: {0}")] + EmitFailed(String), + + /// The heartbeat worker was shut down. + #[error("heartbeat worker shutdown")] + Shutdown, +} + +/// Handle to a running heartbeat worker. +/// +/// Provides methods to wait for worker completion. +pub struct HeartbeatWorkerHandle { + join_handle: Option<JoinHandle<()>>, +} + +impl WorkerHandle<()> for HeartbeatWorkerHandle { + /// Returns the state of the heartbeat worker (unit type as heartbeat has no state). + fn state(&self) -> () {} + + /// Waits for the heartbeat worker to complete. + /// + /// Returns when the worker has shut down, either gracefully or due to an error.
+ async fn wait(mut self) -> crate::error::EtlResult<()> { + let Some(handle) = self.join_handle.take() else { + return Ok(()); + }; + + handle.await.map_err(|err| { + etl_error!( + ErrorKind::HeartbeatWorkerPanic, + "Heartbeat worker panicked", + err + ) + })?; + + Ok(()) + } +} + +/// Worker that maintains replication slot activity on the primary database. +/// +/// When replicating from a read replica, the replication slot on the primary can become +/// inactive during idle periods since WAL is generated on the primary but consumed from +/// the replica. This worker periodically emits heartbeat messages to generate WAL activity. +pub struct HeartbeatWorker { + pipeline_id: PipelineId, + primary_config: PgConnectionConfig, + heartbeat_config: HeartbeatConfig, + shutdown_rx: ShutdownRx, + consecutive_failures: u32, +} + +impl HeartbeatWorker { + /// Creates a new heartbeat worker. + /// + /// # Arguments + /// + /// * `pipeline_id` - The pipeline ID for metrics labeling. + /// * `primary_config` - Connection configuration for the primary database. + /// * `heartbeat_config` - Configuration for heartbeat timing and backoff. + /// * `shutdown_rx` - Receiver for shutdown signals. + pub fn new( + pipeline_id: PipelineId, + primary_config: PgConnectionConfig, + heartbeat_config: HeartbeatConfig, + shutdown_rx: ShutdownRx, + ) -> Self { + Self { + pipeline_id, + primary_config, + heartbeat_config, + shutdown_rx, + consecutive_failures: 0, + } + } + + /// Starts the heartbeat worker in a background task. + /// + /// Returns a handle that can be used to wait for the worker to complete. + pub fn start(self) -> HeartbeatWorkerHandle { + let join_handle = tokio::spawn(async move { + self.run().await; + }); + + HeartbeatWorkerHandle { + join_handle: Some(join_handle), + } + } + + /// Main loop for the heartbeat worker. 
+ async fn run(mut self) { + info!( + pipeline_id = %self.pipeline_id, + interval_ms = self.heartbeat_config.interval_ms, + "starting heartbeat worker" + ); + + loop { + // Check for shutdown before attempting connection. + if self.is_shutdown_requested() { + info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down"); + break; + } + + // Attempt to connect and run heartbeat loop. + match self.connect_and_heartbeat().await { + Ok(()) => { + // Clean shutdown requested. + info!(pipeline_id = %self.pipeline_id, "heartbeat worker completed"); + break; + } + Err(HeartbeatError::Shutdown) => { + info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down"); + break; + } + Err(e) => { + self.consecutive_failures += 1; + self.record_failure(); + + warn!( + pipeline_id = %self.pipeline_id, + error = %e, + consecutive_failures = self.consecutive_failures, + "heartbeat error, will retry" + ); + + // Calculate backoff with jitter. + let backoff = self.calculate_backoff(); + + info!( + pipeline_id = %self.pipeline_id, + backoff_ms = backoff.as_millis(), + "waiting before retry" + ); + + // Wait for backoff duration or shutdown. + if self.wait_with_shutdown(backoff).await { + info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down during backoff"); + break; + } + } + } + } + + // Reset metrics on shutdown. + gauge!( + ETL_HEARTBEAT_CONSECUTIVE_FAILURES, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .set(0.0); + } + + /// Connects to the primary and runs the heartbeat loop. + async fn connect_and_heartbeat(&mut self) -> Result<(), HeartbeatError> { + // Record connection attempt. + counter!( + ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .increment(1); + + info!( + pipeline_id = %self.pipeline_id, + host = %self.primary_config.host, + port = self.primary_config.port, + "connecting to primary for heartbeat" + ); + + // Connect in regular (non-replication) mode. 
+ let client = PgReplicationClient::connect_regular(self.primary_config.clone()) + .await + .map_err(|e| HeartbeatError::ConnectionFailed(e.to_string()))?; + + info!( + pipeline_id = %self.pipeline_id, + "connected to primary, starting heartbeat loop" + ); + + // Reset consecutive failures on successful connection. + self.consecutive_failures = 0; + self.update_consecutive_failures_metric(); + + // Run heartbeat loop. + self.heartbeat_loop(&client).await + } + + /// Runs the heartbeat emission loop. + /// + /// Emits an initial heartbeat immediately, then continues emitting at the configured interval. + async fn heartbeat_loop( + &mut self, + client: &tokio_postgres::Client, + ) -> Result<(), HeartbeatError> { + let interval = Duration::from_millis(self.heartbeat_config.interval_ms); + + loop { + // Check if connection is still alive before emitting. + if client.is_closed() { + return Err(HeartbeatError::ConnectionFailed( + "connection closed unexpectedly".to_string(), + )); + } + + // Emit heartbeat message. + self.emit_heartbeat(client).await?; + + // Wait for interval or shutdown. + if self.wait_with_shutdown(interval).await { + return Ok(()); + } + } + } + + /// Emits a heartbeat message to the primary. + /// + /// Uses `pg_logical_emit_message()` to generate a WAL record that will flow + /// through the replication chain without affecting any tables. + async fn emit_heartbeat(&self, client: &tokio_postgres::Client) -> Result<(), HeartbeatError> { + // Use non-transactional message (false) so it's immediately visible in WAL. + // The 'etl_heartbeat' prefix identifies these messages. + // Empty payload minimizes WAL size. 
+ let result = client + .simple_query("SELECT pg_logical_emit_message(false, 'etl_heartbeat', '')") + .await; + + match result { + Ok(_) => { + self.record_emission(); + Ok(()) + } + Err(e) => { + // Note: record_failure() is called by the caller in run() to avoid double counting + Err(HeartbeatError::EmitFailed(e.to_string())) + } + } + } + + /// Calculates the backoff duration with jitter. + fn calculate_backoff(&self) -> Duration { + let min_backoff = Duration::from_millis(self.heartbeat_config.min_backoff_ms); + let max_backoff = Duration::from_millis(self.heartbeat_config.max_backoff_ms); + + // Exponential backoff: min * 2^failures, capped at max. + let exp_backoff = min_backoff + .saturating_mul(2u32.saturating_pow(self.consecutive_failures.saturating_sub(1))); + let base_backoff = exp_backoff.min(max_backoff); + + // Add jitter to prevent thundering herd. + self.apply_jitter(base_backoff) + } + + /// Applies jitter to a duration using the configured jitter percentage. + /// + /// Uses the `rand` crate for proper randomization to prevent thundering herd. + fn apply_jitter(&self, base_backoff: Duration) -> Duration { + let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0; + let jitter_range = base_backoff.as_secs_f64() * jitter_fraction; + + // Use rand crate for proper randomization. + let mut rng = rand::thread_rng(); + let jitter = rng.gen_range(-jitter_range..jitter_range); + + // Apply jitter and clamp to minimum of 0.1 seconds. + let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1); + Duration::from_secs_f64(jittered_secs) + } + + /// Checks if shutdown has been requested. + fn is_shutdown_requested(&self) -> bool { + *self.shutdown_rx.borrow() + } + + /// Waits for a duration or until shutdown is requested. + /// + /// Returns `true` if shutdown was requested, `false` if the duration elapsed. + async fn wait_with_shutdown(&mut self, duration: Duration) -> bool { + tokio::select! 
{ + _ = tokio::time::sleep(duration) => false, + result = self.shutdown_rx.changed() => { + // Channel closed or value changed to true. + result.is_err() || *self.shutdown_rx.borrow() + } + } + } + + /// Records a successful heartbeat emission in metrics. + fn record_emission(&self) { + counter!( + ETL_HEARTBEAT_EMISSIONS_TOTAL, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .increment(1); + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + gauge!( + ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .set(timestamp); + } + + /// Records a heartbeat failure in metrics. + fn record_failure(&self) { + counter!( + ETL_HEARTBEAT_FAILURES_TOTAL, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .increment(1); + + self.update_consecutive_failures_metric(); + } + + /// Updates the consecutive failures gauge metric. + fn update_consecutive_failures_metric(&self) { + gauge!( + ETL_HEARTBEAT_CONSECUTIVE_FAILURES, + PIPELINE_ID_LABEL => self.pipeline_id.to_string() + ) + .set(self.consecutive_failures as f64); + } +} diff --git a/etl/src/workers/mod.rs b/etl/src/workers/mod.rs index 311364024..5792084e6 100644 --- a/etl/src/workers/mod.rs +++ b/etl/src/workers/mod.rs @@ -6,5 +6,6 @@ pub mod apply; pub mod base; +pub mod heartbeat; pub mod pool; pub mod table_sync;