From 760b7327503042369a8eaac673f8f441814d1ee1 Mon Sep 17 00:00:00 2001 From: Filipe Silva Date: Fri, 8 May 2026 16:38:59 +0100 Subject: [PATCH 1/4] feat: clickhouse staging-optimized Fix https://github.com/dlt-hub/dlt/issues/3926 --- .../impl/clickhouse/clickhouse.py | 66 ++++++++++- dlt/destinations/impl/clickhouse/factory.py | 6 +- .../dlt-ecosystem/destinations/clickhouse.md | 2 + .../test_clickhouse_configuration.py | 27 +++++ .../load/pipeline/test_replace_disposition.py | 108 +++++++++++++++++- 5 files changed, 205 insertions(+), 4 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 98481e59b5..d0ae27e06f 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,6 +1,6 @@ from copy import deepcopy from textwrap import dedent -from typing import Any, Dict, Literal, Optional, List, Sequence, cast +from typing import Any, Dict, Iterable, Literal, Optional, List, Sequence, cast from urllib.parse import ParseResult, urlparse import clickhouse_connect @@ -31,6 +31,8 @@ from dlt.common.storages import FileStorage from dlt.common.storages.configuration import FilesystemConfiguration, ensure_canonical_az_url from dlt.common.storages.fsspec_filesystem import AZURE_BLOB_STORAGE_PROTOCOLS +from dlt.common.storages.load_package import ParsedLoadJobFileName +from dlt.common.destination.exceptions import DestinationTerminalException from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.clickhouse.configuration import ( ClickHouseClientConfiguration, @@ -62,7 +64,7 @@ ) from dlt.destinations.job_impl import ReferenceFollowupJobRequest from dlt.destinations.sql_client import SqlClientBase -from dlt.destinations.sql_jobs import SqlMergeFollowupJob +from dlt.destinations.sql_jobs import SqlMergeFollowupJob, SqlStagingReplaceFollowupJob from dlt.destinations.utils import get_deterministic_temp_table_name from dlt.destinations.path_utils import get_file_format_and_compression @@ -296,6 +298,30 @@ def requires_temp_table_for_delete(cls) -> bool: return False +class ClickHouseStagingReplaceJob(SqlStagingReplaceFollowupJob): + """Atomic staging-optimized replace via `EXCHANGE TABLES`. + + Requires the destination database to use the `Atomic` or `Shared` engine. + The follow-up `TRUNCATE` clears the (now-old) data left in the staging slot + so it does not retain previous production data between loads. + """ + + @classmethod + def generate_sql( + cls, + table_chain: Sequence[PreparedTableSchema], + sql_client: SqlClientBase[Any], + ) -> List[str]: + sql: List[str] = [] + for table in table_chain: + with sql_client.with_staging_dataset(): + staging_table_name = sql_client.make_qualified_table_name(table["name"]) + table_name = sql_client.make_qualified_table_name(table["name"]) + sql.append(f"EXCHANGE TABLES {staging_table_name} AND {table_name}") + sql.append(f"TRUNCATE TABLE {staging_table_name}") + return sql + + class ClickHouseClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, @@ -348,6 +374,42 @@ def _create_merge_followup_jobs( ) -> List[FollowupJobRequest]: return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)] + def _create_replace_followup_jobs( + self, table_chain: Sequence[PreparedTableSchema] + ) -> List[FollowupJobRequest]: + root_table = table_chain[0] + if root_table["x-replace-strategy"] == "staging-optimized": # type: ignore[typeddict-item] + return [ClickHouseStagingReplaceJob.from_table_chain(table_chain, self.sql_client)] + return super()._create_replace_followup_jobs(table_chain) + + def verify_schema( + self, + only_tables: Iterable[str] = None, + new_jobs: Iterable[ParsedLoadJobFileName] = None, + ) -> List[PreparedTableSchema]: + loaded_tables = super().verify_schema(only_tables, new_jobs) + # probe the database engine early so staging-optimized fails fast at init, + # before any data is extracted or loaded into staging + if any( + table.get("x-replace-strategy") == "staging-optimized" for table in loaded_tables + ): + self._verify_database_supports_exchange() + return loaded_tables + + def _verify_database_supports_exchange(self) -> None: + # `EXCHANGE TABLES` is only supported on Atomic and Shared database engines + result = self.sql_client.execute_sql( + "SELECT engine FROM system.databases WHERE name = currentDatabase()" + ) + engine = result[0][0] if result else "unknown" + if engine not in ("Atomic", "Shared"): + raise DestinationTerminalException( + "ClickHouse replace_strategy='staging-optimized' requires the Atomic or Shared" + f" database engine to use EXCHANGE TABLES (current: {engine}). Either choose" + " 'insert-from-staging' or 'truncate-and-insert', or recreate the database with" + " ENGINE = Atomic." + ) + def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str: # Build column definition. # The primary key and sort order definition is defined outside column specification. diff --git a/dlt/destinations/impl/clickhouse/factory.py b/dlt/destinations/impl/clickhouse/factory.py index 39a0e9c823..f3089f529d 100644 --- a/dlt/destinations/impl/clickhouse/factory.py +++ b/dlt/destinations/impl/clickhouse/factory.py @@ -208,7 +208,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_truncate_command = True caps.supported_merge_strategies = ["delete-insert", "scd2"] - caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] caps.enforces_nulls_on_alter = False caps.sqlglot_dialect = "clickhouse" diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index 11120c3a24..99b96f7d15 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -112,6 +112,8 @@ select_sequential_consistency = 1 # Ensures read-after-wri All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported. +If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be atomically swapped with by the staging tables using via [`EXCHANGE TABLES`](https://clickhouse.com/docs/en/sql-reference/statements/exchange) (requires `Atomic` or `Shared` database engine). + ## Data loading Data is loaded into ClickHouse using the most efficient method depending on the data source: diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index ee6b3562f1..bd9ca31634 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -1,8 +1,10 @@ from typing import Iterator import pytest +from pytest_mock import MockerFixture from dlt.common.configuration.resolve import resolve_configuration +from dlt.common.destination.exceptions import DestinationTerminalException from dlt.common.libs.sql_alchemy_compat import make_url from dlt.common.utils import custom_environ, digest128 from dlt.destinations import clickhouse @@ -146,3 +148,28 @@ def test_client_table_name_and_paths(client: ClickHouseClient) -> None: parts = client.sql_client.make_qualified_table_name_path("test_table", quote=False) assert parts[0] in databases assert parts[1] == f"{dataset_name}###test_table" + + +@pytest.mark.parametrize("engine", ["Atomic", "Shared"]) +def test_staging_optimized_accepts_supported_engines( + client: ClickHouseClient, mocker: MockerFixture, engine: str +) -> None: + mocker.patch.object(client.sql_client, "execute_sql", return_value=[(engine,)]) + client._verify_database_supports_exchange() + + +@pytest.mark.parametrize("engine", ["Ordinary", "Replicated", "Lazy"]) +def test_staging_optimized_rejects_unsupported_engines( + client: ClickHouseClient, mocker: MockerFixture, engine: str +) -> None: + mocker.patch.object(client.sql_client, "execute_sql", return_value=[(engine,)]) + with pytest.raises(DestinationTerminalException, match="Atomic or Shared"): + client._verify_database_supports_exchange() + + +def test_staging_optimized_rejects_missing_engine( + client: ClickHouseClient, mocker: MockerFixture +) -> None: + mocker.patch.object(client.sql_client, "execute_sql", return_value=[]) + with pytest.raises(DestinationTerminalException, match="unknown"): + client._verify_database_supports_exchange() diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index 124bc0be51..e7532e1944 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -410,6 +410,11 @@ def test_replace_sql_queries( destination_spy = mocker.spy(MsSqlStagingReplaceJob, "generate_sql") + elif dest_type == "clickhouse": + from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseStagingReplaceJob + + destination_spy = mocker.spy(ClickHouseStagingReplaceJob, "generate_sql") + pipeline = destination_config.setup_pipeline( f"insert_from_staging_test_{uniq_id()}", dev_mode=True ) @@ -441,7 +446,7 @@ def test_replace_sql_queries( ) elif replace_strategy == "staging-optimized": - if dest_type in ["postgres", "mssql"]: + if dest_type in ["postgres", "mssql", "clickhouse"]: assert destination_spy.call_count == 1 else: assert clone_sql_generator_spy.call_count == 1 @@ -525,3 +530,104 @@ def load_items_empty(): info = pipeline.run(load_items_empty(), **destination_config.run_kwargs) assert_load_info(info) assert_empty_tables(pipeline, "items", "items__sub_items") + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + subset=["clickhouse"], + ), + ids=lambda x: x.name, +) +def test_clickhouse_atomic_swap_replace( + destination_config: DestinationTestConfiguration, + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test ClickHouse atomic swap via EXCHANGE TABLES with sequential loads, nested tables, and empty resource.""" + from dlt.destinations.sql_jobs import SqlStagingFollowupJob, SqlStagingReplaceFollowupJob + from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseStagingReplaceJob + + monkeypatch.setenv("DESTINATION__REPLACE_STRATEGY", "staging-optimized") + + clone_spy = mocker.spy(SqlStagingReplaceFollowupJob, "_generate_clone_sql") + insert_spy = mocker.spy(SqlStagingFollowupJob, "_generate_insert_sql") + swap_spy = mocker.spy(ClickHouseStagingReplaceJob, "generate_sql") + + pipeline = destination_config.setup_pipeline("clickhouse_atomic_swap_test", dev_mode=True) + + @dlt.resource(name="items", write_disposition="replace", primary_key="id") + def load_items(offset): + for i in range(offset, offset + 3): + yield { + "id": i, + "name": f"item {i}", + "sub_items": [ + {"id": i * 100 + 1, "name": f"sub {i * 100 + 1}"}, + {"id": i * 100 + 2, "name": f"sub {i * 100 + 2}"}, + ], + } + + # first load: nested data via swap + info = pipeline.run(load_items(0), **destination_config.run_kwargs) + assert_load_info(info) + + assert swap_spy.call_count == 1 + assert clone_spy.call_count == 0 + assert insert_spy.call_count == 0 + for sql_stmt in swap_spy.spy_return: + assert "EXCHANGE TABLES" in sql_stmt or "TRUNCATE TABLE" in sql_stmt + assert any("EXCHANGE TABLES" in sql for sql in swap_spy.spy_return) + + assert load_table_counts(pipeline, "items", "items__sub_items") == { + "items": 3, + "items__sub_items": 6, + } + + # second load: different data, verifies full replacement after prior swap + swap_spy.reset_mock() + info = pipeline.run(load_items(100), **destination_config.run_kwargs) + assert_load_info(info) + + assert swap_spy.call_count == 1 + assert load_table_counts(pipeline, "items", "items__sub_items") == { + "items": 3, + "items__sub_items": 6, + } + table_dicts = load_tables_to_dicts(pipeline, "items") + assert {int(r["id"]) for r in table_dicts["items"]} == {100, 101, 102} + + # third load: schema evolution adds a new column, EXCHANGE must work after ALTER + @dlt.resource(name="items", write_disposition="replace", primary_key="id") + def load_items_with_new_column(offset): + for i in range(offset, offset + 3): + yield { + "id": i, + "name": f"item {i}", + "category": f"cat {i}", + "sub_items": [ + {"id": i * 100 + 1, "name": f"sub {i * 100 + 1}"}, + ], + } + + swap_spy.reset_mock() + info = pipeline.run(load_items_with_new_column(200), **destination_config.run_kwargs) + assert_load_info(info) + assert swap_spy.call_count == 1 + assert "category" in pipeline.default_schema.get_table("items")["columns"] + table_dicts = load_tables_to_dicts(pipeline, "items") + assert {int(r["id"]) for r in table_dicts["items"]} == {200, 201, 202} + assert {r["category"] for r in table_dicts["items"]} == {"cat 200", "cat 201", "cat 202"} + + # fourth load: empty resource clears all tables + @dlt.resource(name="items", write_disposition="replace", primary_key="id") + def load_items_empty(): + if False: + yield + + swap_spy.reset_mock() + info = pipeline.run(load_items_empty(), **destination_config.run_kwargs) + assert_load_info(info) + assert_empty_tables(pipeline, "items", "items__sub_items") From 05060287217513a19497d01669908f2ff7859b97 Mon Sep 17 00:00:00 2001 From: Filipe Silva Date: Thu, 4 Jun 2026 16:26:44 +0100 Subject: [PATCH 2/4] test: remove unnecessary test --- .../load/pipeline/test_replace_disposition.py | 101 ------------------ 1 file changed, 101 deletions(-) diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index e7532e1944..5a8fc750d8 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -530,104 +530,3 @@ def load_items_empty(): info = pipeline.run(load_items_empty(), **destination_config.run_kwargs) assert_load_info(info) assert_empty_tables(pipeline, "items", "items__sub_items") - - -@pytest.mark.essential -@pytest.mark.parametrize( - "destination_config", - destinations_configs( - default_sql_configs=True, - subset=["clickhouse"], - ), - ids=lambda x: x.name, -) -def test_clickhouse_atomic_swap_replace( - destination_config: DestinationTestConfiguration, - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """Test ClickHouse atomic swap via EXCHANGE TABLES with sequential loads, nested tables, and empty resource.""" - from dlt.destinations.sql_jobs import SqlStagingFollowupJob, SqlStagingReplaceFollowupJob - from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseStagingReplaceJob - - monkeypatch.setenv("DESTINATION__REPLACE_STRATEGY", "staging-optimized") - - clone_spy = mocker.spy(SqlStagingReplaceFollowupJob, "_generate_clone_sql") - insert_spy = mocker.spy(SqlStagingFollowupJob, "_generate_insert_sql") - swap_spy = mocker.spy(ClickHouseStagingReplaceJob, "generate_sql") - - pipeline = destination_config.setup_pipeline("clickhouse_atomic_swap_test", dev_mode=True) - - @dlt.resource(name="items", write_disposition="replace", primary_key="id") - def load_items(offset): - for i in range(offset, offset + 3): - yield { - "id": i, - "name": f"item {i}", - "sub_items": [ - {"id": i * 100 + 1, "name": f"sub {i * 100 + 1}"}, - {"id": i * 100 + 2, "name": f"sub {i * 100 + 2}"}, - ], - } - - # first load: nested data via swap - info = pipeline.run(load_items(0), **destination_config.run_kwargs) - assert_load_info(info) - - assert swap_spy.call_count == 1 - assert clone_spy.call_count == 0 - assert insert_spy.call_count == 0 - for sql_stmt in swap_spy.spy_return: - assert "EXCHANGE TABLES" in sql_stmt or "TRUNCATE TABLE" in sql_stmt - assert any("EXCHANGE TABLES" in sql for sql in swap_spy.spy_return) - - assert load_table_counts(pipeline, "items", "items__sub_items") == { - "items": 3, - "items__sub_items": 6, - } - - # second load: different data, verifies full replacement after prior swap - swap_spy.reset_mock() - info = pipeline.run(load_items(100), **destination_config.run_kwargs) - assert_load_info(info) - - assert swap_spy.call_count == 1 - assert load_table_counts(pipeline, "items", "items__sub_items") == { - "items": 3, - "items__sub_items": 6, - } - table_dicts = load_tables_to_dicts(pipeline, "items") - assert {int(r["id"]) for r in table_dicts["items"]} == {100, 101, 102} - - # third load: schema evolution adds a new column, EXCHANGE must work after ALTER - @dlt.resource(name="items", write_disposition="replace", primary_key="id") - def load_items_with_new_column(offset): - for i in range(offset, offset + 3): - yield { - "id": i, - "name": f"item {i}", - "category": f"cat {i}", - "sub_items": [ - {"id": i * 100 + 1, "name": f"sub {i * 100 + 1}"}, - ], - } - - swap_spy.reset_mock() - info = pipeline.run(load_items_with_new_column(200), **destination_config.run_kwargs) - assert_load_info(info) - assert swap_spy.call_count == 1 - assert "category" in pipeline.default_schema.get_table("items")["columns"] - table_dicts = load_tables_to_dicts(pipeline, "items") - assert {int(r["id"]) for r in table_dicts["items"]} == {200, 201, 202} - assert {r["category"] for r in table_dicts["items"]} == {"cat 200", "cat 201", "cat 202"} - - # fourth load: empty resource clears all tables - @dlt.resource(name="items", write_disposition="replace", primary_key="id") - def load_items_empty(): - if False: - yield - - swap_spy.reset_mock() - info = pipeline.run(load_items_empty(), **destination_config.run_kwargs) - assert_load_info(info) - assert_empty_tables(pipeline, "items", "items__sub_items") From 86519545b99371ce0d86121f78a7a8a5adb15e5d Mon Sep 17 00:00:00 2001 From: Filipe Silva Date: Thu, 4 Jun 2026 16:27:24 +0100 Subject: [PATCH 3/4] doc: fix wording on clickhouse --- docs/website/docs/dlt-ecosystem/destinations/clickhouse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index 99b96f7d15..cbd4756e60 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -112,7 +112,7 @@ select_sequential_consistency = 1 # Ensures read-after-wri All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported. -If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be atomically swapped with by the staging tables using via [`EXCHANGE TABLES`](https://clickhouse.com/docs/en/sql-reference/statements/exchange) (requires `Atomic` or `Shared` database engine). +If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be atomically swapped with the staging tables via [`EXCHANGE TABLES`](https://clickhouse.com/docs/en/sql-reference/statements/exchange) (requires `Atomic` or `Shared` database engine). ## Data loading From 7b65575aeb2f2f9027c7861da286f89b7ba56c62 Mon Sep 17 00:00:00 2001 From: Filipe Silva Date: Thu, 4 Jun 2026 16:34:10 +0100 Subject: [PATCH 4/4] fix: don't truncate on clickhouse staging replace --- dlt/destinations/impl/clickhouse/clickhouse.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index d0ae27e06f..86ecf884be 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -302,8 +302,8 @@ class ClickHouseStagingReplaceJob(SqlStagingReplaceFollowupJob): """Atomic staging-optimized replace via `EXCHANGE TABLES`. Requires the destination database to use the `Atomic` or `Shared` engine. - The follow-up `TRUNCATE` clears the (now-old) data left in the staging slot - so it does not retain previous production data between loads. + The previous destination data lands in the staging slot after the swap; dlt + truncates staging tables at the start of the next load. """ @classmethod @@ -318,7 +318,6 @@ def generate_sql( staging_table_name = sql_client.make_qualified_table_name(table["name"]) table_name = sql_client.make_qualified_table_name(table["name"]) sql.append(f"EXCHANGE TABLES {staging_table_name} AND {table_name}") - sql.append(f"TRUNCATE TABLE {staging_table_name}") return sql