diff --git a/Cargo.lock b/Cargo.lock
index b04e8847d..c8ac6d9fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1305,6 +1305,12 @@ dependencies = [
  "syn 2.0.114",
 ]
 
+[[package]]
+name = "data-encoding"
+version = "2.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea"
+
 [[package]]
 name = "deadpool"
 version = "0.12.3"
@@ -1703,6 +1709,8 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "tokio-stream",
+ "tokio-tungstenite",
  "tonic",
  "tracing",
  "uuid",
@@ -5972,6 +5980,22 @@ dependencies = [
  "tokio-util",
 ]
 
+[[package]]
+name = "tokio-tungstenite"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084"
+dependencies = [
+ "futures-util",
+ "log",
+ "rustls",
+ "rustls-native-certs 0.8.3",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls",
+ "tungstenite",
+]
+
 [[package]]
 name = "tokio-util"
 version = "0.7.18"
@@ -6243,6 +6267,25 @@ version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
 
+[[package]]
+name = "tungstenite"
+version = "0.26.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13"
+dependencies = [
+ "bytes",
+ "data-encoding",
+ "http 1.4.0",
+ "httparse",
+ "log",
+ "rand 0.9.2",
+ "rustls",
+ "rustls-pki-types",
+ "sha1",
+ "thiserror 2.0.17",
+ "utf-8",
+]
+
 [[package]]
 name = "twox-hash"
 version = "2.1.2"
@@ -6677,7 +6720,7 @@ version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 0a455ea45..cdd01ddaf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,7 +39,7 @@ anyhow = { version = "1.0.98", default-features = false }
 arrow = { version = "57.0", default-features = false }
 async-trait = { version = "0.1.88" }
 aws-lc-rs = { version = "1.13.3", default-features = false }
-base64 = { version = "0.22.1", default-features = false }
+base64 = { version = "0.22.1", default-features = false, features = ["std"] }
 byteorder = { version = "1.5.0", default-features = false }
 bytes = { version = "1.10.1" }
 chrono = { version = "0.4.41", default-features = false }
@@ -82,6 +82,7 @@ tokio = { version = "1.47.0", default-features = false }
 tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
 tokio-rustls = { version = "0.26.2", default-features = false }
 tokio-stream = { version = "0.1.18", default-features = false, features = ["sync"] }
+tokio-tungstenite = { version = "0.26", default-features = false, features = ["connect", "rustls-tls-native-roots"] }
 tonic = { version = "0.14.2", default-features = false }
 tracing = { version = "0.1.41", default-features = false }
 tracing-actix-web = { version = "0.7.19", default-features = false }
diff --git a/etl-config/src/shared/destination.rs b/etl-config/src/shared/destination.rs
index d7b12cbfe..86beb9181 100644
--- a/etl-config/src/shared/destination.rs
+++ b/etl-config/src/shared/destination.rs
@@ -46,6 +46,24 @@ pub enum DestinationConfig {
         #[serde(flatten)]
         config: IcebergConfig,
     },
+    #[serde(rename = "realtime")]
+    SupabaseRealtime {
+        /// WebSocket URL, e.g. wss://project.supabase.co/realtime/v1/websocket
+        url: String,
+        /// API key (anon key) for authenticating with Realtime.
+        api_key: SecretString,
+        /// Whether to broadcast on private channels (topics become `realtime:private:...`).
+        /// Defaults to `false`.
+        #[serde(default)]
+        private_channels: bool,
+        /// Maximum number of send retries before giving up. Defaults to `5`.
+        #[serde(default = "default_max_retries")]
+        max_retries: u32,
+    },
+}
+
+fn default_max_retries() -> u32 {
+    5
 }
 
 impl DestinationConfig {
@@ -199,6 +217,12 @@ pub enum DestinationConfigWithoutSecrets {
         #[serde(flatten)]
         config: IcebergConfigWithoutSecrets,
     },
+    #[serde(rename = "realtime")]
+    SupabaseRealtime {
+        url: String,
+        private_channels: bool,
+        max_retries: u32,
+    },
 }
 
 impl From<DestinationConfig> for DestinationConfigWithoutSecrets {
@@ -219,6 +243,16 @@
             DestinationConfig::Iceberg { config } => DestinationConfigWithoutSecrets::Iceberg {
                 config: config.into(),
             },
+            DestinationConfig::SupabaseRealtime {
+                url,
+                api_key: _,
+                private_channels,
+                max_retries,
+            } => DestinationConfigWithoutSecrets::SupabaseRealtime {
+                url,
+                private_channels,
+                max_retries,
+            },
         }
     }
 }
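For illustration, here is a hedged sketch of constructing the new variant in code and deriving the secret-free form used for logging. It assumes the etl-config types above are in scope (the exact module path is not shown in this diff) and that `SecretString` is the secrecy crate's type with its `From<String>` impl:

```rust
// use etl_config::shared::{DestinationConfig, DestinationConfigWithoutSecrets}; // path assumed
use secrecy::SecretString;

fn main() {
    let config = DestinationConfig::SupabaseRealtime {
        url: "wss://project.supabase.co/realtime/v1/websocket".to_string(),
        api_key: SecretString::from("anon-key".to_string()),
        private_channels: false, // `#[serde(default)]` => false when omitted
        max_retries: 5,          // matches `default_max_retries`
    };

    // The `From` impl above drops `api_key`, so the result is safe to log.
    let loggable = DestinationConfigWithoutSecrets::from(config);
    match loggable {
        DestinationConfigWithoutSecrets::SupabaseRealtime { url, .. } => {
            println!("realtime destination at {url}");
        }
        _ => unreachable!(),
    }
}
```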
diff --git a/etl-destinations/Cargo.toml b/etl-destinations/Cargo.toml
index 4df9681f1..80df5b262 100644
--- a/etl-destinations/Cargo.toml
+++ b/etl-destinations/Cargo.toml
@@ -31,6 +31,16 @@ iceberg = [
     "dep:serde",
     "dep:serde_json",
 ]
+realtime = [
+    "dep:tokio-tungstenite",
+    "dep:tokio",
+    "tokio/time",
+    "dep:tokio-stream",
+    "dep:futures",
+    "dep:tracing",
+    "dep:serde_json",
+    "dep:base64",
+]
 egress = ["etl/egress"]
 # We assume that `test-utils` is always used in conjunction with `bigquery` or `iceberg` thus we only
 # put here the extra dependencies needed.
@@ -43,6 +53,7 @@ arrow = { workspace = true, optional = true }
 async-trait = { workspace = true, optional = true }
 base64 = { workspace = true, optional = true }
 chrono = { workspace = true }
+futures = { workspace = true, optional = true }
 gcp-bigquery-client = { workspace = true, optional = true, features = [
     "rust-tls",
     "aws-lc-rs",
@@ -57,6 +68,8 @@ reqwest = { workspace = true, optional = true, features = ["json"] }
 serde = { workspace = true, optional = true, features = ["derive"] }
 serde_json = { workspace = true, optional = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
+tokio-stream = { workspace = true, optional = true }
+tokio-tungstenite = { workspace = true, optional = true }
 tonic = { workspace = true, optional = true }
 tracing = { workspace = true, optional = true, default-features = true }
 uuid = { workspace = true, optional = true, features = ["v4"] }
diff --git a/etl-destinations/src/lib.rs b/etl-destinations/src/lib.rs
index 371cef719..092af6719 100644
--- a/etl-destinations/src/lib.rs
+++ b/etl-destinations/src/lib.rs
@@ -9,3 +9,5 @@ pub mod bigquery;
 pub mod egress;
 #[cfg(feature = "iceberg")]
 pub mod iceberg;
+#[cfg(feature = "realtime")]
+pub mod realtime;
diff --git a/etl-destinations/src/realtime/connection.rs b/etl-destinations/src/realtime/connection.rs
new file mode 100644
index 000000000..57e27c7a2
--- /dev/null
+++ b/etl-destinations/src/realtime/connection.rs
@@ -0,0 +1,225 @@
+use etl::error::{ErrorKind, EtlResult};
+use etl::etl_error;
+use futures::SinkExt;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tokio::net::TcpStream;
+use tokio::time::Duration;
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
+use tracing::{debug, info};
+
+const BACKOFF_BASE_MS: u64 = 1_000;
+const BACKOFF_MAX_MS: u64 = 30_000;
+
+/// A live WebSocket connection to Supabase Realtime.
+struct RealtimeConnection {
+    ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
+}
+
+impl RealtimeConnection {
+    async fn connect(url: &str, api_key: &str) -> EtlResult<Self> {
+        let connect_url = format!("{}?apikey={}&vsn=2.0.0", url, api_key);
+        debug!(url = %url, "connecting to Realtime WebSocket");
+
+        let (ws, _) = connect_async(&connect_url).await.map_err(|e| {
+            etl_error!(
+                ErrorKind::DestinationConnectionFailed,
+                "Failed to connect to Realtime WebSocket",
+                format!("URL: {url}, error: {e}"),
+                source: e
+            )
+        })?;
+
+        info!(url = %url, "connected to Realtime WebSocket");
+        Ok(Self { ws })
+    }
+
+    async fn send(&mut self, message: &str) -> EtlResult<()> {
+        self.ws
+            .send(Message::Text(message.to_string().into()))
+            .await
+            .map_err(|e| {
+                etl_error!(
+                    ErrorKind::DestinationIoError,
+                    "Failed to send message to Realtime",
+                    format!("error: {e}"),
+                    source: e
+                )
+            })
+    }
+}
+
+/// Manages a WebSocket connection with reconnect support.
+///
+/// All sends are fire-and-forget: no ack is expected from Realtime.
+/// Retry logic is intentionally kept outside this struct (in `core.rs`) so that
+/// the lock on this struct is not held during backoff sleeps.
+pub struct ConnectionManager {
+    connection: Option<RealtimeConnection>,
+    url: String,
+    api_key: String,
+    /// Incremented on every successful reconnect so callers can detect that
+    /// previously-joined channels must be re-subscribed on the new connection.
+    /// Shared with [`RealtimeDestination`] via `Arc` so the generation can be read
+    /// without acquiring the `Mutex`.
+    generation: Arc<AtomicU64>,
+}
+
+impl ConnectionManager {
+    /// Creates a new manager and returns a shared handle to the connection generation counter.
+    pub fn new(url: String, api_key: String) -> (Self, Arc<AtomicU64>) {
+        let generation = Arc::new(AtomicU64::new(0));
+        (
+            Self {
+                connection: None,
+                url,
+                api_key,
+                generation: Arc::clone(&generation),
+            },
+            generation,
+        )
+    }
+
+    /// Ensures the connection is established, connecting if necessary.
+    pub async fn ensure_connected(&mut self) -> EtlResult<()> {
+        if self.connection.is_none() {
+            self.connection = Some(RealtimeConnection::connect(&self.url, &self.api_key).await?);
+        }
+        Ok(())
+    }
+
+    /// Attempts a single send. Ensures the connection first, then sends once.
+    ///
+    /// On send failure the connection is cleared so the next call will reconnect.
+    /// Callers are responsible for retrying with backoff.
+    pub async fn try_send(&mut self, message: &str) -> EtlResult<()> {
+        self.ensure_connected().await?;
+        let conn = self
+            .connection
+            .as_mut()
+            .expect("ensure_connected guarantees Some");
+        match conn.send(message).await {
+            Ok(()) => Ok(()),
+            Err(e) => {
+                self.connection = None;
+                Err(e)
+            }
+        }
+    }
+
+    pub(super) async fn reconnect(&mut self) -> EtlResult<()> {
+        let conn = RealtimeConnection::connect(&self.url, &self.api_key).await?;
+        self.connection = Some(conn);
+        self.generation.fetch_add(1, Ordering::Relaxed);
+        Ok(())
+    }
+
+    /// Closes the connection gracefully.
+    pub async fn close(&mut self) {
+        if let Some(mut conn) = self.connection.take() {
+            let _ = conn.ws.close(None).await;
+        }
+    }
+}
+
+pub(super) fn backoff_duration(attempt: u32) -> Duration {
+    let shift = attempt.min(63) as u64;
+    let ms = BACKOFF_BASE_MS
+        .saturating_mul(1u64 << shift)
+        .min(BACKOFF_MAX_MS);
+    Duration::from_millis(ms)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+    use std::net::SocketAddr;
+    use tokio::net::TcpListener;
+    use tokio_tungstenite::accept_async;
+
+    #[test]
+    fn connection_manager_starts_disconnected() {
+        let (mgr, _gen) = ConnectionManager::new("wss://example.com".into(), "key".into());
+        assert!(mgr.connection.is_none());
+    }
+
+    #[test]
+    fn backoff_duration_doubles_each_attempt() {
+        assert_eq!(backoff_duration(0), Duration::from_millis(1_000));
+        assert_eq!(backoff_duration(1), Duration::from_millis(2_000));
+        assert_eq!(backoff_duration(2), Duration::from_millis(4_000));
+        assert_eq!(backoff_duration(3), Duration::from_millis(8_000));
+        assert_eq!(backoff_duration(4), Duration::from_millis(16_000));
+    }
+
+    #[test]
+    fn backoff_duration_caps_at_max() {
+        assert_eq!(backoff_duration(5), Duration::from_millis(30_000));
+        assert_eq!(backoff_duration(10), Duration::from_millis(30_000));
+        assert_eq!(backoff_duration(u32::MAX), Duration::from_millis(30_000));
+    }
+
+    async fn bind_listener() -> (TcpListener, SocketAddr) {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        (listener, addr)
+    }
+
+    async fn mock_server(listener: TcpListener) -> tokio::task::JoinHandle<Vec<String>> {
+        tokio::spawn(async move {
+            let (stream, _) = listener.accept().await.unwrap();
+            let mut ws = accept_async(stream).await.unwrap();
+            let mut received = Vec::new();
+            while let Some(Ok(msg)) = ws.next().await {
+                match msg {
+                    tokio_tungstenite::tungstenite::Message::Text(t) => {
+                        received.push(t.to_string());
+                    }
+                    tokio_tungstenite::tungstenite::Message::Close(_) => break,
+                    _ => {}
+                }
+            }
+            received
+        })
+    }
+
+    #[tokio::test]
+    async fn try_send_delivers_message() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+
+        let url = format!("ws://{addr}/");
+        let (mut mgr, _gen) = ConnectionManager::new(url, "key".into());
+
+        mgr.try_send("hello").await.unwrap();
+        mgr.close().await;
+
+        let messages = server.await.unwrap();
+        assert_eq!(messages, vec!["hello"]);
+    }
+
+    #[tokio::test]
+    async fn try_send_delivers_multiple_messages_in_order() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+
+        let url = format!("ws://{addr}/");
+        let (mut mgr, _gen) = ConnectionManager::new(url, "key".into());
+
+        mgr.try_send("first").await.unwrap();
+        mgr.try_send("second").await.unwrap();
+        mgr.try_send("third").await.unwrap();
+        mgr.close().await;
+
+        let messages = server.await.unwrap();
+        assert_eq!(messages, vec!["first", "second", "third"]);
+    }
+
+    #[tokio::test]
+    async fn try_send_fails_when_no_server_available() {
+        let (mut mgr, _gen) = ConnectionManager::new("ws://127.0.0.1:1/".into(), "key".into());
+        let result = mgr.try_send("hello").await;
+        assert!(result.is_err());
+    }
+}
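The generation counter is the only state shared between `ConnectionManager` and its callers, and it is what ties reconnection to channel re-joins. A minimal standalone sketch of the pattern (names are illustrative, not the crate's API):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// The reconnecting side bumps the counter; observers compare the value they
// last acted on against the current one to detect that per-connection state
// (here: joined channels) is stale and must be rebuilt.
struct Reconnector {
    generation: Arc<AtomicU64>,
}

struct Observer {
    generation: Arc<AtomicU64>,
    seen: u64,
}

impl Reconnector {
    fn reconnect(&self) {
        // ... re-establish the connection, then:
        self.generation.fetch_add(1, Ordering::Relaxed);
    }
}

impl Observer {
    /// Returns true when the connection changed since the last check.
    fn needs_rejoin(&mut self) -> bool {
        let current = self.generation.load(Ordering::Relaxed);
        let stale = current != self.seen;
        self.seen = current;
        stale
    }
}

fn main() {
    let generation = Arc::new(AtomicU64::new(0));
    let reconnector = Reconnector { generation: Arc::clone(&generation) };
    let mut observer = Observer { generation, seen: 0 };

    assert!(!observer.needs_rejoin()); // same connection: nothing to redo
    reconnector.reconnect();
    assert!(observer.needs_rejoin()); // generation bumped: re-join channels
}
```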
diff --git a/etl-destinations/src/realtime/core.rs b/etl-destinations/src/realtime/core.rs
new file mode 100644
index 000000000..936d0d741
--- /dev/null
+++ b/etl-destinations/src/realtime/core.rs
@@ -0,0 +1,637 @@
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use etl::destination::Destination;
+use etl::error::{ErrorKind, EtlResult};
+use etl::etl_error;
+use etl::types::{Event, PgLsn, TableId, TableRow, TableSchema};
+use tokio::sync::{Mutex, RwLock};
+use tokio::task::AbortHandle;
+use tokio::time::{Duration, MissedTickBehavior, interval, sleep};
+use tokio_stream::StreamExt;
+use tokio_stream::wrappers::IntervalStream;
+use tracing::{debug, warn};
+
+#[cfg(feature = "egress")]
+use crate::egress::{PROCESSING_TYPE_STREAMING, log_processed_bytes};
+
+use super::connection::{ConnectionManager, backoff_duration};
+use super::encoding::{
+    build_broadcast_message, build_heartbeat_message, build_join_message, build_topic,
+    delete_payload, insert_payload, table_row_to_json, truncate_payload, update_payload,
+};
+
+pub const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
+const INSERT_EVENT: &str = "insert";
+const UPDATE_EVENT: &str = "update";
+const DELETE_EVENT: &str = "delete";
+
+pub struct RealtimeConfig {
+    pub url: String,
+    pub api_key: String,
+    pub private_channels: bool,
+    pub max_retries: u32,
+    /// How often to send a Phoenix heartbeat. Defaults to 20 s.
+    pub heartbeat_interval: Duration,
+}
+
+/// Destination that broadcasts replicated Postgres changes to Supabase Realtime channels.
+#[derive(Clone)]
+pub struct RealtimeDestination {
+    private_channels: bool,
+    max_retries: u32,
+    connection: Arc<Mutex<ConnectionManager>>,
+    /// Shared generation counter from `ConnectionManager`. Readable without locking.
+    connection_generation: Arc<AtomicU64>,
+    /// Cache of table schemas populated from `Relation` events.
+    schema_cache: Arc<RwLock<HashMap<TableId, Arc<TableSchema>>>>,
+    /// Topics that have received a `phx_join` in the current connection generation.
+    joined_topics: Arc<RwLock<HashSet<String>>>,
+    /// The connection generation when `joined_topics` was last populated.
+    joined_generation: Arc<AtomicU64>,
+    /// Handle to abort the background heartbeat task when this destination is shut down.
+    heartbeat_abort: AbortHandle,
+}
+
+/// Sends `msg` with retry, releasing the `ConnectionManager` lock between attempts.
+///
+/// This avoids holding the lock during backoff sleeps, preventing the heartbeat
+/// or other senders from being starved while a retry waits.
+async fn send_with_retry(
+    connection: &Mutex<ConnectionManager>,
+    msg: &str,
+    max_retries: u32,
+) -> EtlResult<()> {
+    let mut last_err = None;
+    for attempt in 0..=max_retries {
+        match connection.lock().await.try_send(msg).await {
+            Ok(()) => return Ok(()),
+            Err(e) => {
+                warn!(attempt, error = %e, "send failed, reconnecting");
+                last_err = Some(e);
+                sleep(backoff_duration(attempt)).await;
+                if let Err(e) = connection.lock().await.reconnect().await {
+                    warn!(error = %e, "reconnect failed");
+                }
+            }
+        }
+    }
+    Err(last_err.expect("loop ran at least once"))
+}
+
+impl RealtimeDestination {
+    pub fn new(config: RealtimeConfig) -> Self {
+        let (manager, connection_generation) = ConnectionManager::new(config.url, config.api_key);
+        let connection = Arc::new(Mutex::new(manager));
+
+        let connection_clone = Arc::clone(&connection);
+        let max_retries = config.max_retries;
+        let heartbeat_interval = config.heartbeat_interval;
+        let abort_handle = tokio::spawn(async move {
+            let mut ticker = interval(heartbeat_interval);
+            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+            let mut stream = IntervalStream::new(ticker);
+            let heartbeat = build_heartbeat_message();
+            stream.next().await; // skip the immediate first tick
+            loop {
+                stream.next().await;
+                if let Err(e) = send_with_retry(&connection_clone, &heartbeat, max_retries).await {
+                    warn!(error = %e, "heartbeat failed after all retries");
+                }
+            }
+        })
+        .abort_handle();
+
+        Self {
+            private_channels: config.private_channels,
+            max_retries,
+            connection,
+            connection_generation,
+            schema_cache: Arc::new(RwLock::new(HashMap::new())),
+            joined_topics: Arc::new(RwLock::new(HashSet::new())),
+            joined_generation: Arc::new(AtomicU64::new(0)),
+            heartbeat_abort: abort_handle,
+        }
+    }
+
+    /// Ensures the channel for `topic` has been joined, sending `phx_join` if needed.
+    ///
+    /// If the underlying connection was re-established since the last join, all
+    /// previously-joined topics are cleared and the current topic is re-joined.
+    async fn ensure_joined(&self, topic: &str) -> EtlResult<()> {
+        let current_gen = self.connection_generation.load(Ordering::Relaxed);
+        {
+            let mut topics = self.joined_topics.write().await;
+            if self.joined_generation.load(Ordering::Relaxed) != current_gen {
+                topics.clear();
+                self.joined_generation.store(current_gen, Ordering::Relaxed);
+            }
+            if !topics.insert(topic.to_string()) {
+                return Ok(());
+            }
+        } // write lock released before network I/O
+        send_with_retry(
+            &self.connection,
+            &build_join_message(topic),
+            self.max_retries,
+        )
+        .await?;
+        debug!(topic, "joined realtime channel");
+        Ok(())
+    }
+
+    /// Sends a broadcast message to `topic`, joining first if required.
+    async fn broadcast(
+        &self,
+        topic: &str,
+        event: &str,
+        payload: serde_json::Value,
+    ) -> EtlResult<usize> {
+        self.ensure_joined(topic).await?;
+        let msg = build_broadcast_message(topic, event, payload);
+        let bytes = msg.len();
+        send_with_retry(&self.connection, &msg, self.max_retries).await?;
+        Ok(bytes)
+    }
+}
+
+impl Destination for RealtimeDestination {
+    fn name() -> &'static str {
+        "realtime"
+    }
+
+    async fn shutdown(&self) -> EtlResult<()> {
+        self.heartbeat_abort.abort();
+        self.connection.lock().await.close().await;
+        Ok(())
+    }
+
+    async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
+        let schema = self
+            .schema_cache
+            .read()
+            .await
+            .get(&table_id)
+            .cloned()
+            .ok_or_else(|| {
+                etl_error!(
+                    ErrorKind::MissingTableSchema,
+                    "Schema not found for table",
+                    format!("table_id={table_id}")
+                )
+            })?;
+        let topic = build_topic(&schema.name, self.private_channels);
+        let payload = truncate_payload(&schema.name, PgLsn::from(0u64));
+        self.broadcast(&topic, DELETE_EVENT, payload).await?;
+        Ok(())
+    }
+
+    async fn write_table_rows(
+        &self,
+        table_id: TableId,
+        table_rows: Vec<TableRow>,
+    ) -> EtlResult<()> {
+        if table_rows.is_empty() {
+            return Ok(());
+        }
+        let schema = self
+            .schema_cache
+            .read()
+            .await
+            .get(&table_id)
+            .cloned()
+            .ok_or_else(|| {
+                etl_error!(
+                    ErrorKind::MissingTableSchema,
+                    "Schema not found for table",
+                    format!("table_id={table_id}")
+                )
+            })?;
+        let topic = build_topic(&schema.name, self.private_channels);
+        for row in table_rows {
+            let record = table_row_to_json(&row, &schema);
+            let payload = insert_payload(&schema.name, record, PgLsn::from(0u64));
+            self.broadcast(&topic, INSERT_EVENT, payload).await?;
+        }
+        Ok(())
+    }
+
+    async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
+        #[cfg_attr(not(feature = "egress"), allow(unused_variables))]
+        let mut bytes_sent: u64 = 0;
+
+        for event in events {
+            match event {
+                Event::Relation(rel) => {
+                    self.schema_cache
+                        .write()
+                        .await
+                        .insert(rel.table_schema.id, Arc::new(rel.table_schema));
+                }
+                Event::Insert(ins) => {
+                    let schema = self
+                        .schema_cache
+                        .read()
+                        .await
+                        .get(&ins.table_id)
+                        .cloned()
+                        .ok_or_else(|| {
+                            etl_error!(
+                                ErrorKind::MissingTableSchema,
+                                "Schema not found for table",
+                                format!("table_id={}", ins.table_id)
+                            )
+                        })?;
+                    let topic = build_topic(&schema.name, self.private_channels);
+                    let record = table_row_to_json(&ins.table_row, &schema);
+                    let payload = insert_payload(&schema.name, record, ins.commit_lsn);
+                    bytes_sent += self.broadcast(&topic, INSERT_EVENT, payload).await? as u64;
+                }
+                Event::Update(upd) => {
+                    let schema = self
+                        .schema_cache
+                        .read()
+                        .await
+                        .get(&upd.table_id)
+                        .cloned()
+                        .ok_or_else(|| {
+                            etl_error!(
+                                ErrorKind::MissingTableSchema,
+                                "Schema not found for table",
+                                format!("table_id={}", upd.table_id)
+                            )
+                        })?;
+                    let topic = build_topic(&schema.name, self.private_channels);
+                    let record = table_row_to_json(&upd.table_row, &schema);
+                    let old_record = upd
+                        .old_table_row
+                        .as_ref()
+                        .map(|(_, row)| table_row_to_json(row, &schema));
+                    let payload = update_payload(&schema.name, record, old_record, upd.commit_lsn);
+                    bytes_sent += self.broadcast(&topic, UPDATE_EVENT, payload).await? as u64;
+                }
+                Event::Delete(del) => {
+                    let schema = self
+                        .schema_cache
+                        .read()
+                        .await
+                        .get(&del.table_id)
+                        .cloned()
+                        .ok_or_else(|| {
+                            etl_error!(
+                                ErrorKind::MissingTableSchema,
+                                "Schema not found for table",
+                                format!("table_id={}", del.table_id)
+                            )
+                        })?;
+                    let topic = build_topic(&schema.name, self.private_channels);
+                    let old_record = del
+                        .old_table_row
+                        .as_ref()
+                        .map(|(_, row)| table_row_to_json(row, &schema));
+                    let payload = delete_payload(&schema.name, old_record, del.commit_lsn);
+                    bytes_sent += self.broadcast(&topic, DELETE_EVENT, payload).await? as u64;
+                }
+                Event::Truncate(trunc) => {
+                    for rel_id in &trunc.rel_ids {
+                        let table_id = TableId::new(*rel_id);
+                        match self.schema_cache.read().await.get(&table_id).cloned() {
+                            Some(schema) => {
+                                let topic = build_topic(&schema.name, self.private_channels);
+                                let payload = truncate_payload(&schema.name, trunc.commit_lsn);
+                                bytes_sent +=
+                                    self.broadcast(&topic, DELETE_EVENT, payload).await? as u64;
+                            }
+                            None => {
+                                warn!(table_id = %table_id, "skipping truncate for unknown table");
+                            }
+                        }
+                    }
+                }
+                Event::Begin(_) | Event::Commit(_) | Event::Unsupported => {}
+            }
+        }
+
+        #[cfg(feature = "egress")]
+        if bytes_sent > 0 {
+            log_processed_bytes(Self::name(), PROCESSING_TYPE_STREAMING, bytes_sent, 0);
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use etl::destination::Destination;
+    use etl::types::{
+        Cell, ColumnSchema, DeleteEvent, InsertEvent, RelationEvent, TableName, TruncateEvent,
+        Type, UpdateEvent,
+    };
+    use futures::StreamExt;
+    use std::net::SocketAddr;
+    use tokio::net::TcpListener;
+    use tokio_tungstenite::accept_async;
+
+    async fn bind_listener() -> (TcpListener, SocketAddr) {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        (listener, addr)
+    }
+
+    async fn mock_server(listener: TcpListener) -> tokio::task::JoinHandle<Vec<String>> {
+        tokio::spawn(async move {
+            let (stream, _) = listener.accept().await.unwrap();
+            let mut ws = accept_async(stream).await.unwrap();
+            let mut received = Vec::new();
+            while let Some(Ok(msg)) = ws.next().await {
+                match msg {
+                    tokio_tungstenite::tungstenite::Message::Text(t) => {
+                        received.push(t.to_string());
+                    }
+                    tokio_tungstenite::tungstenite::Message::Close(_) => break,
+                    _ => {}
+                }
+            }
+            received
+        })
+    }
+
+    fn test_schema() -> TableSchema {
+        TableSchema::new(
+            TableId::new(1),
+            TableName::new("public".into(), "users".into()),
+            vec![ColumnSchema::new("id".into(), Type::INT4, -1, false, true)],
+        )
+    }
+
+    fn test_config(url: String) -> RealtimeConfig {
+        RealtimeConfig {
+            url,
+            api_key: "test_key".into(),
+            private_channels: false,
+            max_retries: 0,
+            heartbeat_interval: Duration::from_secs(3600),
+        }
+    }
+
+    fn lsn() -> PgLsn {
+        PgLsn::from(0u64)
+    }
+
+    fn relation_event(schema: TableSchema) -> Event {
+        Event::Relation(RelationEvent {
+            start_lsn: lsn(),
+            commit_lsn: lsn(),
+            table_schema: schema,
+        })
+    }
+
+    fn parsed(msg: &str) -> serde_json::Value {
+        serde_json::from_str(msg).unwrap()
+    }
+
+    #[tokio::test]
+    async fn inserted_row_is_broadcast_with_correct_payload() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        dest.write_events(vec![
+            relation_event(test_schema()),
+            Event::Insert(InsertEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                table_row: TableRow::new(vec![Cell::I32(42)]),
+            }),
+        ])
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        let broadcast = msgs
+            .iter()
+            .map(|m| parsed(m))
+            .find(|m| m[3] == "broadcast")
+            .unwrap();
+        assert_eq!(broadcast[4]["event"], "insert");
+        assert_eq!(broadcast[4]["payload"]["op"], "INSERT");
+        assert_eq!(broadcast[4]["payload"]["schema"], "public");
+        assert_eq!(broadcast[4]["payload"]["table"], "users");
+        assert_eq!(broadcast[4]["payload"]["record"]["id"], 42);
+    }
+
+    #[tokio::test]
+    async fn write_events_update_broadcasts_record_and_old_record() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        let schema = test_schema();
+        dest.write_events(vec![
+            relation_event(schema),
+            Event::Update(UpdateEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                table_row: TableRow::new(vec![Cell::I32(2)]),
+                old_table_row: Some((false, TableRow::new(vec![Cell::I32(1)]))),
+            }),
+        ])
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        assert_eq!(msgs.len(), 2);
+        let broadcast = parsed(&msgs[1]);
+        assert_eq!(broadcast[4]["event"], "update");
+        assert_eq!(broadcast[4]["payload"]["op"], "UPDATE");
+        assert_eq!(broadcast[4]["payload"]["record"]["id"], 2);
+        assert_eq!(broadcast[4]["payload"]["old_record"]["id"], 1);
+    }
+
+    #[tokio::test]
+    async fn write_events_delete_broadcasts_old_record() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        let schema = test_schema();
+        dest.write_events(vec![
+            relation_event(schema),
+            Event::Delete(DeleteEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                old_table_row: Some((false, TableRow::new(vec![Cell::I32(99)]))),
+            }),
+        ])
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        assert_eq!(msgs.len(), 2);
+        let broadcast = parsed(&msgs[1]);
+        assert_eq!(broadcast[4]["event"], "delete");
+        assert_eq!(broadcast[4]["payload"]["op"], "DELETE");
+        assert_eq!(broadcast[4]["payload"]["old_record"]["id"], 99);
+        assert_eq!(broadcast[4]["payload"]["record"], serde_json::Value::Null);
+    }
+
+    #[tokio::test]
+    async fn write_events_truncate_broadcasts_delete_event() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        let schema = test_schema();
+        dest.write_events(vec![
+            relation_event(schema),
+            Event::Truncate(TruncateEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                options: 0,
+                rel_ids: vec![1],
+            }),
+        ])
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        assert_eq!(msgs.len(), 2);
+        let broadcast = parsed(&msgs[1]);
+        assert_eq!(broadcast[4]["event"], "delete");
+        assert_eq!(broadcast[4]["payload"]["op"], "TRUNCATE");
+        assert_eq!(broadcast[4]["payload"]["schema"], "public");
+        assert_eq!(broadcast[4]["payload"]["table"], "users");
+    }
+
+    #[tokio::test]
+    async fn write_events_insert_without_relation_returns_error() {
+        let dest = RealtimeDestination::new(test_config("ws://127.0.0.1:1/".into()));
+
+        let result = dest
+            .write_events(vec![Event::Insert(InsertEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                table_row: TableRow::new(vec![Cell::I32(1)]),
+            })])
+            .await;
+
+        dest.heartbeat_abort.abort();
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn write_table_rows_broadcasts_as_inserts() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        let schema = test_schema();
+        dest.write_events(vec![relation_event(schema)])
+            .await
+            .unwrap();
+
+        dest.write_table_rows(
+            TableId::new(1),
+            vec![
+                TableRow::new(vec![Cell::I32(1)]),
+                TableRow::new(vec![Cell::I32(2)]),
+            ],
+        )
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        // phx_join + 2 broadcasts
+        assert_eq!(msgs.len(), 3);
+        assert_eq!(parsed(&msgs[0])[3], "phx_join");
+        for msg in &msgs[1..] {
+            let b = parsed(msg);
+            assert_eq!(b[4]["event"], "insert");
+            assert_eq!(b[4]["payload"]["op"], "INSERT");
+        }
+        assert_eq!(parsed(&msgs[1])[4]["payload"]["record"]["id"], 1);
+        assert_eq!(parsed(&msgs[2])[4]["payload"]["record"]["id"], 2);
+    }
+
+    #[tokio::test]
+    async fn events_across_multiple_transactions_are_all_delivered() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(test_config(format!("ws://{addr}/")));
+
+        let schema = test_schema();
+        let insert = |id| {
+            Event::Insert(InsertEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                table_row: TableRow::new(vec![Cell::I32(id)]),
+            })
+        };
+
+        dest.write_events(vec![relation_event(schema.clone()), insert(1)])
+            .await
+            .unwrap();
+        dest.write_events(vec![relation_event(schema), insert(2)])
+            .await
+            .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        let broadcasts: Vec<_> = msgs
+            .iter()
+            .map(|m| parsed(m))
+            .filter(|m| m[3] == "broadcast")
+            .collect();
+        assert_eq!(broadcasts.len(), 2);
+        assert_eq!(broadcasts[0][4]["payload"]["record"]["id"], 1);
+        assert_eq!(broadcasts[1][4]["payload"]["record"]["id"], 2);
+    }
+
+    #[tokio::test]
+    async fn private_channels_prefixes_topic() {
+        let (listener, addr) = bind_listener().await;
+        let server = mock_server(listener).await;
+        let dest = RealtimeDestination::new(RealtimeConfig {
+            private_channels: true,
+            ..test_config(format!("ws://{addr}/"))
+        });
+
+        let schema = test_schema();
+        dest.write_events(vec![
+            relation_event(schema),
+            Event::Insert(InsertEvent {
+                start_lsn: lsn(),
+                commit_lsn: lsn(),
+                table_id: TableId::new(1),
+                table_row: TableRow::new(vec![Cell::I32(1)]),
+            }),
+        ])
+        .await
+        .unwrap();
+
+        dest.shutdown().await.unwrap();
+        let msgs = server.await.unwrap();
+
+        let join = parsed(&msgs[0]);
+        assert_eq!(join[2], "realtime:private:etl:public.users");
+    }
+}
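For reference, the frames this destination produces for the first INSERT on a table look like the sketch below, derived from `build_join_message`, `build_broadcast_message`, and `insert_payload` in `encoding.rs` that follows; the table, record, and LSN values are invented for illustration:

```rust
use serde_json::json;

fn main() {
    // Frame 1: channel join, sent once per topic per connection generation.
    let join = json!([
        null, "1", "realtime:etl:public.users", "phx_join",
        {
            "config": {
                "broadcast": { "self": false, "ack": false },
                "presence": { "key": "" },
                "postgres_changes": []
            }
        }
    ]);

    // Frame 2: the CDC event itself, fire-and-forget (join_ref and msg_ref are null).
    let broadcast = json!([
        null, null, "realtime:etl:public.users", "broadcast",
        {
            "event": "insert",
            "payload": {
                "op": "INSERT",
                "schema": "public",
                "table": "users",
                "record": { "id": 42 },
                "old_record": null,
                "commit_lsn": "0/0"
            }
        }
    ]);

    println!("{join}\n{broadcast}");
}
```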
diff --git a/etl-destinations/src/realtime/encoding.rs b/etl-destinations/src/realtime/encoding.rs
new file mode 100644
index 000000000..dfd8167e8
--- /dev/null
+++ b/etl-destinations/src/realtime/encoding.rs
@@ -0,0 +1,454 @@
+use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
+use etl::types::{ArrayCell, Cell, PgLsn, TableName, TableRow, TableSchema};
+use serde_json::{Map, Value};
+
+/// Serializes a [`TableRow`] into a JSON object using the column names from the schema.
+pub(super) fn table_row_to_json(row: &TableRow, schema: &TableSchema) -> Value {
+    let mut map = Map::with_capacity(schema.column_schemas.len());
+    for (cell, col) in row.values().iter().zip(schema.column_schemas.iter()) {
+        map.insert(col.name.clone(), cell_to_json(cell));
+    }
+    Value::Object(map)
+}
+
+/// Converts a [`Cell`] to its JSON representation.
+///
+/// Special cases:
+/// - `f32`/`f64` NaN/Inf → string (`"NaN"`, `"Infinity"`, `"-Infinity"`)
+/// - `i64` values outside the JS safe-integer range (|v| > 2^53) → string to preserve precision
+/// - `Bytes` → base64-encoded string
+/// - `Numeric` NaN/Inf variants → string
+/// - `Array` elements containing `None` → JSON `null`
+pub(super) fn cell_to_json(cell: &Cell) -> Value {
+    match cell {
+        Cell::Null => Value::Null,
+        Cell::Bool(v) => Value::Bool(*v),
+        Cell::I16(v) => Value::Number((*v).into()),
+        Cell::I32(v) => Value::Number((*v).into()),
+        Cell::U32(v) => Value::Number((*v).into()),
+        Cell::I64(v) => {
+            // `unsigned_abs` avoids the overflow that `abs` hits on `i64::MIN`.
+            const MAX_SAFE: u64 = 1 << 53;
+            if v.unsigned_abs() > MAX_SAFE {
+                Value::String(v.to_string())
+            } else {
+                Value::Number((*v).into())
+            }
+        }
+        Cell::F32(v) => f32_to_json(*v),
+        Cell::F64(v) => f64_to_json(*v),
+        Cell::Numeric(n) => {
+            use etl::types::PgNumeric;
+            match n {
+                PgNumeric::NaN => Value::String("NaN".into()),
+                PgNumeric::PositiveInfinity => Value::String("Infinity".into()),
+                PgNumeric::NegativeInfinity => Value::String("-Infinity".into()),
+                PgNumeric::Value { .. } => Value::String(n.to_string()),
+            }
+        }
+        Cell::Date(v) => Value::String(v.to_string()),
+        Cell::Time(v) => Value::String(v.to_string()),
+        Cell::Timestamp(v) => Value::String(v.to_string()),
+        Cell::TimestampTz(v) => Value::String(v.to_rfc3339()),
+        Cell::Uuid(v) => Value::String(v.to_string()),
+        Cell::Json(v) => v.clone(),
+        Cell::String(v) => Value::String(v.clone()),
+        Cell::Bytes(v) => Value::String(BASE64.encode(v)),
+        Cell::Array(arr) => array_cell_to_json(arr),
+    }
+}
+
+fn f32_to_json(v: f32) -> Value {
+    if v.is_nan() {
+        Value::String("NaN".into())
+    } else if v == f32::INFINITY {
+        Value::String("Infinity".into())
+    } else if v == f32::NEG_INFINITY {
+        Value::String("-Infinity".into())
+    } else {
+        serde_json::Number::from_f64(v as f64)
+            .map(Value::Number)
+            .unwrap_or(Value::Null)
+    }
+}
+
+fn f64_to_json(v: f64) -> Value {
+    if v.is_nan() {
+        Value::String("NaN".into())
+    } else if v == f64::INFINITY {
+        Value::String("Infinity".into())
+    } else if v == f64::NEG_INFINITY {
+        Value::String("-Infinity".into())
+    } else {
+        serde_json::Number::from_f64(v)
+            .map(Value::Number)
+            .unwrap_or(Value::Null)
+    }
+}
+
+fn array_cell_to_json(arr: &ArrayCell) -> Value {
+    match arr {
+        ArrayCell::Bool(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |b| Value::Bool(*b)))
+                .collect(),
+        ),
+        ArrayCell::String(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |s| Value::String(s.clone())))
+                .collect(),
+        ),
+        ArrayCell::I16(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |n| Value::Number((*n).into())))
+                .collect(),
+        ),
+        ArrayCell::I32(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |n| Value::Number((*n).into())))
+                .collect(),
+        ),
+        ArrayCell::U32(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |n| Value::Number((*n).into())))
+                .collect(),
+        ),
+        ArrayCell::I64(v) => Value::Array(
+            v.iter()
+                .map(|e| {
+                    opt_to_json(e, |n| {
+                        const MAX_SAFE: u64 = 1 << 53;
+                        if n.unsigned_abs() > MAX_SAFE {
+                            Value::String(n.to_string())
+                        } else {
+                            Value::Number((*n).into())
+                        }
+                    })
+                })
+                .collect(),
+        ),
+        ArrayCell::F32(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |n| f32_to_json(*n)))
+                .collect(),
+        ),
+        ArrayCell::F64(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |n| f64_to_json(*n)))
+                .collect(),
+        ),
+        ArrayCell::Numeric(v) => Value::Array(
+            v.iter()
+                .map(|e| {
+                    e.as_ref()
+                        .map(|n| cell_to_json(&Cell::Numeric(n.clone())))
+                        .unwrap_or(Value::Null)
+                })
+                .collect(),
+        ),
+        ArrayCell::Date(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |d| Value::String(d.to_string())))
+                .collect(),
+        ),
+        ArrayCell::Time(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |t| Value::String(t.to_string())))
+                .collect(),
+        ),
+        ArrayCell::Timestamp(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |t| Value::String(t.to_string())))
+                .collect(),
+        ),
+        ArrayCell::TimestampTz(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |t| Value::String(t.to_rfc3339())))
+                .collect(),
+        ),
+        ArrayCell::Uuid(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |u| Value::String(u.to_string())))
+                .collect(),
+        ),
+        ArrayCell::Json(v) => {
+            Value::Array(v.iter().map(|e| e.clone().unwrap_or(Value::Null)).collect())
+        }
+        ArrayCell::Bytes(v) => Value::Array(
+            v.iter()
+                .map(|e| opt_to_json(e, |b| Value::String(BASE64.encode(b))))
+                .collect(),
+        ),
+    }
+}
+
+fn opt_to_json<T, F: Fn(&T) -> Value>(opt: &Option<T>, f: F) -> Value {
+    opt.as_ref().map(f).unwrap_or(Value::Null)
+}
+
+const CHANNEL_PREFIX: &str = "etl";
+
+/// Builds the topic string for a Realtime channel.
+///
+/// Format: `realtime:etl:<schema>.<table>` for public channels
+/// or `realtime:private:etl:<schema>.<table>` for private channels.
+/// The `realtime:` prefix is required by the Supabase Realtime server.
+pub(super) fn build_topic(table_name: &TableName, private_channels: bool) -> String {
+    let channel = format!("{CHANNEL_PREFIX}:{}.{}", table_name.schema, table_name.name);
+    if private_channels {
+        format!("realtime:private:{channel}")
+    } else {
+        format!("realtime:{channel}")
+    }
+}
+
+/// Builds a Phoenix v2 `phx_join` message for the given topic.
+pub(super) fn build_join_message(topic: &str) -> String {
+    serde_json::json!([
+        null,
+        "1",
+        topic,
+        "phx_join",
+        {
+            "config": {
+                "broadcast": { "self": false, "ack": false },
+                "presence": { "key": "" },
+                "postgres_changes": []
+            }
+        }
+    ])
+    .to_string()
+}
+
+/// Builds a Phoenix v2 heartbeat message.
+///
+/// Must be sent periodically (this destination defaults to every 20 s) to keep
+/// the server from closing the connection.
+pub(super) fn build_heartbeat_message() -> String {
+    serde_json::json!([null, "hb", "phoenix", "heartbeat", {}]).to_string()
+}
+
+/// Builds a Phoenix v2 broadcast message for a CDC event.
+pub(super) fn build_broadcast_message(topic: &str, event: &str, payload: Value) -> String {
+    serde_json::json!([null, null, topic, "broadcast", {"event": event, "payload": payload}])
+        .to_string()
+}
+
+/// Builds the payload for an INSERT event.
+pub(super) fn insert_payload(table_name: &TableName, record: Value, commit_lsn: PgLsn) -> Value {
+    serde_json::json!({
+        "op": "INSERT",
+        "schema": table_name.schema,
+        "table": table_name.name,
+        "record": record,
+        "old_record": null,
+        "commit_lsn": commit_lsn.to_string()
+    })
+}
+
+/// Builds the payload for an UPDATE event.
+pub(super) fn update_payload(
+    table_name: &TableName,
+    record: Value,
+    old_record: Option<Value>,
+    commit_lsn: PgLsn,
+) -> Value {
+    serde_json::json!({
+        "op": "UPDATE",
+        "schema": table_name.schema,
+        "table": table_name.name,
+        "record": record,
+        "old_record": old_record.unwrap_or(Value::Null),
+        "commit_lsn": commit_lsn.to_string()
+    })
+}
+
+/// Builds the payload for a DELETE event.
+pub(super) fn delete_payload(
+    table_name: &TableName,
+    old_record: Option<Value>,
+    commit_lsn: PgLsn,
+) -> Value {
+    serde_json::json!({
+        "op": "DELETE",
+        "schema": table_name.schema,
+        "table": table_name.name,
+        "record": null,
+        "old_record": old_record.unwrap_or(Value::Null),
+        "commit_lsn": commit_lsn.to_string()
+    })
+}
+
+/// Builds the payload for a TRUNCATE event.
+pub(super) fn truncate_payload(table_name: &TableName, commit_lsn: PgLsn) -> Value {
+    serde_json::json!({
+        "op": "TRUNCATE",
+        "schema": table_name.schema,
+        "table": table_name.name,
+        "record": null,
+        "old_record": null,
+        "commit_lsn": commit_lsn.to_string()
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use etl::types::PgNumeric;
+    use etl::types::{ArrayCell, Cell};
+
+    // These tests cover non-obvious serialization contracts that affect Realtime
+    // consumers: cases where a naive implementation would silently lose data or
+    // produce invalid JSON.
+
+    #[test]
+    fn large_i64_is_serialized_as_string_to_preserve_js_precision() {
+        let large: i64 = (1_i64 << 53) + 1;
+        assert_eq!(
+            cell_to_json(&Cell::I64(large)),
+            Value::String(large.to_string())
+        );
+    }
+
+    #[test]
+    fn float_special_values_are_serialized_as_strings_because_json_has_no_nan_or_inf() {
+        assert_eq!(
+            cell_to_json(&Cell::F64(f64::NAN)),
+            Value::String("NaN".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::F64(f64::INFINITY)),
+            Value::String("Infinity".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::F64(f64::NEG_INFINITY)),
+            Value::String("-Infinity".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::F32(f32::NAN)),
+            Value::String("NaN".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::F32(f32::INFINITY)),
+            Value::String("Infinity".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::F32(f32::NEG_INFINITY)),
+            Value::String("-Infinity".into())
+        );
+    }
+
+    #[test]
+    fn numeric_special_values_are_serialized_as_strings() {
+        assert_eq!(
+            cell_to_json(&Cell::Numeric(PgNumeric::NaN)),
+            Value::String("NaN".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::Numeric(PgNumeric::PositiveInfinity)),
+            Value::String("Infinity".into())
+        );
+        assert_eq!(
+            cell_to_json(&Cell::Numeric(PgNumeric::NegativeInfinity)),
+            Value::String("-Infinity".into())
+        );
+        let n = PgNumeric::Value {
+            weight: 0,
+            sign: etl::types::Sign::Positive,
+            scale: 2,
+            digits: vec![1, 50],
+        };
+        assert_eq!(
+            cell_to_json(&Cell::Numeric(n.clone())),
+            Value::String(n.to_string())
+        );
+    }
+
+    #[test]
+    fn bytes_are_base64_encoded() {
+        let bytes = vec![0u8, 1, 2, 255];
+        assert_eq!(
+            cell_to_json(&Cell::Bytes(bytes.clone())),
+            Value::String(BASE64.encode(&bytes))
+        );
+    }
+
+    #[test]
+    fn array_nulls_are_preserved_as_json_null() {
+        let arr = Cell::Array(ArrayCell::String(vec![
+            Some("a".into()),
+            None,
+            Some("b".into()),
+        ]));
+        assert_eq!(cell_to_json(&arr), serde_json::json!(["a", null, "b"]));
+    }
+
+    #[test]
+    fn array_large_i64_elements_are_serialized_as_strings() {
+        let large: i64 = (1_i64 << 53) + 1;
+        let arr = Cell::Array(ArrayCell::I64(vec![Some(large), None, Some(1i64)]));
+        let result = cell_to_json(&arr);
+        assert_eq!(result[0], Value::String(large.to_string()));
+        assert_eq!(result[1], Value::Null);
+        assert_eq!(result[2], serde_json::json!(1));
+    }
+
+    #[test]
+    fn array_numeric_special_values_are_serialized_as_strings() {
+        let arr = Cell::Array(ArrayCell::Numeric(vec![
+            Some(PgNumeric::NaN),
+            None,
+            Some(PgNumeric::PositiveInfinity),
+        ]));
+        let result = cell_to_json(&arr);
+        assert_eq!(result[0], Value::String("NaN".into()));
+        assert_eq!(result[1], Value::Null);
+        assert_eq!(result[2], Value::String("Infinity".into()));
+    }
+
+    #[test]
+    fn array_bytes_elements_are_base64_encoded() {
+        let bytes = vec![0u8, 255u8];
+        let arr = Cell::Array(ArrayCell::Bytes(vec![Some(bytes.clone()), None]));
+        let result = cell_to_json(&arr);
+        assert_eq!(result[0], Value::String(BASE64.encode(&bytes)));
+        assert_eq!(result[1], Value::Null);
+    }
+
+    #[test]
+    fn public_channel_topic_has_realtime_prefix() {
+        let name = TableName::new("public".into(), "users".into());
+        assert_eq!(build_topic(&name, false), "realtime:etl:public.users");
+    }
+
+    #[test]
+    fn private_channel_topic_includes_private_segment() {
+        let name = TableName::new("public".into(), "users".into());
+        assert_eq!(
+            build_topic(&name, true),
+            "realtime:private:etl:public.users"
+        );
+    }
+
+    #[test]
+    fn join_message_disables_postgres_changes_and_presence() {
+        let msg = build_join_message("realtime:etl:public.users");
+        let parsed: Value = serde_json::from_str(&msg).unwrap();
+        let config = &parsed[4]["config"];
+        assert_eq!(config["postgres_changes"], serde_json::json!([]));
+        assert_eq!(config["broadcast"]["self"], false);
+        assert_eq!(config["broadcast"]["ack"], false);
+        assert!(config["presence"]["key"].is_string());
+    }
+
+    #[test]
+    fn broadcast_message_structure() {
+        let payload = serde_json::json!({"op": "INSERT"});
+        let msg = build_broadcast_message("etl:public.users", "db_changes", payload.clone());
+        let parsed: Value = serde_json::from_str(&msg).unwrap();
+        assert_eq!(parsed[0], Value::Null); // join_ref
+        assert_eq!(parsed[1], Value::Null); // msg_ref (fire-and-forget)
+        assert_eq!(parsed[2], "etl:public.users");
+        assert_eq!(parsed[3], "broadcast");
+        assert_eq!(parsed[4]["event"], "db_changes");
+        assert_eq!(parsed[4]["payload"], payload);
+    }
+}
diff --git a/etl-destinations/src/realtime/mod.rs b/etl-destinations/src/realtime/mod.rs
new file mode 100644
index 000000000..29c8c46c9
--- /dev/null
+++ b/etl-destinations/src/realtime/mod.rs
@@ -0,0 +1,5 @@
+mod connection;
+mod core;
+mod encoding;
+
+pub use core::{DEFAULT_HEARTBEAT_INTERVAL, RealtimeConfig, RealtimeDestination};
diff --git a/etl-replicator/Cargo.toml b/etl-replicator/Cargo.toml
index 06fcf370f..097794c6e 100644
--- a/etl-replicator/Cargo.toml
+++ b/etl-replicator/Cargo.toml
@@ -14,7 +14,7 @@ egress = ["etl/egress", "etl-destinations/egress"]
 
 [dependencies]
 etl = { workspace = true }
 etl-config = { workspace = true, features = ["supabase"] }
-etl-destinations = { workspace = true, features = ["bigquery", "iceberg"] }
+etl-destinations = { workspace = true, features = ["bigquery", "iceberg", "realtime"] }
 etl-telemetry = { workspace = true }
 configcat = { workspace = true }
diff --git a/etl-replicator/src/core.rs b/etl-replicator/src/core.rs
index 414abed36..a64ee1850 100644
--- a/etl-replicator/src/core.rs
+++ b/etl-replicator/src/core.rs
@@ -21,6 +21,7 @@ use etl_destinations::iceberg::{
 use etl_destinations::{
     bigquery::BigQueryDestination,
     iceberg::{IcebergClient, IcebergDestination},
+    realtime::{RealtimeConfig, RealtimeDestination},
 };
 use secrecy::ExposeSecret;
 use tokio::signal::unix::{SignalKind, signal};
@@ -137,6 +138,23 @@ pub async fn start_replicator_with_config(
            };
            let destination = IcebergDestination::new(client, namespace, state_store.clone());
 
+            let pipeline = Pipeline::new(replicator_config.pipeline, state_store, destination);
+            start_pipeline(pipeline).await?;
+        }
+        DestinationConfig::SupabaseRealtime {
+            url,
+            api_key,
+            private_channels,
+            max_retries,
+        } => {
+            let destination = RealtimeDestination::new(RealtimeConfig {
+                url: url.clone(),
+                api_key: api_key.expose_secret().to_string(),
+                private_channels: *private_channels,
+                max_retries: *max_retries,
+                heartbeat_interval: etl_destinations::realtime::DEFAULT_HEARTBEAT_INTERVAL,
+            });
+
             let pipeline = Pipeline::new(replicator_config.pipeline, state_store, destination);
             start_pipeline(pipeline).await?;
         }
@@ -224,6 +242,17 @@ fn log_destination_config(config: &DestinationConfig) {
                "using generic rest iceberg destination config"
            )
        }
+        DestinationConfig::SupabaseRealtime {
+            url,
+            api_key: _,
+            private_channels,
+            max_retries,
+        } => {
+            debug!(
+                url,
+                private_channels, max_retries, "using realtime destination config"
+            )
+        }
     }
 }
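To see the feature end to end, a client can join the same channel the destination broadcasts to and print the frames it receives. This is a hedged sketch only: the URL, key, and topic are placeholders, the join frame mirrors `build_join_message` above, and whether a given project delivers broadcasts to this subscriber depends on its Realtime channel settings:

```rust
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same query parameters the destination's `RealtimeConnection::connect` uses.
    let url = "wss://project.supabase.co/realtime/v1/websocket?apikey=ANON_KEY&vsn=2.0.0";
    let (mut ws, _) = connect_async(url).await?;

    // Join the channel the destination writes to for public.users.
    let join = serde_json::json!([
        null, "1", "realtime:etl:public.users", "phx_join",
        { "config": { "broadcast": { "self": false, "ack": false },
                      "presence": { "key": "" },
                      "postgres_changes": [] } }
    ]);
    ws.send(Message::Text(join.to_string().into())).await?;

    while let Some(frame) = ws.next().await {
        if let Message::Text(text) = frame? {
            println!("{text}"); // broadcast frames carry the CDC payloads
        }
    }
    Ok(())
}
```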