diff --git a/etl-destinations/src/ducklake/client.rs b/etl-destinations/src/ducklake/client.rs index b1a8b580e..64ab88bb1 100644 --- a/etl-destinations/src/ducklake/client.rs +++ b/etl-destinations/src/ducklake/client.rs @@ -4,7 +4,7 @@ use std::{ borrow::Cow, error, fmt, sync::{ - Arc, LazyLock, + Arc, LazyLock, Mutex, Weak, atomic::{AtomicBool, AtomicU64, Ordering}, }, time::Duration, @@ -156,6 +156,38 @@ impl DuckDbQueryWatchdog { } } +/// Shared registry of interrupt handles for live DuckLake connections. +#[derive(Default)] +pub(super) struct DuckLakeInterruptRegistry { + handles: Mutex>>, +} + +impl DuckLakeInterruptRegistry { + /// Registers one live DuckLake connection interrupt handle. + fn register(&self, handle: &Arc) { + self.handles + .lock() + .expect("ducklake interrupt registry mutex should not be poisoned") + .push(Arc::downgrade(handle)); + } + + /// Interrupts all currently live registered DuckLake connections. + pub(super) fn interrupt_all(&self) -> usize { + let mut interrupted = 0; + let mut handles = + self.handles.lock().expect("ducklake interrupt registry mutex should not be poisoned"); + handles.retain(|handle| { + let Some(handle) = handle.upgrade() else { + return false; + }; + handle.interrupt(); + interrupted += 1; + true + }); + interrupted + } +} + /// Custom r2d2 connection manager that opens an in-memory DuckDB connection and /// attaches the DuckLake catalog on every `connect()` call. /// @@ -169,6 +201,8 @@ pub(super) struct DuckLakeConnectionManager { /// Disables DuckDB extension autoload/autoinstall when vendored local /// extensions are required. pub(super) disable_extension_autoload: bool, + /// Shared registry of interrupt handles for live connections. + pub(super) interrupt_registry: Arc, /// Counts successfully initialized DuckDB connections for tests. #[cfg(feature = "test-utils")] pub(super) open_count: Arc, @@ -316,6 +350,16 @@ impl Drop for LazyDuckLakePool { } impl DuckLakeConnectionManager { + /// Returns the shared interrupt registry for this manager and its clones. + pub(super) fn interrupt_registry(&self) -> Arc { + Arc::clone(&self.interrupt_registry) + } + + /// Interrupts all currently live managed DuckLake connections. + pub(super) fn interrupt_all_connections(&self) -> usize { + self.interrupt_registry.interrupt_all() + } + /// Opens one fully initialized DuckDB connection and attaches the lake /// catalog. pub(super) fn open_duckdb_connection( @@ -332,6 +376,7 @@ impl DuckLakeConnectionManager { } else { duckdb::Connection::open_in_memory().map_err(DuckLakeConnectionError::validation)? }; + self.interrupt_registry.register(&conn.interrupt_handle()); for step in self.setup_plan.steps() { let phase_started = Instant::now(); info!( @@ -410,6 +455,7 @@ pub(super) async fn build_warm_ducklake_pool( let pool = r2d2::Pool::builder() .max_size(pool_size) .min_idle(Some(pool_size)) + .connection_timeout(Duration::from_mins(4)) .test_on_check_out(true) // Callers log the returned pool initialization failure once, so // suppress r2d2's internal per-attempt logging here. @@ -602,7 +648,7 @@ where mod tests { use std::sync::Arc; - use tokio::sync::Semaphore; + use tokio::sync::{Semaphore, oneshot}; use super::*; @@ -610,6 +656,7 @@ mod tests { DuckLakeConnectionManager { setup_plan: Arc::new(DuckLakeSetupPlan::default()), disable_extension_autoload: cfg!(target_os = "linux"), + interrupt_registry: Arc::new(DuckLakeInterruptRegistry::default()), #[cfg(feature = "test-utils")] open_count: Arc::new(AtomicUsize::new(0)), } @@ -763,4 +810,62 @@ mod tests { deadline" ); } + + #[tokio::test] + async fn interrupt_all_connections_cancels_running_query() { + let manager = make_blocking_test_manager(); + let pool = Arc::new( + build_warm_ducklake_pool(manager.clone(), 1, "test") + .await + .expect("failed to build blocking test pool"), + ); + let blocking_slots = Arc::new(Semaphore::new(1)); + let (query_started_tx, query_started_rx) = oneshot::channel(); + + let query_task = tokio::spawn({ + let pool = Arc::clone(&pool); + let blocking_slots = Arc::clone(&blocking_slots); + async move { + run_duckdb_blocking_with_timeout( + pool, + blocking_slots, + DuckDbBlockingOperationKind::Foreground, + Duration::from_secs(30), + move |conn| -> EtlResult<()> { + let _ = query_started_tx.send(()); + conn.query_row( + "SELECT COUNT(*) FROM range(10000000) t1, range(1000000) t2;", + [], + |row| row.get::<_, i64>(0), + ) + .map_err(|source| { + etl_error!( + ErrorKind::DestinationQueryFailed, + "DuckLake interrupt test query failed", + source: source + ) + })?; + Ok(()) + }, + ) + .await + } + }); + + query_started_rx.await.expect("query should start before interrupt"); + tokio::time::sleep(Duration::from_millis(100)).await; + assert!( + manager.interrupt_all_connections() >= 1, + "expected at least one registered connection to be interrupted" + ); + + let error = tokio::time::timeout(Duration::from_secs(5), query_task) + .await + .expect("interrupted query should finish promptly") + .expect("interrupt test task should not panic") + .expect_err("interrupted query should fail"); + + assert_eq!(error.kind(), ErrorKind::DestinationQueryFailed); + assert_eq!(error.description(), Some("DuckLake interrupt test query failed")); + } } diff --git a/etl-destinations/src/ducklake/core.rs b/etl-destinations/src/ducklake/core.rs index 4b192a046..d0a72942d 100644 --- a/etl-destinations/src/ducklake/core.rs +++ b/etl-destinations/src/ducklake/core.rs @@ -1,13 +1,12 @@ #[cfg(feature = "test-utils")] use std::sync::atomic::AtomicUsize; -#[cfg(test)] -use std::time::Duration; use std::{ collections::{HashMap, HashSet}, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, + time::Duration, }; use etl::{ @@ -32,7 +31,7 @@ use sqlx::{PgPool, postgres::PgPoolOptions}; #[cfg(feature = "test-utils")] use tokio::sync::oneshot; use tokio::{ - sync::{OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore}, + sync::{OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore, TryAcquireError}, task::JoinSet, }; use tracing::{debug, info}; @@ -51,8 +50,8 @@ use crate::{ retain_truncates_after_sequence_key, }, client::{ - DuckDbBlockingOperationKind, DuckLakeConnectionManager, build_warm_ducklake_pool, - format_query_error_detail, run_duckdb_blocking, + DuckDbBlockingOperationKind, DuckLakeConnectionManager, DuckLakeInterruptRegistry, + build_warm_ducklake_pool, format_query_error_detail, run_duckdb_blocking, }, config::{ MAINTENANCE_TARGET_FILE_SIZE, build_setup_plan, current_duckdb_extension_strategy, @@ -121,7 +120,6 @@ pub(super) fn is_create_table_conflict(error: &duckdb::Error, table_name: &str) /// deferred to coordinated maintenance. #[derive(Clone)] pub struct DuckLakeDestination { - #[cfg(feature = "test-utils")] manager: Arc, pool: Arc>, blocking_slots: Arc, @@ -158,6 +156,11 @@ where } async fn shutdown(&self) -> EtlResult<()> { + let interrupted_connections = self.manager.interrupt_all_connections(); + info!( + interrupted_connections, + "ducklake shutdown requested, interrupted active duckdb connections" + ); self.tasks.shutdown().await?; self.shutdown_maintenance_worker().await?; self.shutdown_metrics_sampler().await?; @@ -455,6 +458,7 @@ where let manager = Arc::new(DuckLakeConnectionManager { setup_plan: Arc::clone(&setup_plan), disable_extension_autoload, + interrupt_registry: Arc::new(DuckLakeInterruptRegistry::default()), #[cfg(feature = "test-utils")] open_count: Arc::new(AtomicUsize::new(0)), }); @@ -535,8 +539,7 @@ where let inline_flush_requested = Arc::new(AtomicBool::new(false)); let inline_flush_requests = Arc::new(PendingInlineFlushRequests::default()); let mut destination = Self { - #[cfg(feature = "test-utils")] - manager, + manager: Arc::clone(&manager), pool: Arc::clone(&pool), blocking_slots: Arc::clone(&blocking_slots), checkpoint_gate: Arc::clone(&checkpoint_gate), @@ -560,6 +563,7 @@ where DuckLakeConnectionManager { setup_plan: Arc::clone(&setup_plan), disable_extension_autoload, + interrupt_registry: manager.interrupt_registry(), #[cfg(feature = "test-utils")] open_count: Arc::new(AtomicUsize::new(0)), }, @@ -876,7 +880,16 @@ where join_set.spawn(async move { let _table_write_permit = table_write_permit; + let checkpoint_wait_started = tokio::time::Instant::now(); let _checkpoint_guard = checkpoint_gate.read_owned().await; + let checkpoint_wait = checkpoint_wait_started.elapsed(); + if checkpoint_wait > Duration::from_secs(1) { + info!( + table = %destination_table_name, + checkpoint_wait_ms = checkpoint_wait.as_millis() as u64, + "ducklake waited for checkpoint gate before streaming write" + ); + } let last_sequence_key = read_table_streaming_progress_sequence_key_blocking( Arc::clone(&pool), @@ -893,6 +906,13 @@ where ); return Ok::<(), etl::error::EtlError>(()); } + let is_first_streaming_batch = last_sequence_key.is_none(); + info!( + table = %destination_table_name, + pending_mutation_count = pending_mutations.len(), + is_first_streaming_batch, + "ducklake applying streaming mutations" + ); let maintenance_notification = maintenance_worker.as_ref().as_ref().map(|_| { @@ -907,6 +927,11 @@ where )?; apply_table_batches_with_retry(pool, blocking_slots, prepared_batches) .await?; + info!( + table = %destination_table_name, + is_first_streaming_batch, + "ducklake applied streaming mutations" + ); if let (Some(worker), Some(notification)) = (maintenance_worker.as_ref(), maintenance_notification) { @@ -1011,6 +1036,12 @@ where } } + info!( + table_id = %table_id, + table = %table_name, + "ducklake destination table cache miss, ensuring table exists" + ); + let _table_creation_permit = Arc::clone(&self.table_creation_slots).acquire_owned().await.map_err(|_| { etl_error!(ErrorKind::InvalidState, "DuckLake table creation semaphore closed") @@ -1050,12 +1081,16 @@ where Arc::clone(&self.blocking_slots), DuckDbBlockingOperationKind::Foreground, move |conn| -> EtlResult<()> { + debug!( + table = %table_name_clone, + "ducklake create table begin" + ); match conn.execute_batch(&qualified_ddl) { Ok(()) => { - created_tables.lock().insert(table_name_clone); + created_tables.lock().insert(table_name_clone.clone()); } Err(e) if is_create_table_conflict(&e, &table_name_clone) => { - created_tables.lock().insert(table_name_clone); + created_tables.lock().insert(table_name_clone.clone()); } Err(e) => { return Err(etl_error!( @@ -1066,6 +1101,10 @@ where )); } } + debug!( + table = %table_name_clone, + "ducklake create table finished" + ); Ok(()) }, ) @@ -1117,10 +1156,26 @@ where /// Serializes table-local truncate and CDC mutation writes. async fn acquire_table_write_slot(&self, table_name: &str) -> EtlResult { let table_slot = table_write_slot(&self.table_write_slots, table_name); - - table_slot.acquire_owned().await.map_err(|_| { - etl_error!(ErrorKind::InvalidState, "DuckLake table write semaphore closed") - }) + match Arc::clone(&table_slot).try_acquire_owned() { + Ok(permit) => Ok(permit), + Err(TryAcquireError::NoPermits) => { + info!( + table = %table_name, + "ducklake waiting for table write slot" + ); + let permit = table_slot.acquire_owned().await.map_err(|_| { + etl_error!(ErrorKind::InvalidState, "DuckLake table write semaphore closed") + })?; + info!( + table = %table_name, + "ducklake acquired table write slot after wait" + ); + Ok(permit) + } + Err(TryAcquireError::Closed) => { + Err(etl_error!(ErrorKind::InvalidState, "DuckLake table write semaphore closed")) + } + } } /// Acquires shared mutation access so exclusive background maintenance diff --git a/etl-destinations/src/ducklake/maintenance.rs b/etl-destinations/src/ducklake/maintenance.rs index 26f1c61ed..3239c9b69 100644 --- a/etl-destinations/src/ducklake/maintenance.rs +++ b/etl-destinations/src/ducklake/maintenance.rs @@ -40,7 +40,7 @@ use crate::ducklake::{ /// Dedicated pool size for background DuckLake maintenance work. const MAINTENANCE_POOL_SIZE: u32 = 1; /// Poll interval for checking per-table inline flush thresholds. -const MAINTENANCE_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(30); +const MAINTENANCE_FLUSH_POLL_INTERVAL: Duration = Duration::from_mins(2); /// Fixed cadence for expiring old DuckLake snapshots. const MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL: Duration = Duration::from_secs(5 * 60 * 60); /// Fixed cadence for cleaning up old DuckLake files. @@ -2054,6 +2054,9 @@ mod tests { let manager = DuckLakeConnectionManager { setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()), disable_extension_autoload: cfg!(target_os = "linux"), + interrupt_registry: Arc::new( + crate::ducklake::client::DuckLakeInterruptRegistry::default(), + ), open_count: Arc::clone(&open_count), }; let pool = Arc::new( @@ -2110,6 +2113,9 @@ mod tests { DuckLakeConnectionManager { setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()), disable_extension_autoload: cfg!(target_os = "linux"), + interrupt_registry: Arc::new( + crate::ducklake::client::DuckLakeInterruptRegistry::default(), + ), open_count: Arc::clone(&open_count), }, Arc::new(RwLock::new(())), @@ -2140,6 +2146,9 @@ mod tests { DuckLakeConnectionManager { setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()), disable_extension_autoload: cfg!(target_os = "linux"), + interrupt_registry: Arc::new( + crate::ducklake::client::DuckLakeInterruptRegistry::default(), + ), open_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)), }, 1,