Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from copy import deepcopy
from textwrap import dedent
from typing import Any, Dict, Literal, Optional, List, Sequence, cast
from typing import Any, Dict, Iterable, Literal, Optional, List, Sequence, cast
from urllib.parse import ParseResult, urlparse

import clickhouse_connect
Expand Down Expand Up @@ -31,6 +31,8 @@
from dlt.common.storages import FileStorage
from dlt.common.storages.configuration import FilesystemConfiguration, ensure_canonical_az_url
from dlt.common.storages.fsspec_filesystem import AZURE_BLOB_STORAGE_PROTOCOLS
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.destination.exceptions import DestinationTerminalException
from dlt.destinations.exceptions import LoadJobTerminalException
from dlt.destinations.impl.clickhouse.configuration import (
ClickHouseClientConfiguration,
Expand Down Expand Up @@ -62,7 +64,7 @@
)
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlMergeFollowupJob
from dlt.destinations.sql_jobs import SqlMergeFollowupJob, SqlStagingReplaceFollowupJob
from dlt.destinations.utils import get_deterministic_temp_table_name
from dlt.destinations.path_utils import get_file_format_and_compression

Expand Down Expand Up @@ -296,6 +298,29 @@ def requires_temp_table_for_delete(cls) -> bool:
return False


class ClickHouseStagingReplaceJob(SqlStagingReplaceFollowupJob):
"""Atomic staging-optimized replace via `EXCHANGE TABLES`.

Requires the destination database to use the `Atomic` or `Shared` engine.
The previous destination data lands in the staging slot after the swap; dlt
truncates staging tables at the start of the next load.
"""

@classmethod
def generate_sql(
cls,
table_chain: Sequence[PreparedTableSchema],
sql_client: SqlClientBase[Any],
) -> List[str]:
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset():
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
sql.append(f"EXCHANGE TABLES {staging_table_name} AND {table_name}")
return sql


class ClickHouseClient(SqlJobClientWithStagingDataset, SupportsStagingDestination):
def __init__(
self,
Expand Down Expand Up @@ -348,6 +373,42 @@ def _create_merge_followup_jobs(
) -> List[FollowupJobRequest]:
return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]

def _create_replace_followup_jobs(
self, table_chain: Sequence[PreparedTableSchema]
) -> List[FollowupJobRequest]:
root_table = table_chain[0]
if root_table["x-replace-strategy"] == "staging-optimized": # type: ignore[typeddict-item]
return [ClickHouseStagingReplaceJob.from_table_chain(table_chain, self.sql_client)]
return super()._create_replace_followup_jobs(table_chain)

def verify_schema(
self,
only_tables: Iterable[str] = None,
new_jobs: Iterable[ParsedLoadJobFileName] = None,
) -> List[PreparedTableSchema]:
loaded_tables = super().verify_schema(only_tables, new_jobs)
# probe the database engine early so staging-optimized fails fast at init,
# before any data is extracted or loaded into staging
if any(
table.get("x-replace-strategy") == "staging-optimized" for table in loaded_tables
):
self._verify_database_supports_exchange()
return loaded_tables

def _verify_database_supports_exchange(self) -> None:
# `EXCHANGE TABLES` is only supported on Atomic and Shared database engines
result = self.sql_client.execute_sql(
"SELECT engine FROM system.databases WHERE name = currentDatabase()"
)
engine = result[0][0] if result else "unknown"
if engine not in ("Atomic", "Shared"):
raise DestinationTerminalException(
"ClickHouse replace_strategy='staging-optimized' requires the Atomic or Shared"
f" database engine to use EXCHANGE TABLES (current: {engine}). Either choose"
" 'insert-from-staging' or 'truncate-and-insert', or recreate the database with"
" ENGINE = Atomic."
)

def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str:
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
Expand Down
6 changes: 5 additions & 1 deletion dlt/destinations/impl/clickhouse/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = True

caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
caps.enforces_nulls_on_alter = False

caps.sqlglot_dialect = "clickhouse"
Expand Down
2 changes: 2 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ select_sequential_consistency = 1 # Ensures read-after-wri

All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported.

If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be atomically swapped with the staging tables via [`EXCHANGE TABLES`](https://clickhouse.com/docs/en/sql-reference/statements/exchange) (requires `Atomic` or `Shared` database engine).

## Data loading

Data is loaded into ClickHouse using the most efficient method depending on the data source:
Expand Down
27 changes: 27 additions & 0 deletions tests/load/clickhouse/test_clickhouse_configuration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Iterator

import pytest
from pytest_mock import MockerFixture

from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.destination.exceptions import DestinationTerminalException
from dlt.common.libs.sql_alchemy_compat import make_url
from dlt.common.utils import custom_environ, digest128
from dlt.destinations import clickhouse
Expand Down Expand Up @@ -146,3 +148,28 @@ def test_client_table_name_and_paths(client: ClickHouseClient) -> None:
parts = client.sql_client.make_qualified_table_name_path("test_table", quote=False)
assert parts[0] in databases
assert parts[1] == f"{dataset_name}###test_table"


@pytest.mark.parametrize("engine", ["Atomic", "Shared"])
def test_staging_optimized_accepts_supported_engines(
client: ClickHouseClient, mocker: MockerFixture, engine: str
) -> None:
mocker.patch.object(client.sql_client, "execute_sql", return_value=[(engine,)])
client._verify_database_supports_exchange()


@pytest.mark.parametrize("engine", ["Ordinary", "Replicated", "Lazy"])
def test_staging_optimized_rejects_unsupported_engines(
client: ClickHouseClient, mocker: MockerFixture, engine: str
) -> None:
mocker.patch.object(client.sql_client, "execute_sql", return_value=[(engine,)])
with pytest.raises(DestinationTerminalException, match="Atomic or Shared"):
client._verify_database_supports_exchange()


def test_staging_optimized_rejects_missing_engine(
client: ClickHouseClient, mocker: MockerFixture
) -> None:
mocker.patch.object(client.sql_client, "execute_sql", return_value=[])
with pytest.raises(DestinationTerminalException, match="unknown"):
client._verify_database_supports_exchange()
7 changes: 6 additions & 1 deletion tests/load/pipeline/test_replace_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ def test_replace_sql_queries(

destination_spy = mocker.spy(MsSqlStagingReplaceJob, "generate_sql")

elif dest_type == "clickhouse":
from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseStagingReplaceJob

destination_spy = mocker.spy(ClickHouseStagingReplaceJob, "generate_sql")

pipeline = destination_config.setup_pipeline(
f"insert_from_staging_test_{uniq_id()}", dev_mode=True
)
Expand Down Expand Up @@ -441,7 +446,7 @@ def test_replace_sql_queries(
)

elif replace_strategy == "staging-optimized":
if dest_type in ["postgres", "mssql"]:
if dest_type in ["postgres", "mssql", "clickhouse"]:
assert destination_spy.call_count == 1
else:
assert clone_sql_generator_spy.call_count == 1
Expand Down