Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions etl-destinations/src/ducklake/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
borrow::Cow,
error, fmt,
sync::{
Arc, LazyLock,
Arc, LazyLock, Mutex, Weak,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::Duration,
Expand Down Expand Up @@ -156,6 +156,38 @@ impl DuckDbQueryWatchdog {
}
}

/// Shared registry of interrupt handles for live DuckLake connections.
///
/// Handles are stored as `Weak` references so the registry never keeps a
/// dropped connection alive; `interrupt_all` upgrades each entry and prunes
/// the ones whose connection has already gone away.
#[derive(Default)]
pub(super) struct DuckLakeInterruptRegistry {
    // Guarded by a std Mutex: hold times are short and no `.await` ever
    // happens while the guard is held, so an async lock is unnecessary.
    handles: Mutex<Vec<Weak<duckdb::InterruptHandle>>>,
}

impl DuckLakeInterruptRegistry {
    /// Registers one live DuckLake connection interrupt handle.
    ///
    /// Also prunes entries whose connection has already been dropped; without
    /// this, the registry grows by one dead `Weak` per recycled pooled
    /// connection, since `interrupt_all` (the only other pruning point) is
    /// typically invoked just once, at shutdown.
    fn register(&self, handle: &Arc<duckdb::InterruptHandle>) {
        let mut handles = self
            .handles
            .lock()
            .expect("ducklake interrupt registry mutex should not be poisoned");
        // Drop dead weak references before appending so the Vec stays bounded
        // by the number of currently live connections.
        handles.retain(|existing| existing.strong_count() > 0);
        handles.push(Arc::downgrade(handle));
    }

    /// Interrupts all currently live registered DuckLake connections and
    /// returns how many were interrupted.
    ///
    /// Entries whose connection has already been dropped are pruned as a side
    /// effect.
    pub(super) fn interrupt_all(&self) -> usize {
        let mut interrupted = 0;
        let mut handles = self
            .handles
            .lock()
            .expect("ducklake interrupt registry mutex should not be poisoned");
        handles.retain(|handle| {
            // Only connections that are still alive can be interrupted; dead
            // ones are removed from the registry here.
            let Some(handle) = handle.upgrade() else {
                return false;
            };
            handle.interrupt();
            interrupted += 1;
            true
        });
        interrupted
    }
}

