diff --git a/Cargo.toml b/Cargo.toml index 775f17177..8b3c3efa3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "etl-config", "etl-destinations", "etl-examples", + "etl-mysql", "etl-postgres", "etl-replicator", "etl-telemetry", @@ -24,6 +25,7 @@ etl = { path = "etl", default-features = false } etl-api = { path = "etl-api", default-features = false } etl-config = { path = "etl-config", default-features = false } etl-destinations = { path = "etl-destinations", default-features = false } +etl-mysql = { path = "etl-mysql", default-features = false } etl-postgres = { path = "etl-postgres", default-features = false } etl-replicator = { path = "etl-replicator", default-features = false } etl-telemetry = { path = "etl-telemetry", default-features = false } diff --git a/etl-mysql/Cargo.toml b/etl-mysql/Cargo.toml new file mode 100644 index 000000000..dbea6c167 --- /dev/null +++ b/etl-mysql/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "etl-mysql" +version = "0.1.0" +edition.workspace = true +license.workspace = true +rust-version.workspace = true +repository.workspace = true +homepage.workspace = true + +[features] +test-utils = [] +tokio = [] +sqlx = [] +bigquery = [] +replication = ["sqlx"] + +[dependencies] +etl-config = { workspace = true } + +bytes = { workspace = true } +chrono = { workspace = true, features = ["serde"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +sqlx = { workspace = true, features = [ + "runtime-tokio-rustls", + "macros", + "mysql", + "json", + "migrate", + "time", +] } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tracing = { workspace = true } diff --git a/etl-mysql/README.md b/etl-mysql/README.md new file mode 100644 index 000000000..06a264d93 --- /dev/null +++ b/etl-mysql/README.md @@ -0,0 +1,14 @@ +# etl-mysql + +MySQL database connection utilities for all crates. + +This crate provides database connection options and utilities for working with MySQL. +It supports the `sqlx` crate through feature flags and includes utilities for MySQL binlog-based replication. + +## Features + +- `sqlx` - Enables sqlx MySQL support +- `tokio` - Enables tokio-based utilities +- `replication` - Enables MySQL binlog replication support +- `test-utils` - Enables test utilities +- `bigquery` - Enables BigQuery integration support diff --git a/etl-mysql/src/lib.rs b/etl-mysql/src/lib.rs new file mode 100644 index 000000000..daae1af0d --- /dev/null +++ b/etl-mysql/src/lib.rs @@ -0,0 +1,13 @@ +//! MySQL database connection utilities for all crates. +//! +//! This crate provides database connection options and utilities for working with MySQL. +//! It supports the [`sqlx`] crate through feature flags. + +#[cfg(feature = "replication")] +pub mod replication; +#[cfg(feature = "sqlx")] +pub mod sqlx; +#[cfg(feature = "tokio")] +pub mod tokio; +pub mod types; +pub mod version; diff --git a/etl-mysql/src/liblib.rlib b/etl-mysql/src/liblib.rlib new file mode 100644 index 000000000..b1049357f Binary files /dev/null and b/etl-mysql/src/liblib.rlib differ diff --git a/etl-mysql/src/replication/db.rs b/etl-mysql/src/replication/db.rs new file mode 100644 index 000000000..3d76828ed --- /dev/null +++ b/etl-mysql/src/replication/db.rs @@ -0,0 +1,186 @@ +use std::num::NonZeroI32; + +use sqlx::{MySqlPool, Row, mysql::MySqlPoolOptions}; +use thiserror::Error; + +use crate::types::{TableId, TableName}; + +/// MySQL database connection configuration placeholder. +/// +/// This is a simplified version for the MySQL implementation. +/// In production, this should be defined in etl-config. +#[derive(Debug, Clone)] +pub struct MySqlConnectionConfig { + pub host: String, + pub port: u16, + pub name: String, + pub username: String, + pub password: Option, +} + +impl MySqlConnectionConfig { + /// Creates MySQL connection options for connecting to the configured database. + pub fn with_db(&self) -> sqlx::mysql::MySqlConnectOptions { + let mut options = sqlx::mysql::MySqlConnectOptions::new() + .host(&self.host) + .port(self.port) + .username(&self.username) + .database(&self.name); + + if let Some(password) = &self.password { + options = options.password(password); + } + + options + } +} + +/// Errors that can occur during table lookups. +#[derive(Debug, Error)] +pub enum TableLookupError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Table with ID {0} not found")] + TableNotFound(TableId), +} + +/// Connects to the source database with a connection pool. +/// +/// Creates a MySQL connection pool with the specified minimum and maximum +/// connection counts for accessing the source database. +#[cfg(feature = "replication")] +pub async fn connect_to_source_database( + config: &MySqlConnectionConfig, + min_connections: u32, + max_connections: u32, +) -> Result { + let options = config.with_db(); + + let pool = MySqlPoolOptions::new() + .min_connections(min_connections) + .max_connections(max_connections) + .connect_with(options) + .await?; + + Ok(pool) +} + +/// Retrieves table name from table identifier by querying information_schema. +/// +/// Looks up the schema and table name for the given table identifier using MySQL's +/// information_schema database. +pub async fn get_table_name_from_id( + pool: &MySqlPool, + table_id: TableId, +) -> Result { + let query = " + SELECT table_schema, table_name + FROM information_schema.tables + WHERE table_schema = DATABASE() + AND table_name = ? + "; + + let row = sqlx::query(query) + .bind(table_id.into_inner().to_string()) + .fetch_optional(pool) + .await?; + + match row { + Some(row) => { + let schema_name: String = row.try_get("table_schema")?; + let table_name: String = row.try_get("table_name")?; + + Ok(TableName { + schema: schema_name, + name: table_name, + }) + } + None => Err(TableLookupError::TableNotFound(table_id)), + } +} + +/// Extracts the MySQL server version from a version string. +/// +/// This function parses version strings like "8.0.35" or "5.7.44-log" +/// and converts them to the numeric format used by MySQL. +/// +/// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH +/// For example: MySQL 8.0.35 = 80035, MySQL 5.7.44 = 50744 +/// +/// Returns `None` if the version string cannot be parsed or results in zero. +pub fn extract_server_version(server_version_str: impl AsRef) -> Option { + let version_part = server_version_str + .as_ref() + .split_whitespace() + .next() + .unwrap_or("0.0.0"); + + let version_part = version_part.split('-').next().unwrap_or("0.0.0"); + + let version_components: Vec<&str> = version_part.split('.').collect(); + + let major = version_components + .first() + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let minor = version_components + .get(1) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let patch = version_components + .get(2) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + + let version = major * 10000 + minor * 100 + patch; + + NonZeroI32::new(version) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_server_version_basic_versions() { + assert_eq!(extract_server_version("8.0.35"), NonZeroI32::new(80035)); + assert_eq!(extract_server_version("5.7.44"), NonZeroI32::new(50744)); + assert_eq!(extract_server_version("8.1.0"), NonZeroI32::new(80100)); + assert_eq!(extract_server_version("8.2.5"), NonZeroI32::new(80205)); + } + + #[test] + fn test_extract_server_version_with_suffixes() { + assert_eq!(extract_server_version("8.0.35-log"), NonZeroI32::new(80035)); + assert_eq!( + extract_server_version("5.7.44-0ubuntu0.18.04.1"), + NonZeroI32::new(50744) + ); + assert_eq!( + extract_server_version("8.0.32-0ubuntu0.22.04.2"), + NonZeroI32::new(80032) + ); + } + + #[test] + fn test_extract_server_version_invalid_inputs() { + assert_eq!(extract_server_version(""), None); + assert_eq!(extract_server_version("invalid"), None); + assert_eq!(extract_server_version("not.a.version"), None); + assert_eq!(extract_server_version("MySQL"), None); + assert_eq!(extract_server_version(" "), None); + } + + #[test] + fn test_extract_server_version_zero_versions() { + assert_eq!(extract_server_version("0.0.0"), None); + assert_eq!(extract_server_version("0.0"), None); + } + + #[test] + fn test_extract_server_version_whitespace_handling() { + assert_eq!(extract_server_version(" 8.0.35 "), NonZeroI32::new(80035)); + assert_eq!(extract_server_version("8.0.35\n"), NonZeroI32::new(80035)); + } +} diff --git a/etl-mysql/src/replication/health.rs b/etl-mysql/src/replication/health.rs new file mode 100644 index 000000000..f58b630cc --- /dev/null +++ b/etl-mysql/src/replication/health.rs @@ -0,0 +1,38 @@ +use sqlx::MySqlExecutor; + +/// Fully-qualified table names required by ETL. +pub const ETL_TABLE_NAMES: [&str; 4] = [ + "etl.replication_state", + "etl.table_mappings", + "etl.table_schemas", + "etl.table_columns", +]; + +/// Returns true if all required ETL tables exist in the source database. +/// +/// Checks presence of the following relations: +/// - etl.replication_state +/// - etl.table_mappings +/// - etl.table_schemas +/// - etl.table_columns +pub async fn etl_tables_present<'c, E>(executor: E) -> Result +where + E: MySqlExecutor<'c>, +{ + // Check if all required tables exist with a single query + let count: i64 = sqlx::query_scalar( + " + SELECT COUNT(DISTINCT CONCAT(table_schema, '.', table_name)) + FROM information_schema.tables + WHERE CONCAT(table_schema, '.', table_name) IN (?, ?, ?, ?) + ", + ) + .bind(ETL_TABLE_NAMES[0]) + .bind(ETL_TABLE_NAMES[1]) + .bind(ETL_TABLE_NAMES[2]) + .bind(ETL_TABLE_NAMES[3]) + .fetch_one(executor) + .await?; + + Ok(count == ETL_TABLE_NAMES.len() as i64) +} diff --git a/etl-mysql/src/replication/lag.rs b/etl-mysql/src/replication/lag.rs new file mode 100644 index 000000000..632ca9c55 --- /dev/null +++ b/etl-mysql/src/replication/lag.rs @@ -0,0 +1,110 @@ +/// MySQL binlog lag monitoring. +/// +/// This module provides utilities for monitoring replication lag using MySQL's binlog position. +/// Unlike PostgreSQL's LSN, MySQL uses file position based replication tracking. + +use sqlx::MySqlPool; +use thiserror::Error; + +/// Errors that can occur during lag calculation. +#[derive(Debug, Error)] +pub enum LagError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Failed to parse binlog position: {0}")] + ParseError(String), +} + +/// Represents a MySQL binlog position. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BinlogPosition { + /// The binlog file number. + pub file_number: u32, + /// The position within the binlog file. + pub position: u64, +} + +impl BinlogPosition { + pub fn new(file_number: u32, position: u64) -> Self { + Self { + file_number, + position, + } + } + + /// Calculates the approximate lag between two binlog positions. + /// + /// This is an estimation since we can't know the exact size difference + /// without accessing the actual binlog files. + pub fn lag_bytes(&self, other: &BinlogPosition) -> i64 { + if self.file_number == other.file_number { + other.position as i64 - self.position as i64 + } else { + let file_diff = (other.file_number as i64 - self.file_number as i64) * 1024 * 1024; + file_diff + (other.position as i64 - self.position as i64) + } + } +} + +/// Gets the current binlog position from the MySQL server. +pub async fn get_current_binlog_position(pool: &MySqlPool) -> Result { + let row: (String, u64) = sqlx::query_as("SHOW MASTER STATUS") + .fetch_one(pool) + .await?; + + let file_name = row.0; + let position = row.1; + + let file_number = parse_binlog_file_number(&file_name)?; + + Ok(BinlogPosition::new(file_number, position)) +} + +/// Parses a MySQL binlog file name to extract the file number. +/// +/// Binlog files are typically named like "mysql-bin.000123" or "binlog.000456". +fn parse_binlog_file_number(file_name: &str) -> Result { + let parts: Vec<&str> = file_name.split('.').collect(); + if parts.len() < 2 { + return Err(LagError::ParseError(format!( + "Invalid binlog file name: {}", + file_name + ))); + } + + parts + .last() + .and_then(|s| s.parse::().ok()) + .ok_or_else(|| LagError::ParseError(format!("Failed to parse file number: {}", file_name))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_binlog_file_number() { + assert_eq!(parse_binlog_file_number("mysql-bin.000123").unwrap(), 123); + assert_eq!(parse_binlog_file_number("binlog.000456").unwrap(), 456); + assert_eq!(parse_binlog_file_number("log.001").unwrap(), 1); + } + + #[test] + fn test_parse_binlog_file_number_invalid() { + assert!(parse_binlog_file_number("invalid").is_err()); + assert!(parse_binlog_file_number("").is_err()); + assert!(parse_binlog_file_number("file.abc").is_err()); + } + + #[test] + fn test_binlog_position_lag() { + let pos1 = BinlogPosition::new(1, 1000); + let pos2 = BinlogPosition::new(1, 2000); + assert_eq!(pos1.lag_bytes(&pos2), 1000); + + let pos3 = BinlogPosition::new(1, 1000); + let pos4 = BinlogPosition::new(2, 500); + assert!(pos3.lag_bytes(&pos4) > 0); + } +} diff --git a/etl-mysql/src/replication/mod.rs b/etl-mysql/src/replication/mod.rs new file mode 100644 index 000000000..82f43831d --- /dev/null +++ b/etl-mysql/src/replication/mod.rs @@ -0,0 +1,10 @@ +pub mod db; +pub mod health; +pub mod lag; +pub mod schema; +pub mod slots; +pub mod state; +pub mod table_mappings; +pub mod worker; + +pub use db::*; diff --git a/etl-mysql/src/replication/schema.rs b/etl-mysql/src/replication/schema.rs new file mode 100644 index 000000000..54e631211 --- /dev/null +++ b/etl-mysql/src/replication/schema.rs @@ -0,0 +1,126 @@ +/// MySQL schema information utilities. +/// +/// This module provides functions for querying MySQL's information_schema +/// to retrieve table and column metadata. + +use sqlx::{MySqlPool, Row}; +use thiserror::Error; + +use crate::types::{ColumnSchema, TableId, TableName, TableSchema}; + +/// Errors that can occur during schema operations. +#[derive(Debug, Error)] +pub enum SchemaError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Table {0} not found")] + TableNotFound(String), + + #[error("Invalid schema data: {0}")] + InvalidData(String), +} + +/// Retrieves the complete schema for a table. +pub async fn get_table_schema( + pool: &MySqlPool, + table_name: &TableName, +) -> Result { + let columns = get_table_columns(pool, table_name).await?; + + if columns.is_empty() { + return Err(SchemaError::TableNotFound(table_name.to_string())); + } + + let table_id = compute_table_id(table_name); + + Ok(TableSchema::new(table_id, table_name.clone(), columns)) +} + +/// Retrieves column information for a table. +async fn get_table_columns( + pool: &MySqlPool, + table_name: &TableName, +) -> Result, SchemaError> { + let query = r#" + SELECT + c.column_name, + c.data_type, + c.character_maximum_length, + c.is_nullable, + CASE WHEN k.column_name IS NOT NULL THEN 1 ELSE 0 END as is_primary + FROM information_schema.columns c + LEFT JOIN information_schema.key_column_usage k + ON c.table_schema = k.table_schema + AND c.table_name = k.table_name + AND c.column_name = k.column_name + AND k.constraint_name = 'PRIMARY' + WHERE c.table_schema = ? + AND c.table_name = ? + ORDER BY c.ordinal_position + "#; + + let rows = sqlx::query(query) + .bind(&table_name.schema) + .bind(&table_name.name) + .fetch_all(pool) + .await?; + + let mut columns = Vec::new(); + for row in rows { + let column_name: String = row.try_get("column_name")?; + let data_type: String = row.try_get("data_type")?; + let max_length: Option = row.try_get("character_maximum_length")?; + let is_nullable: String = row.try_get("is_nullable")?; + let is_primary: i32 = row.try_get("is_primary")?; + + let modifier = max_length.unwrap_or(-1) as i32; + let nullable = is_nullable == "YES"; + let primary = is_primary == 1; + + columns.push(ColumnSchema::new( + column_name, + data_type.to_uppercase(), + modifier, + nullable, + primary, + )); + } + + Ok(columns) +} + +/// Computes a deterministic table ID from the table name. +/// +/// Since MySQL doesn't have a direct equivalent to PostgreSQL's OID, +/// we generate a hash-based ID from the table's fully qualified name. +pub fn compute_table_id(table_name: &TableName) -> TableId { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + table_name.schema.hash(&mut hasher); + table_name.name.hash(&mut hasher); + let hash = hasher.finish(); + + TableId::new((hash & 0xFFFFFFFF) as u32) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compute_table_id_deterministic() { + let table1 = TableName::new("test_db".to_string(), "users".to_string()); + let table2 = TableName::new("test_db".to_string(), "users".to_string()); + let table3 = TableName::new("test_db".to_string(), "posts".to_string()); + + let id1 = compute_table_id(&table1); + let id2 = compute_table_id(&table2); + let id3 = compute_table_id(&table3); + + assert_eq!(id1, id2); + assert_ne!(id1, id3); + } +} diff --git a/etl-mysql/src/replication/slots.rs b/etl-mysql/src/replication/slots.rs new file mode 100644 index 000000000..148e366d5 --- /dev/null +++ b/etl-mysql/src/replication/slots.rs @@ -0,0 +1,209 @@ +use sqlx::MySqlPool; +use thiserror::Error; + +use crate::types::TableId; + +/// Maximum length for a MySQL replication identifier in bytes. +const MAX_IDENTIFIER_LENGTH: usize = 64; + +/// Prefixes for different types of replication identifiers. +pub const APPLY_WORKER_PREFIX: &str = "supabase_etl_apply"; +pub const TABLE_SYNC_WORKER_PREFIX: &str = "supabase_etl_table_sync"; + +/// Error type for slot operations. +#[derive(Debug, Error)] +pub enum EtlReplicationSlotError { + #[error("Invalid identifier length: {0}")] + InvalidIdentifierLength(String), + + #[error("Invalid identifier name: {0}")] + InvalidIdentifierName(String), +} + +/// Parsed representation of a replication identifier name. +/// +/// Note: MySQL doesn't have replication slots like PostgreSQL, but we maintain +/// similar naming conventions for consistency and future CDC integration. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EtlReplicationSlot { + /// Apply worker identifier for a pipeline. + Apply { pipeline_id: u64 }, + /// Table sync worker identifier for a pipeline and table. + TableSync { pipeline_id: u64, table_id: TableId }, +} + +impl EtlReplicationSlot { + /// Creates a new [`EtlReplicationSlot`] for the apply worker. + pub fn for_apply_worker(pipeline_id: u64) -> Self { + Self::Apply { pipeline_id } + } + + /// Creates a new [`EtlReplicationSlot`] for the table sync worker. + pub fn for_table_sync_worker(pipeline_id: u64, table_id: TableId) -> Self { + Self::TableSync { + pipeline_id, + table_id, + } + } + + /// Returns the prefix of apply sync identifier for a pipeline. + pub fn apply_prefix(pipeline_id: u64) -> Result { + let prefix = format!("{APPLY_WORKER_PREFIX}_{pipeline_id}"); + + if prefix.len() >= MAX_IDENTIFIER_LENGTH { + return Err(EtlReplicationSlotError::InvalidIdentifierLength(prefix)); + } + + Ok(prefix) + } + + /// Returns the prefix of table sync identifiers for a pipeline. + pub fn table_sync_prefix(pipeline_id: u64) -> Result { + let prefix = format!("{TABLE_SYNC_WORKER_PREFIX}_{pipeline_id}_"); + + if prefix.len() >= MAX_IDENTIFIER_LENGTH { + return Err(EtlReplicationSlotError::InvalidIdentifierLength(prefix)); + } + + Ok(prefix) + } +} + +impl TryFrom<&str> for EtlReplicationSlot { + type Error = EtlReplicationSlotError; + + fn try_from(identifier: &str) -> Result { + if let Some(rest) = identifier.strip_prefix(APPLY_WORKER_PREFIX) { + let rest = rest + .strip_prefix('_') + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + let pipeline_id: u64 = rest + .parse() + .ok() + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + + return Ok(EtlReplicationSlot::for_apply_worker(pipeline_id)); + } + + if let Some(rest) = identifier.strip_prefix(TABLE_SYNC_WORKER_PREFIX) { + let rest = rest + .strip_prefix('_') + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + let mut parts = rest.rsplitn(2, '_'); + let table_id_str = parts + .next() + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + let pipeline_id_str = parts + .next() + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + + let pipeline_id: u64 = pipeline_id_str + .parse() + .ok() + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + let table_oid: u32 = table_id_str + .parse() + .ok() + .ok_or_else(|| EtlReplicationSlotError::InvalidIdentifierName(identifier.into()))?; + + return Ok(EtlReplicationSlot::for_table_sync_worker( + pipeline_id, + TableId::new(table_oid), + )); + } + + Err(EtlReplicationSlotError::InvalidIdentifierName( + identifier.into(), + )) + } +} + +impl TryFrom for String { + type Error = EtlReplicationSlotError; + + fn try_from(slot: EtlReplicationSlot) -> Result { + let identifier = match slot { + EtlReplicationSlot::Apply { pipeline_id } => { + format!("{APPLY_WORKER_PREFIX}_{pipeline_id}") + } + EtlReplicationSlot::TableSync { + pipeline_id, + table_id, + } => { + format!( + "{TABLE_SYNC_WORKER_PREFIX}_{pipeline_id}_{}", + table_id.into_inner() + ) + } + }; + + if identifier.len() > MAX_IDENTIFIER_LENGTH { + return Err(EtlReplicationSlotError::InvalidIdentifierLength( + identifier, + )); + } + + Ok(identifier) + } +} + +/// Placeholder for MySQL CDC connection management. +/// +/// Note: MySQL doesn't have built-in logical replication slots like PostgreSQL. +/// This function is provided for API compatibility but doesn't perform actual operations. +pub async fn delete_pipeline_replication_slots( + _pool: &MySqlPool, + _pipeline_id: u64, + _table_ids: &[TableId], +) -> sqlx::Result<()> { + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_apply_worker_identifier_name() { + let pipeline_id = 1; + let result: String = EtlReplicationSlot::for_apply_worker(pipeline_id) + .try_into() + .unwrap(); + + assert!(result.starts_with(APPLY_WORKER_PREFIX)); + assert!(result.len() <= MAX_IDENTIFIER_LENGTH); + assert_eq!(result, "supabase_etl_apply_1"); + } + + #[test] + fn test_table_sync_worker_identifier_name() { + let pipeline_id = 1; + let table_id = TableId::new(12345); + let result: String = EtlReplicationSlot::for_table_sync_worker(pipeline_id, table_id) + .try_into() + .unwrap(); + + assert!(result.starts_with(TABLE_SYNC_WORKER_PREFIX)); + assert!(result.len() <= MAX_IDENTIFIER_LENGTH); + assert_eq!(result, "supabase_etl_table_sync_1_12345"); + } + + #[test] + fn test_parse_apply_worker_identifier() { + let identifier = "supabase_etl_apply_42"; + let slot: EtlReplicationSlot = identifier.try_into().unwrap(); + + assert_eq!(slot, EtlReplicationSlot::for_apply_worker(42)); + } + + #[test] + fn test_parse_table_sync_worker_identifier() { + let identifier = "supabase_etl_table_sync_42_67890"; + let slot: EtlReplicationSlot = identifier.try_into().unwrap(); + + assert_eq!( + slot, + EtlReplicationSlot::for_table_sync_worker(42, TableId::new(67890)) + ); + } +} diff --git a/etl-mysql/src/replication/state.rs b/etl-mysql/src/replication/state.rs new file mode 100644 index 000000000..4ee1f161b --- /dev/null +++ b/etl-mysql/src/replication/state.rs @@ -0,0 +1,20 @@ +/// Placeholder module for MySQL replication state. +/// +/// This module provides compatibility with the PostgreSQL implementation +/// but adapts the concepts for MySQL's binlog-based replication. + +pub struct ReplicationState { + /// The current binlog file name. + pub binlog_file: String, + /// The current position in the binlog file. + pub binlog_position: u64, +} + +impl ReplicationState { + pub fn new(binlog_file: String, binlog_position: u64) -> Self { + Self { + binlog_file, + binlog_position, + } + } +} diff --git a/etl-mysql/src/replication/table_mappings.rs b/etl-mysql/src/replication/table_mappings.rs new file mode 100644 index 000000000..52c8f5656 --- /dev/null +++ b/etl-mysql/src/replication/table_mappings.rs @@ -0,0 +1,69 @@ +/// Table mappings for MySQL replication. +/// +/// This module provides utilities for managing table mappings between source +/// and destination tables in the ETL pipeline. + +use crate::types::{TableId, TableName}; +use std::collections::HashMap; + +/// A mapping between source table IDs and their names. +pub struct TableMappings { + mappings: HashMap, +} + +impl TableMappings { + pub fn new() -> Self { + Self { + mappings: HashMap::new(), + } + } + + /// Adds a mapping between a table ID and table name. + pub fn add_mapping(&mut self, table_id: TableId, table_name: TableName) { + self.mappings.insert(table_id, table_name); + } + + /// Retrieves the table name for a given table ID. + pub fn get_table_name(&self, table_id: &TableId) -> Option<&TableName> { + self.mappings.get(table_id) + } + + /// Returns all table mappings. + pub fn all_mappings(&self) -> &HashMap { + &self.mappings + } + + /// Returns the number of mappings. + pub fn len(&self) -> usize { + self.mappings.len() + } + + /// Returns whether the mappings are empty. + pub fn is_empty(&self) -> bool { + self.mappings.is_empty() + } +} + +impl Default for TableMappings { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_mappings() { + let mut mappings = TableMappings::new(); + assert!(mappings.is_empty()); + + let table_id = TableId::new(123); + let table_name = TableName::new("test_db".to_string(), "users".to_string()); + + mappings.add_mapping(table_id, table_name.clone()); + assert_eq!(mappings.len(), 1); + assert_eq!(mappings.get_table_name(&table_id), Some(&table_name)); + } +} diff --git a/etl-mysql/src/replication/worker.rs b/etl-mysql/src/replication/worker.rs new file mode 100644 index 000000000..5396b6cf3 --- /dev/null +++ b/etl-mysql/src/replication/worker.rs @@ -0,0 +1,97 @@ +/// Worker management for MySQL replication. +/// +/// This module provides utilities for managing replication workers in the ETL pipeline. +/// Unlike PostgreSQL's logical replication workers, MySQL workers are adapted for +/// binlog-based change data capture. + +use crate::types::TableId; + +/// Represents a worker type in the replication pipeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WorkerType { + /// Apply worker that processes binlog events. + Apply, + /// Table sync worker that performs initial table synchronization. + TableSync { table_id: TableId }, +} + +/// Represents the state of a replication worker. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum WorkerState { + /// Worker is initializing. + Initializing, + /// Worker is running normally. + Running, + /// Worker has stopped. + Stopped, + /// Worker encountered an error. + Errored { reason: String }, +} + +/// Configuration for a replication worker. +#[derive(Debug, Clone)] +pub struct WorkerConfig { + /// The pipeline ID this worker belongs to. + pub pipeline_id: u64, + /// The type of worker. + pub worker_type: WorkerType, + /// The current state of the worker. + pub state: WorkerState, +} + +impl WorkerConfig { + pub fn new(pipeline_id: u64, worker_type: WorkerType) -> Self { + Self { + pipeline_id, + worker_type, + state: WorkerState::Initializing, + } + } + + /// Updates the worker state. + pub fn set_state(&mut self, state: WorkerState) { + self.state = state; + } + + /// Returns whether the worker is currently running. + pub fn is_running(&self) -> bool { + matches!(self.state, WorkerState::Running) + } + + /// Returns whether the worker has encountered an error. + pub fn is_errored(&self) -> bool { + matches!(self.state, WorkerState::Errored { .. }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_worker_config() { + let mut config = WorkerConfig::new(1, WorkerType::Apply); + assert_eq!(config.state, WorkerState::Initializing); + assert!(!config.is_running()); + + config.set_state(WorkerState::Running); + assert!(config.is_running()); + assert!(!config.is_errored()); + + config.set_state(WorkerState::Errored { + reason: "Test error".to_string(), + }); + assert!(config.is_errored()); + assert!(!config.is_running()); + } + + #[test] + fn test_worker_types() { + let apply = WorkerType::Apply; + let table_sync = WorkerType::TableSync { + table_id: TableId::new(123), + }; + + assert_ne!(apply, table_sync); + } +} diff --git a/etl-mysql/src/sqlx/mod.rs b/etl-mysql/src/sqlx/mod.rs new file mode 100644 index 000000000..b1621af88 --- /dev/null +++ b/etl-mysql/src/sqlx/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "test-utils")] +pub mod test_utils; diff --git a/etl-mysql/src/sqlx/test_utils.rs b/etl-mysql/src/sqlx/test_utils.rs new file mode 100644 index 000000000..abc34b94c --- /dev/null +++ b/etl-mysql/src/sqlx/test_utils.rs @@ -0,0 +1,60 @@ +use sqlx::{Connection, Executor, MySqlConnection, MySqlPool}; + +use crate::replication::db::MySqlConnectionConfig; + +/// Creates a new MySQL database and returns a connection pool. +/// +/// Connects to MySQL server, creates a new database, and returns a [`MySqlPool`] +/// connected to the newly created database. +/// +/// # Panics +/// Panics if connection or database creation fails. +pub async fn create_mysql_database(config: &MySqlConnectionConfig) -> MySqlPool { + let connection_url = format!( + "mysql://{}:{}@{}:{}", + config.username, + config.password.as_ref().unwrap_or(&String::new()), + config.host, + config.port + ); + + let mut connection = MySqlConnection::connect(&connection_url) + .await + .expect("Failed to connect to MySQL"); + + connection + .execute(&*format!("CREATE DATABASE `{}`", config.name)) + .await + .expect("Failed to create database"); + + let db_url = format!("{}/{}", connection_url, config.name); + MySqlPool::connect(&db_url) + .await + .expect("Failed to connect to MySQL database") +} + +/// Drops a MySQL database and terminates all connections. +/// +/// Connects to MySQL server and drops the target database if it exists. +/// Used for test cleanup. +/// +/// # Panics +/// Panics if any database operation fails. +pub async fn drop_mysql_database(config: &MySqlConnectionConfig) { + let connection_url = format!( + "mysql://{}:{}@{}:{}", + config.username, + config.password.as_ref().unwrap_or(&String::new()), + config.host, + config.port + ); + + let mut connection = MySqlConnection::connect(&connection_url) + .await + .expect("Failed to connect to MySQL"); + + connection + .execute(&*format!("DROP DATABASE IF EXISTS `{}`", config.name)) + .await + .expect("Failed to destroy database"); +} diff --git a/etl-mysql/src/tokio/mod.rs b/etl-mysql/src/tokio/mod.rs new file mode 100644 index 000000000..b1621af88 --- /dev/null +++ b/etl-mysql/src/tokio/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "test-utils")] +pub mod test_utils; diff --git a/etl-mysql/src/tokio/test_utils.rs b/etl-mysql/src/tokio/test_utils.rs new file mode 100644 index 000000000..0a7bbf8ad --- /dev/null +++ b/etl-mysql/src/tokio/test_utils.rs @@ -0,0 +1,408 @@ +use std::num::NonZeroI32; + +use tokio::runtime::Handle; + +use crate::replication::db::MySqlConnectionConfig; +use crate::replication::extract_server_version; +use crate::types::{ColumnSchema, TableId, TableName}; + +/// Table modification operations for ALTER TABLE statements. +pub enum TableModification<'a> { + /// Add a new column with specified name and data type. + AddColumn { name: &'a str, data_type: &'a str }, + /// Drop an existing column by name. + DropColumn { name: &'a str }, + /// Alter an existing column with the specified alteration. + AlterColumn { + name: &'a str, + alteration: &'a str, + }, +} + +/// MySQL database wrapper for testing operations. +/// +/// Provides a unified interface for database operations with automatic cleanup functionality. +pub struct MySqlDatabase { + pub config: MySqlConnectionConfig, + pub pool: Option, + server_version: Option, + destroy_on_drop: bool, +} + +impl MySqlDatabase { + pub fn server_version(&self) -> Option { + self.server_version + } + + /// Creates a new table with the given name and column definitions. + /// + /// Optionally adds a primary key column named `id` of type `BIGINT AUTO_INCREMENT`. + /// Returns a computed table ID based on the table name. + pub async fn create_table( + &self, + table_name: TableName, + add_pk_col: bool, + columns: &[(&str, &str)], + ) -> Result { + let columns_str = columns + .iter() + .map(|(name, typ)| format!("`{name}` {typ}")) + .collect::>() + .join(", "); + + let pk_col = if add_pk_col { + "`id` BIGINT AUTO_INCREMENT PRIMARY KEY, " + } else { + "" + }; + + let create_table_query = format!( + "CREATE TABLE {} ({pk_col}{columns_str})", + table_name.as_quoted_identifier(), + ); + + sqlx::query(&create_table_query) + .execute(self.pool.as_ref().unwrap()) + .await?; + + use crate::replication::schema::compute_table_id; + let table_id = compute_table_id(&table_name); + + Ok(table_id) + } + + /// Modifies an existing table using ALTER TABLE operations. + /// + /// Applies the specified modifications (add/drop/alter columns) to the table. + pub async fn alter_table( + &self, + table_name: TableName, + modifications: &[TableModification<'_>], + ) -> Result<(), sqlx::Error> { + for modification in modifications { + let alter_clause = match modification { + TableModification::AddColumn { name, data_type } => { + format!("ADD COLUMN `{name}` {data_type}") + } + TableModification::DropColumn { name } => { + format!("DROP COLUMN `{name}`") + } + TableModification::AlterColumn { name, alteration } => { + format!("MODIFY COLUMN `{name}` {alteration}") + } + }; + + let alter_table_query = format!( + "ALTER TABLE {} {}", + table_name.as_quoted_identifier(), + alter_clause + ); + + sqlx::query(&alter_table_query) + .execute(self.pool.as_ref().unwrap()) + .await?; + } + + Ok(()) + } + + /// Inserts a single row of values into the specified table. + /// + /// Takes column names and generates appropriate parameterized placeholders + /// for the INSERT statement. + pub async fn insert_values( + &self, + table_name: TableName, + columns: &[&str], + values: &[T], + ) -> Result + where + T: Send + Sync, + for<'q> &'q T: sqlx::Encode<'q, sqlx::MySql> + sqlx::Type, + { + let columns_str = columns + .iter() + .map(|c| format!("`{c}`")) + .collect::>() + .join(", "); + let placeholders: Vec = (0..values.len()).map(|_| "?".to_string()).collect(); + let placeholders_str = placeholders.join(", "); + + let insert_query = format!( + "INSERT INTO {} ({}) VALUES ({})", + table_name.as_quoted_identifier(), + columns_str, + placeholders_str + ); + + let mut query = sqlx::query(&insert_query); + for value in values { + query = query.bind(value); + } + + let result = query.execute(self.pool.as_ref().unwrap()).await?; + + Ok(result.rows_affected()) + } + + /// Updates all rows in the table with new values. + /// + /// Sets the specified columns to new values across all rows in the table. + /// Returns the number of rows affected. + pub async fn update_values( + &self, + table_name: TableName, + columns: &[&str], + values: &[T], + ) -> Result + where + T: Send + Sync, + for<'q> &'q T: sqlx::Encode<'q, sqlx::MySql> + sqlx::Type, + { + let set_clauses: Vec = columns + .iter() + .map(|col| format!("`{col}` = ?")) + .collect(); + let set_clause = set_clauses.join(", "); + + let update_query = format!( + "UPDATE {} SET {}", + table_name.as_quoted_identifier(), + set_clause + ); + + let mut query = sqlx::query(&update_query); + for value in values { + query = query.bind(value); + } + + let result = query.execute(self.pool.as_ref().unwrap()).await?; + + Ok(result.rows_affected()) + } + + /// Deletes rows from the table based on column conditions. + /// + /// Constructs a DELETE statement with WHERE clause using the provided + /// column names, expressions, and logical operator. + pub async fn delete_values( + &self, + table_name: TableName, + columns: &[&str], + expressions: &[&str], + operator: &str, + ) -> Result { + let delete_clauses: Vec = columns + .iter() + .zip(expressions.iter()) + .map(|(col, val)| format!("`{col}` = {val}")) + .collect(); + let delete_clauses = delete_clauses.join(operator); + + let delete_query = format!( + "DELETE FROM {} WHERE {}", + table_name.as_quoted_identifier(), + delete_clauses + ); + + let result = sqlx::query(&delete_query) + .execute(self.pool.as_ref().unwrap()) + .await?; + + Ok(result.rows_affected()) + } + + /// Queries values from a single column with optional WHERE clause. + /// + /// Returns all values from the specified column, optionally filtered + /// by the provided WHERE condition. + pub async fn query_table( + &self, + table_name: &TableName, + column: &str, + where_clause: Option<&str>, + ) -> Result, sqlx::Error> + where + T: Send + Unpin, + for<'r> T: sqlx::Decode<'r, sqlx::MySql> + sqlx::Type, + { + let where_str = where_clause.map_or(String::new(), |w| format!("WHERE {w}")); + let query = format!( + "SELECT `{}` FROM {} {}", + column, + table_name.as_quoted_identifier(), + where_str + ); + + let rows: Vec<(T,)> = sqlx::query_as(&query) + .fetch_all(self.pool.as_ref().unwrap()) + .await?; + + Ok(rows.into_iter().map(|row| row.0).collect()) + } + + /// Truncates all data from the specified table. + /// + /// Removes all rows from the table while preserving the table structure. + pub async fn truncate_table(&self, table_name: TableName) -> Result<(), sqlx::Error> { + let query = format!("TRUNCATE TABLE {}", table_name.as_quoted_identifier()); + + sqlx::query(&query) + .execute(self.pool.as_ref().unwrap()) + .await?; + + Ok(()) + } + + /// Executes arbitrary SQL on the database. + pub async fn run_sql(&self, sql: &str) -> Result { + let result = sqlx::query(sql) + .execute(self.pool.as_ref().unwrap()) + .await?; + + Ok(result.rows_affected()) + } + + /// Creates a new test database with automatic cleanup. + /// + /// Creates a new MySQL database and establishes a connection pool. + /// The database will be dropped automatically when this instance is dropped. + pub async fn new(config: MySqlConnectionConfig) -> Self { + let (pool, server_version) = create_mysql_database(&config).await; + + Self { + config, + pool: Some(pool), + server_version, + destroy_on_drop: true, + } + } + + /// Creates a duplicate connection to the same database. + /// + /// Establishes an additional connection pool to the existing database + /// without creating a new database. + pub async fn duplicate(&self) -> Self { + let config = self.config.clone(); + let (pool, server_version) = connect_to_mysql_database(&config).await; + + Self { + config, + pool: Some(pool), + server_version, + destroy_on_drop: false, + } + } +} + +impl Drop for MySqlDatabase { + fn drop(&mut self) { + if self.destroy_on_drop { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { drop_mysql_database(&self.config).await }); + }); + } + } +} + +/// Returns the default ID column schema for test tables. +/// +/// Creates a [`ColumnSchema`] for a non-nullable, primary key column named "id" +/// of type `BIGINT` that is added by default to tables created by [`MySqlDatabase`]. +pub fn id_column_schema() -> ColumnSchema { + ColumnSchema { + name: "id".to_string(), + typ: "BIGINT".to_string(), + modifier: -1, + nullable: false, + primary: true, + } +} + +/// Creates a new MySQL database and returns a connected pool. +/// +/// Establishes connection to MySQL server, creates a new database, +/// and returns a [`MySqlPool`] connected to the newly created database. +/// +/// # Panics +/// Panics if connection or database creation fails. +pub async fn create_mysql_database( + config: &MySqlConnectionConfig, +) -> (sqlx::MySqlPool, Option) { + let connection_url = format!( + "mysql://{}:{}@{}:{}", + config.username, + config.password.as_ref().unwrap_or(&String::new()), + config.host, + config.port + ); + + let pool = sqlx::MySqlPool::connect(&connection_url) + .await + .expect("Failed to connect to MySQL"); + + sqlx::query(&format!("CREATE DATABASE `{}`", config.name)) + .execute(&pool) + .await + .expect("Failed to create database"); + + let (db_pool, server_version) = connect_to_mysql_database(config).await; + + (db_pool, server_version) +} + +/// Connects to an existing MySQL database. +/// +/// Establishes a connection pool to the database specified in the configuration. +/// Assumes the database already exists. +pub async fn connect_to_mysql_database( + config: &MySqlConnectionConfig, +) -> (sqlx::MySqlPool, Option) { + let connection_url = format!( + "mysql://{}:{}@{}:{}/{}", + config.username, + config.password.as_ref().unwrap_or(&String::new()), + config.host, + config.port, + config.name + ); + + let pool = sqlx::MySqlPool::connect(&connection_url) + .await + .expect("Failed to connect to MySQL database"); + + let version_row: (String,) = sqlx::query_as("SELECT VERSION()") + .fetch_one(&pool) + .await + .expect("Failed to get server version"); + + let server_version = extract_server_version(version_row.0); + + (pool, server_version) +} + +/// Drops a MySQL database and cleans up all resources. +/// +/// Drops the database and removes all resources. Used for thorough cleanup +/// of test databases. +/// +/// # Panics +/// Panics if any database operation fails. +pub async fn drop_mysql_database(config: &MySqlConnectionConfig) { + let connection_url = format!( + "mysql://{}:{}@{}:{}", + config.username, + config.password.as_ref().unwrap_or(&String::new()), + config.host, + config.port + ); + + let pool = sqlx::MySqlPool::connect(&connection_url) + .await + .expect("Failed to connect to MySQL"); + + sqlx::query(&format!("DROP DATABASE IF EXISTS `{}`", config.name)) + .execute(&pool) + .await + .expect("Failed to destroy database"); +} diff --git a/etl-mysql/src/types/mod.rs b/etl-mysql/src/types/mod.rs new file mode 100644 index 000000000..438e6b4c7 --- /dev/null +++ b/etl-mysql/src/types/mod.rs @@ -0,0 +1,7 @@ +mod schema; +mod time; +mod utils; + +pub use schema::*; +pub use time::*; +pub use utils::*; diff --git a/etl-mysql/src/types/schema.rs b/etl-mysql/src/types/schema.rs new file mode 100644 index 000000000..529cbce8e --- /dev/null +++ b/etl-mysql/src/types/schema.rs @@ -0,0 +1,196 @@ +use std::cmp::Ordering; +use std::fmt; +use std::str::FromStr; + +/// An object identifier in MySQL. +type Oid = u32; + +/// A fully qualified MySQL table name consisting of a schema (database) and table name. +/// +/// This type represents a table identifier in MySQL, which requires both a schema name +/// (database name) and a table name. It provides methods for formatting the name in different contexts. +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord)] +pub struct TableName { + /// The schema (database) name containing the table. + pub schema: String, + /// The name of the table within the schema. + pub name: String, +} + +impl TableName { + pub fn new(schema: String, name: String) -> TableName { + Self { schema, name } + } + + /// Returns the table name as a properly quoted MySQL identifier. + /// + /// This method ensures the schema and table names are properly escaped according to + /// MySQL identifier quoting rules using backticks. + pub fn as_quoted_identifier(&self) -> String { + format!("`{}`.`{}`", self.schema, self.name) + } +} + +impl fmt::Display for TableName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_fmt(format_args!("{0}.{1}", self.schema, self.name)) + } +} + +/// A type alias for MySQL type modifiers. +/// +/// Type modifiers in MySQL are used to specify additional type-specific attributes, +/// such as length for varchar or precision for numeric types. +type TypeModifier = i32; + +/// Represents the schema of a single column in a MySQL table. +/// +/// This type contains all metadata about a column including its name, data type, +/// type modifier, nullability, and whether it's part of the primary key. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ColumnSchema { + /// The name of the column. + pub name: String, + /// The MySQL data type of the column as a string. + pub typ: String, + /// Type-specific modifier value (e.g., length for varchar). + pub modifier: TypeModifier, + /// Whether the column can contain NULL values. + pub nullable: bool, + /// Whether the column is part of the table's primary key. + pub primary: bool, +} + +impl ColumnSchema { + pub fn new( + name: String, + typ: String, + modifier: TypeModifier, + nullable: bool, + primary: bool, + ) -> ColumnSchema { + Self { + name, + typ, + modifier, + nullable, + primary, + } + } + + /// Compares two [`ColumnSchema`] instances, excluding the `nullable` field. + /// + /// Return `true` if all fields except `nullable` are equal, `false` otherwise. + /// + /// This method is used for comparing table schemas loaded via the initial table sync and the + /// relation messages received via CDC. The reason for skipping the `nullable` field is that + /// unfortunately MySQL binlog doesn't always propagate nullable information of a column. + pub fn partial_eq(&self, other: &ColumnSchema) -> bool { + self.name == other.name && self.typ == other.typ && self.modifier == other.modifier + } +} + +/// A type-safe wrapper for MySQL table identifiers. +/// +/// MySQL tables are uniquely identified by their schema (database) and table name combination. +/// This newtype provides type safety by preventing accidental use of raw values. +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct TableId(pub Oid); + +impl TableId { + /// Creates a new [`TableId`] from an [`Oid`]. + pub fn new(oid: Oid) -> Self { + Self(oid) + } + + /// Returns the underlying [`Oid`] value. + pub fn into_inner(self) -> Oid { + self.0 + } +} + +impl From for TableId { + fn from(oid: Oid) -> Self { + Self(oid) + } +} + +impl From for Oid { + fn from(table_id: TableId) -> Self { + table_id.0 + } +} + +impl fmt::Display for TableId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl FromStr for TableId { + type Err = ::Err; + + fn from_str(s: &str) -> Result { + s.parse::().map(TableId::new) + } +} + +/// Represents the complete schema of a MySQL table. +/// +/// This type contains all metadata about a table including its name, ID, +/// and the schemas of all its columns. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct TableSchema { + /// The identifier of the table. + pub id: TableId, + /// The fully qualified name of the table. + pub name: TableName, + /// The schemas of all columns in the table. + pub column_schemas: Vec, +} + +impl TableSchema { + pub fn new(id: TableId, name: TableName, column_schemas: Vec) -> Self { + Self { + id, + name, + column_schemas, + } + } + + /// Returns the number of columns in the table. + pub fn num_columns(&self) -> usize { + self.column_schemas.len() + } + + /// Compares two [`TableSchema`] instances excluding nullable fields. + /// + /// This method checks if the table IDs match and all column schemas match + /// using partial equality (excluding nullable fields). + pub fn partial_eq(&self, other: &TableSchema) -> bool { + if self.id != other.id { + return false; + } + + if self.column_schemas.len() != other.column_schemas.len() { + return false; + } + + self.column_schemas + .iter() + .zip(other.column_schemas.iter()) + .all(|(a, b)| a.partial_eq(b)) + } +} + +impl PartialOrd for TableSchema { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.id.cmp(&other.id)) + } +} + +impl Ord for TableSchema { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} diff --git a/etl-mysql/src/types/time.rs b/etl-mysql/src/types/time.rs new file mode 100644 index 000000000..db1b84bd9 --- /dev/null +++ b/etl-mysql/src/types/time.rs @@ -0,0 +1,24 @@ +use std::sync::LazyLock; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// MySQL date format string for parsing dates in YYYY-MM-DD format. +pub const DATE_FORMAT: &str = "%Y-%m-%d"; + +/// MySQL time format string for parsing times with optional fractional seconds. +pub const TIME_FORMAT: &str = "%H:%M:%S%.f"; + +/// MySQL datetime format string for parsing datetimes with optional fractional seconds. +pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.f"; + +/// MySQL timestamp format string with timezone offset in +HHMM format. +pub const TIMESTAMP_FORMAT_HHMM: &str = "%Y-%m-%d %H:%M:%S%.f%#z"; + +/// MySQL timestamp format string with timezone offset in +HH:MM format. +pub const TIMESTAMP_FORMAT_HH_MM: &str = "%Y-%m-%d %H:%M:%S%.f%:z"; + +/// Number of seconds between Unix epoch (1970-01-01) and MySQL epoch (1970-01-01). +const MYSQL_EPOCH_OFFSET_SECONDS: u64 = 0; + +/// MySQL epoch (1970-01-01 00:00:00 UTC) for timestamp calculations. +pub static MYSQL_EPOCH: LazyLock = + LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(MYSQL_EPOCH_OFFSET_SECONDS)); diff --git a/etl-mysql/src/types/utils.rs b/etl-mysql/src/types/utils.rs new file mode 100644 index 000000000..5dd2002c0 --- /dev/null +++ b/etl-mysql/src/types/utils.rs @@ -0,0 +1,57 @@ +/// Returns whether the MySQL type is an array-like type. +/// +/// Note: MySQL doesn't have native array types like PostgreSQL, but JSON arrays +/// can be used for similar functionality. +pub fn is_array_type(typ: &str) -> bool { + matches!(typ, "JSON" | "json") +} + +/// Creates a hex-encoded sequence number from binlog positions to ensure correct event ordering. +/// +/// Creates a hex-encoded sequence number that ensures events are processed in the correct order +/// even when they have the same system time. The format is compatible with BigQuery's +/// `_CHANGE_SEQUENCE_NUMBER` column requirements. +/// +/// The rationale for using the binlog position is that BigQuery will preserve the highest sequence +/// number in case of equal primary key, which is what we want since in case of updates, we want the +/// latest update in MySQL order to be the winner. We have first the `commit_pos` in the key +/// so that BigQuery can first order operations based on the position at which the transaction committed +/// and if two operations belong to the same transaction (meaning they have the same position), the +/// `start_pos` will be used. We first order by `commit_pos` to preserve the order in which operations +/// are received by the pipeline since transactions are ordered by commit time and not interleaved. +pub fn generate_sequence_number(start_pos: u64, commit_pos: u64) -> String { + format!("{commit_pos:016x}/{start_pos:016x}") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_array_type() { + assert!(is_array_type("JSON")); + assert!(is_array_type("json")); + assert!(!is_array_type("VARCHAR")); + assert!(!is_array_type("INT")); + assert!(!is_array_type("TEXT")); + } + + #[test] + fn test_generate_sequence_number() { + let start = 1000u64; + let commit = 2000u64; + let seq = generate_sequence_number(start, commit); + assert_eq!(seq, "00000000000007d0/00000000000003e8"); + } + + #[test] + fn test_generate_sequence_number_ordering() { + let seq1 = generate_sequence_number(100, 1000); + let seq2 = generate_sequence_number(200, 1000); + let seq3 = generate_sequence_number(100, 2000); + + assert!(seq2 > seq1); + assert!(seq3 > seq1); + assert!(seq3 > seq2); + } +} diff --git a/etl-mysql/src/version.rs b/etl-mysql/src/version.rs new file mode 100644 index 000000000..41100f069 --- /dev/null +++ b/etl-mysql/src/version.rs @@ -0,0 +1,110 @@ +//! MySQL version constants and utilities. +//! +//! This module provides version constants for supported MySQL versions and macros +//! for ergonomic version comparison. Version numbers follow MySQL's internal format: +//! `MAJOR * 10000 + MINOR * 100 + PATCH`. +//! +//! # Supported Versions +//! +//! ETL officially supports MySQL versions 5.7, 8.0, 8.1, 8.2, 8.3, and 8.4. + +use std::num::NonZeroI32; + +pub const MYSQL_5_7: i32 = 50700; +pub const MYSQL_8_0: i32 = 80000; +pub const MYSQL_8_1: i32 = 80100; +pub const MYSQL_8_2: i32 = 80200; +pub const MYSQL_8_3: i32 = 80300; +pub const MYSQL_8_4: i32 = 80400; + +/// Returns [`true`] if the server version meets or exceeds the required version. +/// +/// This function handles [`None`] server versions by returning [`false`], making it +/// safe to use in contexts where version information might not be available. +pub fn meets_version(server_version: Option, required_version: i32) -> bool { + server_version.is_some_and(|v| v.get() >= required_version) +} + +/// Checks if the server version meets or exceeds the required version. +/// +/// This macro provides ergonomic version checking by accepting various input types +/// for the server version (Option, NonZeroI32, i32) and comparing against +/// version constants. +#[macro_export] +macro_rules! requires_version { + ($server_version:expr, $required:expr) => { + $crate::version::meets_version($server_version, $required) + }; +} + +/// Checks if the server version is below the specified version. +/// +/// This macro is useful for conditional logic when features are not available +/// in older MySQL versions. +#[macro_export] +macro_rules! below_version { + ($server_version:expr, $required:expr) => { + !$crate::version::meets_version($server_version, $required) + }; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_version_constants() { + assert_eq!(MYSQL_5_7, 50700); + assert_eq!(MYSQL_8_0, 80000); + assert_eq!(MYSQL_8_1, 80100); + assert_eq!(MYSQL_8_2, 80200); + assert_eq!(MYSQL_8_3, 80300); + assert_eq!(MYSQL_8_4, 80400); + } + + #[test] + fn test_meets_version_with_some() { + let version = NonZeroI32::new(80035); + assert!(meets_version(version, MYSQL_5_7)); + assert!(meets_version(version, MYSQL_8_0)); + assert!(!meets_version(version, MYSQL_8_1)); + assert!(!meets_version(version, MYSQL_8_2)); + } + + #[test] + fn test_meets_version_with_none() { + assert!(!meets_version(None, MYSQL_5_7)); + assert!(!meets_version(None, MYSQL_8_0)); + assert!(!meets_version(None, MYSQL_8_1)); + } + + #[test] + fn test_meets_version_exact_match() { + let version = NonZeroI32::new(MYSQL_8_0); + assert!(meets_version(version, MYSQL_8_0)); + } + + #[test] + fn test_requires_version_macro() { + let version = NonZeroI32::new(80200); + assert!(requires_version!(version, MYSQL_5_7)); + assert!(requires_version!(version, MYSQL_8_0)); + assert!(requires_version!(version, MYSQL_8_1)); + assert!(requires_version!(version, MYSQL_8_2)); + assert!(!requires_version!(version, MYSQL_8_3)); + } + + #[test] + fn test_below_version_macro() { + let version = NonZeroI32::new(50730); + assert!(!below_version!(version, MYSQL_5_7)); + assert!(below_version!(version, MYSQL_8_0)); + assert!(below_version!(version, MYSQL_8_1)); + } + + #[test] + fn test_requires_version_with_none() { + let version: Option = None; + assert!(!requires_version!(version, MYSQL_8_0)); + } +} diff --git a/etl/Cargo.toml b/etl/Cargo.toml index ad0203254..899f160aa 100644 --- a/etl/Cargo.toml +++ b/etl/Cargo.toml @@ -8,12 +8,13 @@ repository.workspace = true homepage.workspace = true [features] -test-utils = ["etl-postgres/test-utils"] +test-utils = ["etl-postgres/test-utils", "etl-mysql/test-utils"] failpoints = ["fail/failpoints"] default = [] [dependencies] etl-config = { workspace = true } +etl-mysql = { workspace = true, features = ["tokio", "replication"] } etl-postgres = { workspace = true, features = ["tokio", "replication"] } byteorder = { workspace = true } @@ -44,6 +45,11 @@ uuid = { workspace = true, features = ["v4"] } x509-cert = { workspace = true, default-features = false } [dev-dependencies] +etl-mysql = { workspace = true, features = [ + "tokio", + "replication", + "test-utils", +] } etl-postgres = { workspace = true, features = [ "tokio", "replication", diff --git a/etl/tests/mysql_store.rs b/etl/tests/mysql_store.rs new file mode 100644 index 000000000..c665a09e2 --- /dev/null +++ b/etl/tests/mysql_store.rs @@ -0,0 +1,303 @@ +#![cfg(feature = "test-utils")] + +use etl_mysql::types::{ColumnSchema, TableId, TableName, TableSchema}; +use etl_mysql::version::{MYSQL_8_0, meets_version}; +use etl_telemetry::tracing::init_test_tracing; +use std::num::NonZeroI32; + +fn create_sample_table_schema() -> TableSchema { + let table_id = TableId::new(12345); + let table_name = TableName::new("test_db".to_string(), "test_table".to_string()); + let columns = vec![ + ColumnSchema::new("id".to_string(), "INT".to_string(), -1, false, true), + ColumnSchema::new("name".to_string(), "VARCHAR".to_string(), 255, true, false), + ColumnSchema::new( + "created_at".to_string(), + "TIMESTAMP".to_string(), + -1, + false, + false, + ), + ]; + + TableSchema::new(table_id, table_name, columns) +} + +fn create_another_table_schema() -> TableSchema { + let table_id = TableId::new(67890); + let table_name = TableName::new("test_db".to_string(), "another_table".to_string()); + let columns = vec![ + ColumnSchema::new("id".to_string(), "BIGINT".to_string(), -1, false, true), + ColumnSchema::new( + "description".to_string(), + "TEXT".to_string(), + -1, + true, + false, + ), + ]; + + TableSchema::new(table_id, table_name, columns) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_schema_creation() { + init_test_tracing(); + + let table_schema = create_sample_table_schema(); + + assert_eq!(table_schema.id.into_inner(), 12345); + assert_eq!(table_schema.name.schema, "test_db"); + assert_eq!(table_schema.name.name, "test_table"); + assert_eq!(table_schema.num_columns(), 3); + + let id_column = table_schema + .column_schemas + .iter() + .find(|c| c.name == "id"); + assert!(id_column.is_some()); + let id_column = id_column.unwrap(); + assert_eq!(id_column.typ, "INT"); + assert!(id_column.primary); + assert!(!id_column.nullable); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_name_formatting() { + init_test_tracing(); + + let table_name = TableName::new("my_database".to_string(), "users".to_string()); + + assert_eq!(table_name.to_string(), "my_database.users"); + assert_eq!(table_name.as_quoted_identifier(), "`my_database`.`users`"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_id_operations() { + init_test_tracing(); + + let id1 = TableId::new(100); + let id2 = TableId::new(200); + let id3 = TableId::new(100); + + assert_eq!(id1, id3); + assert_ne!(id1, id2); + assert!(id1 < id2); + + assert_eq!(id1.into_inner(), 100); + assert_eq!(id1.to_string(), "100"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_column_schema_comparison() { + init_test_tracing(); + + let col1 = ColumnSchema::new("id".to_string(), "INT".to_string(), -1, false, true); + let col2 = ColumnSchema::new("id".to_string(), "INT".to_string(), -1, true, true); + + assert_ne!(col1, col2); + + assert!(col1.partial_eq(&col2)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_schema_comparison() { + init_test_tracing(); + + let schema1 = create_sample_table_schema(); + let mut schema2 = create_sample_table_schema(); + + assert_eq!(schema1, schema2); + + schema2.column_schemas[0].nullable = true; + assert_ne!(schema1, schema2); + + assert!(schema1.partial_eq(&schema2)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_multiple_tables_ordering() { + init_test_tracing(); + + let schema1 = create_sample_table_schema(); + let schema2 = create_another_table_schema(); + + assert!(schema1.id < schema2.id); + assert!(schema1 < schema2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_mysql_version_checking() { + init_test_tracing(); + + let version_8_0 = NonZeroI32::new(80035); + assert!(meets_version(version_8_0, MYSQL_8_0)); + + let version_5_7 = NonZeroI32::new(50744); + assert!(!meets_version(version_5_7, MYSQL_8_0)); + + assert!(!meets_version(None, MYSQL_8_0)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_replication_slots() { + init_test_tracing(); + + use etl_mysql::replication::slots::{EtlReplicationSlot, APPLY_WORKER_PREFIX}; + + let pipeline_id = 1; + let slot = EtlReplicationSlot::for_apply_worker(pipeline_id); + + let slot_name: String = slot.try_into().unwrap(); + assert!(slot_name.starts_with(APPLY_WORKER_PREFIX)); + assert_eq!(slot_name, "supabase_etl_apply_1"); + + let parsed_slot: EtlReplicationSlot = slot_name.as_str().try_into().unwrap(); + assert_eq!(parsed_slot, EtlReplicationSlot::for_apply_worker(1)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_sync_slots() { + init_test_tracing(); + + use etl_mysql::replication::slots::{EtlReplicationSlot, TABLE_SYNC_WORKER_PREFIX}; + + let pipeline_id = 1; + let table_id = TableId::new(12345); + let slot = EtlReplicationSlot::for_table_sync_worker(pipeline_id, table_id); + + let slot_name: String = slot.try_into().unwrap(); + assert!(slot_name.starts_with(TABLE_SYNC_WORKER_PREFIX)); + assert_eq!(slot_name, "supabase_etl_table_sync_1_12345"); + + let parsed_slot: EtlReplicationSlot = slot_name.as_str().try_into().unwrap(); + assert_eq!( + parsed_slot, + EtlReplicationSlot::for_table_sync_worker(1, TableId::new(12345)) + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_table_mappings() { + init_test_tracing(); + + use etl_mysql::replication::table_mappings::TableMappings; + + let mut mappings = TableMappings::new(); + assert!(mappings.is_empty()); + + let table_id = TableId::new(123); + let table_name = TableName::new("test_db".to_string(), "users".to_string()); + + mappings.add_mapping(table_id, table_name.clone()); + assert_eq!(mappings.len(), 1); + assert_eq!(mappings.get_table_name(&table_id), Some(&table_name)); + + let all_mappings = mappings.all_mappings(); + assert_eq!(all_mappings.len(), 1); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_worker_config() { + init_test_tracing(); + + use etl_mysql::replication::worker::{WorkerConfig, WorkerState, WorkerType}; + + let mut config = WorkerConfig::new(1, WorkerType::Apply); + assert_eq!(config.state, WorkerState::Initializing); + assert!(!config.is_running()); + assert!(!config.is_errored()); + + config.set_state(WorkerState::Running); + assert!(config.is_running()); + assert!(!config.is_errored()); + + config.set_state(WorkerState::Errored { + reason: "Test error".to_string(), + }); + assert!(config.is_errored()); + assert!(!config.is_running()); + + config.set_state(WorkerState::Stopped); + assert!(!config.is_running()); + assert!(!config.is_errored()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_binlog_position() { + init_test_tracing(); + + use etl_mysql::replication::lag::BinlogPosition; + + let pos1 = BinlogPosition::new(1, 1000); + let pos2 = BinlogPosition::new(1, 2000); + + assert_eq!(pos1.lag_bytes(&pos2), 1000); + + let pos3 = BinlogPosition::new(1, 1000); + let pos4 = BinlogPosition::new(2, 500); + assert!(pos3.lag_bytes(&pos4) > 0); + + let pos5 = BinlogPosition::new(2, 2000); + let pos6 = BinlogPosition::new(1, 1000); + assert!(pos5.lag_bytes(&pos6) < 0); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sequence_number_generation() { + init_test_tracing(); + + use etl_mysql::types::generate_sequence_number; + + let start = 1000u64; + let commit = 2000u64; + let seq = generate_sequence_number(start, commit); + assert_eq!(seq, "00000000000007d0/00000000000003e8"); + + let seq1 = generate_sequence_number(100, 1000); + let seq2 = generate_sequence_number(200, 1000); + let seq3 = generate_sequence_number(100, 2000); + + assert!(seq2 > seq1); + assert!(seq3 > seq1); + assert!(seq3 > seq2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_replication_state() { + init_test_tracing(); + + use etl_mysql::replication::state::ReplicationState; + + let state = ReplicationState::new("mysql-bin.000123".to_string(), 4567); + assert_eq!(state.binlog_file, "mysql-bin.000123"); + assert_eq!(state.binlog_position, 4567); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_server_version_extraction() { + init_test_tracing(); + + use etl_mysql::replication::extract_server_version; + + assert_eq!( + extract_server_version("8.0.35"), + NonZeroI32::new(80035) + ); + assert_eq!( + extract_server_version("5.7.44"), + NonZeroI32::new(50744) + ); + assert_eq!( + extract_server_version("8.0.35-log"), + NonZeroI32::new(80035) + ); + assert_eq!( + extract_server_version("5.7.44-0ubuntu0.18.04.1"), + NonZeroI32::new(50744) + ); + + assert_eq!(extract_server_version(""), None); + assert_eq!(extract_server_version("invalid"), None); + assert_eq!(extract_server_version("0.0.0"), None); +}