/// Custom r2d2 connection manager that opens an in-memory DuckDB connection and
/// attaches the DuckLake catalog on every `connect()` call.
///
Expand All @@ -169,6 +201,8 @@ pub(super) struct DuckLakeConnectionManager {
/// Disables DuckDB extension autoload/autoinstall when vendored local
/// extensions are required.
pub(super) disable_extension_autoload: bool,
/// Shared registry of interrupt handles for live connections.
pub(super) interrupt_registry: Arc<DuckLakeInterruptRegistry>,
/// Counts successfully initialized DuckDB connections for tests.
#[cfg(feature = "test-utils")]
pub(super) open_count: Arc<AtomicUsize>,
Expand Down Expand Up @@ -316,6 +350,16 @@ impl Drop for LazyDuckLakePool {
}

impl DuckLakeConnectionManager {
/// Returns the shared interrupt registry for this manager and its clones.
pub(super) fn interrupt_registry(&self) -> Arc<DuckLakeInterruptRegistry> {
    let registry = &self.interrupt_registry;
    Arc::clone(registry)
}

/// Interrupts all currently live managed DuckLake connections, returning the
/// number of connections that received an interrupt.
pub(super) fn interrupt_all_connections(&self) -> usize {
    let registry = self.interrupt_registry.as_ref();
    registry.interrupt_all()
}

/// Opens one fully initialized DuckDB connection and attaches the lake
/// catalog.
pub(super) fn open_duckdb_connection(
Expand All @@ -332,6 +376,7 @@ impl DuckLakeConnectionManager {
} else {
duckdb::Connection::open_in_memory().map_err(DuckLakeConnectionError::validation)?
};
self.interrupt_registry.register(&conn.interrupt_handle());
for step in self.setup_plan.steps() {
let phase_started = Instant::now();
info!(
Expand Down Expand Up @@ -410,6 +455,7 @@ pub(super) async fn build_warm_ducklake_pool(
let pool = r2d2::Pool::builder()
.max_size(pool_size)
.min_idle(Some(pool_size))
.connection_timeout(Duration::from_mins(4))
.test_on_check_out(true)
// Callers log the returned pool initialization failure once, so
// suppress r2d2's internal per-attempt logging here.
Expand Down Expand Up @@ -602,14 +648,15 @@ where
mod tests {
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::sync::{Semaphore, oneshot};

use super::*;

fn make_blocking_test_manager() -> DuckLakeConnectionManager {
DuckLakeConnectionManager {
setup_plan: Arc::new(DuckLakeSetupPlan::default()),
disable_extension_autoload: cfg!(target_os = "linux"),
interrupt_registry: Arc::new(DuckLakeInterruptRegistry::default()),
#[cfg(feature = "test-utils")]
open_count: Arc::new(AtomicUsize::new(0)),
}
Expand Down Expand Up @@ -763,4 +810,62 @@ mod tests {
deadline"
);
}

#[tokio::test]
async fn interrupt_all_connections_cancels_running_query() {
    // Single-connection pool: the long-running query below occupies the only
    // pooled connection, which is also the connection whose interrupt handle
    // was registered with the manager.
    let manager = make_blocking_test_manager();
    let pool = Arc::new(
        build_warm_ducklake_pool(manager.clone(), 1, "test")
            .await
            .expect("failed to build blocking test pool"),
    );
    let blocking_slots = Arc::new(Semaphore::new(1));
    let (query_started_tx, query_started_rx) = oneshot::channel();

    // Spawn a deliberately expensive cross join; it should only finish early
    // if the interrupt issued below cancels it.
    let query_task = tokio::spawn({
        let pool = Arc::clone(&pool);
        let blocking_slots = Arc::clone(&blocking_slots);
        async move {
            run_duckdb_blocking_with_timeout(
                pool,
                blocking_slots,
                DuckDbBlockingOperationKind::Foreground,
                Duration::from_secs(30),
                move |conn| -> EtlResult<()> {
                    // Signal readiness before issuing the query so the test
                    // body knows the closure is running on the blocking slot.
                    let _ = query_started_tx.send(());
                    conn.query_row(
                        "SELECT COUNT(*) FROM range(10000000) t1, range(1000000) t2;",
                        [],
                        |row| row.get::<_, i64>(0),
                    )
                    .map_err(|source| {
                        etl_error!(
                            ErrorKind::DestinationQueryFailed,
                            "DuckLake interrupt test query failed",
                            source: source
                        )
                    })?;
                    Ok(())
                },
            )
            .await
        }
    });

    // NOTE(review): the oneshot fires before `query_row` actually starts
    // executing inside DuckDB, so the 100ms sleep is a heuristic to let the
    // query begin before the interrupt is delivered. If interrupts issued
    // before execution starts can be lost, this test is timing-sensitive —
    // confirm against duckdb's InterruptHandle semantics.
    query_started_rx.await.expect("query should start before interrupt");
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(
        manager.interrupt_all_connections() >= 1,
        "expected at least one registered connection to be interrupted"
    );

    // The cancelled query must surface as the mapped error well inside both
    // the 30s operation timeout and this 5s safety timeout.
    let error = tokio::time::timeout(Duration::from_secs(5), query_task)
        .await
        .expect("interrupted query should finish promptly")
        .expect("interrupt test task should not panic")
        .expect_err("interrupted query should fail");

    assert_eq!(error.kind(), ErrorKind::DestinationQueryFailed);
    assert_eq!(error.description(), Some("DuckLake interrupt test query failed"));
}
}
83 changes: 69 additions & 14 deletions etl-destinations/src/ducklake/core.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#[cfg(feature = "test-utils")]
use std::sync::atomic::AtomicUsize;
#[cfg(test)]
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};

use etl::{
Expand All @@ -32,7 +31,7 @@ use sqlx::{PgPool, postgres::PgPoolOptions};
#[cfg(feature = "test-utils")]
use tokio::sync::oneshot;
use tokio::{
sync::{OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore},
sync::{OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore, TryAcquireError},
task::JoinSet,
};
use tracing::{debug, info};
Expand All @@ -51,8 +50,8 @@ use crate::{
retain_truncates_after_sequence_key,
},
client::{
DuckDbBlockingOperationKind, DuckLakeConnectionManager, build_warm_ducklake_pool,
format_query_error_detail, run_duckdb_blocking,
DuckDbBlockingOperationKind, DuckLakeConnectionManager, DuckLakeInterruptRegistry,
build_warm_ducklake_pool, format_query_error_detail, run_duckdb_blocking,
},
config::{
MAINTENANCE_TARGET_FILE_SIZE, build_setup_plan, current_duckdb_extension_strategy,
Expand Down Expand Up @@ -121,7 +120,6 @@ pub(super) fn is_create_table_conflict(error: &duckdb::Error, table_name: &str)
/// deferred to coordinated maintenance.
#[derive(Clone)]
pub struct DuckLakeDestination<S> {
#[cfg(feature = "test-utils")]
manager: Arc<DuckLakeConnectionManager>,
pool: Arc<r2d2::Pool<DuckLakeConnectionManager>>,
blocking_slots: Arc<Semaphore>,
Expand Down Expand Up @@ -158,6 +156,11 @@ where
}

async fn shutdown(&self) -> EtlResult<()> {
let interrupted_connections = self.manager.interrupt_all_connections();
info!(
interrupted_connections,
"ducklake shutdown requested, interrupted active duckdb connections"
);
self.tasks.shutdown().await?;
self.shutdown_maintenance_worker().await?;
self.shutdown_metrics_sampler().await?;
Expand Down Expand Up @@ -455,6 +458,7 @@ where
let manager = Arc::new(DuckLakeConnectionManager {
setup_plan: Arc::clone(&setup_plan),
disable_extension_autoload,
interrupt_registry: Arc::new(DuckLakeInterruptRegistry::default()),
#[cfg(feature = "test-utils")]
open_count: Arc::new(AtomicUsize::new(0)),
});
Expand Down Expand Up @@ -535,8 +539,7 @@ where
let inline_flush_requested = Arc::new(AtomicBool::new(false));
let inline_flush_requests = Arc::new(PendingInlineFlushRequests::default());
let mut destination = Self {
#[cfg(feature = "test-utils")]
manager,
manager: Arc::clone(&manager),
pool: Arc::clone(&pool),
blocking_slots: Arc::clone(&blocking_slots),
checkpoint_gate: Arc::clone(&checkpoint_gate),
Expand All @@ -560,6 +563,7 @@ where
DuckLakeConnectionManager {
setup_plan: Arc::clone(&setup_plan),
disable_extension_autoload,
interrupt_registry: manager.interrupt_registry(),
#[cfg(feature = "test-utils")]
open_count: Arc::new(AtomicUsize::new(0)),
},
Expand Down Expand Up @@ -876,7 +880,16 @@ where

join_set.spawn(async move {
let _table_write_permit = table_write_permit;
let checkpoint_wait_started = tokio::time::Instant::now();
let _checkpoint_guard = checkpoint_gate.read_owned().await;
let checkpoint_wait = checkpoint_wait_started.elapsed();
if checkpoint_wait > Duration::from_secs(1) {
info!(
table = %destination_table_name,
checkpoint_wait_ms = checkpoint_wait.as_millis() as u64,
"ducklake waited for checkpoint gate before streaming write"
);
}
let last_sequence_key =
read_table_streaming_progress_sequence_key_blocking(
Arc::clone(&pool),
Expand All @@ -893,6 +906,13 @@ where
);
return Ok::<(), etl::error::EtlError>(());
}
let is_first_streaming_batch = last_sequence_key.is_none();
info!(
table = %destination_table_name,
pending_mutation_count = pending_mutations.len(),
is_first_streaming_batch,
"ducklake applying streaming mutations"
);

let maintenance_notification =
maintenance_worker.as_ref().as_ref().map(|_| {
Expand All @@ -907,6 +927,11 @@ where
)?;
apply_table_batches_with_retry(pool, blocking_slots, prepared_batches)
.await?;
info!(
table = %destination_table_name,
is_first_streaming_batch,
"ducklake applied streaming mutations"
);
if let (Some(worker), Some(notification)) =
(maintenance_worker.as_ref(), maintenance_notification)
{
Expand Down Expand Up @@ -1011,6 +1036,12 @@ where
}
}

info!(
table_id = %table_id,
table = %table_name,
"ducklake destination table cache miss, ensuring table exists"
);

let _table_creation_permit =
Arc::clone(&self.table_creation_slots).acquire_owned().await.map_err(|_| {
etl_error!(ErrorKind::InvalidState, "DuckLake table creation semaphore closed")
Expand Down Expand Up @@ -1050,12 +1081,16 @@ where
Arc::clone(&self.blocking_slots),
DuckDbBlockingOperationKind::Foreground,
move |conn| -> EtlResult<()> {
debug!(
table = %table_name_clone,
"ducklake create table begin"
);
match conn.execute_batch(&qualified_ddl) {
Ok(()) => {
created_tables.lock().insert(table_name_clone);
created_tables.lock().insert(table_name_clone.clone());
}
Err(e) if is_create_table_conflict(&e, &table_name_clone) => {
created_tables.lock().insert(table_name_clone);
created_tables.lock().insert(table_name_clone.clone());
}
Err(e) => {
return Err(etl_error!(
Expand All @@ -1066,6 +1101,10 @@ where
));
}
}
debug!(
table = %table_name_clone,
"ducklake create table finished"
);
Ok(())
},
)
Expand Down Expand Up @@ -1117,10 +1156,26 @@ where
/// Serializes table-local truncate and CDC mutation writes.
async fn acquire_table_write_slot(&self, table_name: &str) -> EtlResult<OwnedSemaphorePermit> {
    let table_slot = table_write_slot(&self.table_write_slots, table_name);

    // Single place that builds the closed-semaphore error used by both the
    // try-acquire and the awaited-acquire failure paths.
    let closed_error =
        || etl_error!(ErrorKind::InvalidState, "DuckLake table write semaphore closed");

    match Arc::clone(&table_slot).try_acquire_owned() {
        // Fast path: the permit was free, so there is no contention to log.
        Ok(permit) => Ok(permit),
        Err(TryAcquireError::Closed) => Err(closed_error()),
        Err(TryAcquireError::NoPermits) => {
            // Contended path: log before and after the wait so write-slot
            // stalls are visible in the logs.
            info!(
                table = %table_name,
                "ducklake waiting for table write slot"
            );
            let permit = table_slot.acquire_owned().await.map_err(|_| closed_error())?;
            info!(
                table = %table_name,
                "ducklake acquired table write slot after wait"
            );
            Ok(permit)
        }
    }
}

/// Acquires shared mutation access so exclusive background maintenance
Expand Down
3 changes: 3 additions & 0 deletions etl-destinations/src/ducklake/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2051,9 +2051,10 @@
#[tokio::test]
async fn known_rewrite_single_output_file_error_is_suppressed_and_recycles_connection() {
let open_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let manager = DuckLakeConnectionManager {

Check warning on line 2054 in etl-destinations/src/ducklake/maintenance.rs

View workflow job for this annotation

GitHub Actions / Lint (fmt)

Diff in /home/runner/_work/etl/etl/etl-destinations/src/ducklake/maintenance.rs
setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()),
disable_extension_autoload: cfg!(target_os = "linux"),
interrupt_registry: Arc::new(crate::ducklake::client::DuckLakeInterruptRegistry::default()),
open_count: Arc::clone(&open_count),
};
let pool = Arc::new(
Expand Down Expand Up @@ -2107,9 +2108,10 @@
async fn spawn_ducklake_maintenance_worker_warms_pool_in_background() {
let open_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let worker = spawn_ducklake_maintenance_worker(
DuckLakeConnectionManager {

Check warning on line 2111 in etl-destinations/src/ducklake/maintenance.rs

View workflow job for this annotation

GitHub Actions / Lint (fmt)

Diff in /home/runner/_work/etl/etl/etl-destinations/src/ducklake/maintenance.rs
setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()),
disable_extension_autoload: cfg!(target_os = "linux"),
interrupt_registry: Arc::new(crate::ducklake::client::DuckLakeInterruptRegistry::default()),
open_count: Arc::clone(&open_count),
},
Arc::new(RwLock::new(())),
Expand Down Expand Up @@ -2137,9 +2139,10 @@
async fn requested_inline_flush_stays_pending_when_mutation_guard_is_active() {
let pool = Arc::new(
build_warm_ducklake_pool(
DuckLakeConnectionManager {

Check warning on line 2142 in etl-destinations/src/ducklake/maintenance.rs

View workflow job for this annotation

GitHub Actions / Lint (fmt)

Diff in /home/runner/_work/etl/etl/etl-destinations/src/ducklake/maintenance.rs
setup_plan: Arc::new(crate::ducklake::config::DuckLakeSetupPlan::default()),
disable_extension_autoload: cfg!(target_os = "linux"),
interrupt_registry: Arc::new(crate::ducklake::client::DuckLakeInterruptRegistry::default()),
open_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
},
1,
Expand Down
Loading