Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion docs/en/engines/table-engines/special/hybrid.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Typical use cases include:
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. To move the boundary at runtime without recreating the table, use [`hybridParam()`](#dynamic-watermarks-with-hybridparam) placeholders in predicates.

## Enable the engine

Expand Down Expand Up @@ -58,6 +58,57 @@ You must pass at least two arguments – the first table function and its predic
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.

## Dynamic watermarks with `hybridParam()`

Hard-coded date literals in predicates work, but changing the boundary requires recreating the table. `hybridParam()` lets you embed a named, typed placeholder in any predicate and manage its value through ordinary engine `SETTINGS`:

```sql
CREATE TABLE tiered
ENGINE = Hybrid(
remote('localhost:9000', currentDatabase(), 'local_hot'),
ts > hybridParam('hybrid_watermark_hot', 'DateTime'),
remote('localhost:9000', currentDatabase(), 'local_cold'),
ts <= hybridParam('hybrid_watermark_hot', 'DateTime')
)
SETTINGS hybrid_watermark_hot = '2025-09-01'
AS local_hot;
```

`hybridParam(name, type)` takes exactly two string-literal arguments:

| Argument | Description |
|----------|-------------|
| `name` | Must start with `hybrid_watermark_`. This is the setting name used in `SETTINGS` and `ALTER`. |
| `type` | A ClickHouse type name (`DateTime`, `Date`, `UInt64`, etc.). The engine validates and deserializes the setting value through this type at CREATE and ALTER time. |

Every `hybridParam()` used in predicates must have a corresponding value in the `SETTINGS` clause. The engine rejects the `CREATE` if any declared watermark name is missing from `SETTINGS`, or if the value cannot be parsed as the declared type.

The same watermark name can appear in multiple predicates (e.g. in both the hot and cold segments). All occurrences must declare the same type.

### Moving the boundary at runtime

Use `ALTER TABLE ... MODIFY SETTING` to change a watermark without recreating the table:

```sql
ALTER TABLE tiered MODIFY SETTING hybrid_watermark_hot = '2025-10-01';
```

The new value takes effect for all subsequent queries immediately. The update is persisted in table metadata and survives `DETACH`/`ATTACH` and server restarts.

Multiple watermarks can be updated in a single statement:

```sql
ALTER TABLE tiered MODIFY SETTING
hybrid_watermark_hot = '2025-11-01',
hybrid_watermark_cold = '2025-08-01';
```

### Restrictions

- Only `hybrid_watermark_*` settings are accepted on Hybrid tables. Regular `DistributedSettings` (e.g. `bytes_to_delay_insert`) are rejected.
- `ALTER TABLE ... RESET SETTING` is not supported on Hybrid tables. Use `MODIFY SETTING` to change a watermark value.
- Watermark names in `SETTINGS` and `ALTER` must exactly match a `hybridParam()` declared in the predicates. Typos are rejected.

## Example: local cluster plus S3 historical tier

The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
Expand Down
342 changes: 330 additions & 12 deletions src/Storages/StorageDistributed.cpp

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions src/Storages/StorageDistributed.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#include <Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Columns/IColumn.h>
#include <Common/MultiVersion.h>
#include <Common/SimpleIncrement.h>
#include <Common/ActionBlocker.h>
#include <Interpreters/Cluster.h>

#include <map>
#include <pcg_random.hpp>

namespace DB
Expand Down Expand Up @@ -171,6 +173,15 @@ class StorageDistributed final : public IStorage, WithContext
void setHybridLayout(std::vector<HybridSegment> segments_);
void setCachedColumnsToCast(ColumnsDescription columns);

using WatermarkParams = std::map<String, String>;

static constexpr std::string_view HYBRID_WATERMARK_PREFIX = "hybrid_watermark_";

MultiVersion<WatermarkParams>::Version getHybridWatermarkParams() const
{ return hybrid_watermark_params.get(); }

void loadHybridWatermarkParams(SettingsChanges & changes);

/// Getter methods for ClusterProxy::executeQuery
StorageID getRemoteStorageID() const { return remote_storage; }
ColumnsDescription getColumnsToCast() const;
Expand Down Expand Up @@ -314,6 +325,8 @@ class StorageDistributed final : public IStorage, WithContext

bool is_remote_function;

MultiVersion<WatermarkParams> hybrid_watermark_params;

/// Additional filter expression for Hybrid engine
ASTPtr base_segment_predicate;

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>
170 changes: 170 additions & 0 deletions tests/integration/test_hybrid_watermarks/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance("node1", main_configs=["configs/remote_servers.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/remote_servers.xml"])


@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()

for node in (node1, node2):
node.query("SET allow_experimental_hybrid_table = 1")
node.query(
"CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts"
)
node.query(
"CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts"
)

node1.query(
"INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2)"
)
node1.query(
"INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4)"
)
node2.query(
"INSERT INTO local_hot VALUES ('2025-10-20', 10), ('2025-12-01', 20)"
)
node2.query(
"INSERT INTO local_cold VALUES ('2025-07-01', 30), ('2025-05-01', 40)"
)

yield cluster
finally:
cluster.shutdown()


def q(node, sql):
return node.query(sql, settings={"allow_experimental_hybrid_table": 1}).strip()


def test_create_and_alter_watermark(started_cluster):
"""Create a Hybrid table, verify routing, ALTER the watermark, verify again."""
try:
q(
node1,
"""
CREATE TABLE hybrid_t
ENGINE = Hybrid(
remote('node1:9000', default, local_hot),
ts > hybridParam('hybrid_watermark_hot', 'DateTime'),
remote('node1:9000', default, local_cold),
ts <= hybridParam('hybrid_watermark_hot', 'DateTime')
)
SETTINGS hybrid_watermark_hot = '2025-09-01'
AS local_hot
""",
)

assert q(node1, "SELECT count() FROM hybrid_t") == "4"
assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-10-15'") == "1"
assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-08-01'") == "1"

show = q(node1, "SHOW CREATE TABLE hybrid_t")
assert "hybrid_watermark_hot = \\'2025-09-01\\'" in show

q(node1, "ALTER TABLE hybrid_t MODIFY SETTING hybrid_watermark_hot = '2025-10-01'")

assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-10-15'") == "1"
assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-08-01'") == "1"

show = q(node1, "SHOW CREATE TABLE hybrid_t")
assert "hybrid_watermark_hot = \\'2025-10-01\\'" in show
finally:
q(node1, "DROP TABLE IF EXISTS hybrid_t SYNC")


def test_detach_attach_persistence(started_cluster):
"""Watermark value survives DETACH/ATTACH cycle."""
try:
q(
node1,
"""
CREATE TABLE hybrid_persist
ENGINE = Hybrid(
remote('node1:9000', default, local_hot),
ts > hybridParam('hybrid_watermark_hot', 'DateTime'),
remote('node1:9000', default, local_cold),
ts <= hybridParam('hybrid_watermark_hot', 'DateTime')
)
SETTINGS hybrid_watermark_hot = '2025-09-01'
AS local_hot
""",
)

q(node1, "ALTER TABLE hybrid_persist MODIFY SETTING hybrid_watermark_hot = '2025-10-01'")

q(node1, "DETACH TABLE hybrid_persist")
q(node1, "ATTACH TABLE hybrid_persist")

show = q(node1, "SHOW CREATE TABLE hybrid_persist")
assert "hybrid_watermark_hot = \\'2025-10-01\\'" in show

assert q(node1, "SELECT count() FROM hybrid_persist") == "4"
finally:
q(node1, "DROP TABLE IF EXISTS hybrid_persist SYNC")


def test_cross_node_create_and_alter(started_cluster):
"""Hybrid table spanning two physical nodes via remote(), with ALTER."""
try:
q(
node1,
"""
CREATE TABLE hybrid_cross
ENGINE = Hybrid(
remote('node1:9000', default, local_hot),
ts > hybridParam('hybrid_watermark_hot', 'DateTime'),
remote('node2:9000', default, local_cold),
ts <= hybridParam('hybrid_watermark_hot', 'DateTime')
)
SETTINGS hybrid_watermark_hot = '2025-09-01'
AS local_hot
""",
)

assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts > '2025-09-01'") == "2"
assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts <= '2025-09-01'") == "2"
assert q(node1, "SELECT count() FROM hybrid_cross") == "4"

q(node1, "ALTER TABLE hybrid_cross MODIFY SETTING hybrid_watermark_hot = '2025-06-01'")

assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts > '2025-06-01'") == "2"
assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts <= '2025-06-01'") == "1"
finally:
q(node1, "DROP TABLE IF EXISTS hybrid_cross SYNC")


def test_cluster_table_function(started_cluster):
"""Hybrid using cluster() table function instead of remote()."""
try:
q(
node1,
"""
CREATE TABLE hybrid_cluster
ENGINE = Hybrid(
cluster('test_cluster', default, local_hot),
ts > hybridParam('hybrid_watermark_hot', 'DateTime'),
cluster('test_cluster', default, local_cold),
ts <= hybridParam('hybrid_watermark_hot', 'DateTime')
)
SETTINGS hybrid_watermark_hot = '2025-09-01'
AS local_hot
""",
)

total = q(node1, "SELECT count() FROM hybrid_cluster")
assert int(total) == 8

q(node1, "ALTER TABLE hybrid_cluster MODIFY SETTING hybrid_watermark_hot = '2025-10-01'")

hot = q(node1, "SELECT count() FROM hybrid_cluster WHERE ts > '2025-10-01'")
assert int(hot) > 0
finally:
q(node1, "DROP TABLE IF EXISTS hybrid_cluster SYNC")
33 changes: 33 additions & 0 deletions tests/queries/0_stateless/03645_hybrid_watermarks.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
--- Test 1: CREATE with watermarks
--- Test 2: SHOW CREATE TABLE
CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-09-01\'
--- Test 3: First query after CREATE
1
1
--- Test 4: ALTER watermark
--- Test 5: Query with updated boundary
1
--- Test 6: SHOW CREATE after ALTER
CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-10-01\'
--- Test 7: DETACH/ATTACH persistence
CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-10-01\'
--- Test 8: Three segments, two watermarks
CREATE TABLE default.t3\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_warm\'), (ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\')) AND (ts > hybridParam(\'hybrid_watermark_cold\', \'DateTime\')), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_cold\', \'DateTime\'))\nSETTINGS hybrid_watermark_cold = \'2025-07-01\', hybrid_watermark_hot = \'2025-10-01\'
--- Test 9: Reject non-watermark parameter
--- Test 10: Missing watermark SETTINGS rejected at CREATE
--- Test 11: Invalid typed value
--- Test 12: Reject non-watermark MODIFY SETTING
--- Test 13: Reject RESET SETTING on Hybrid
--- Test 14: Alter one preserves the other
CREATE TABLE default.t3\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_warm\'), (ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\')) AND (ts > hybridParam(\'hybrid_watermark_cold\', \'DateTime\')), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_cold\', \'DateTime\'))\nSETTINGS hybrid_watermark_cold = \'2025-08-01\', hybrid_watermark_hot = \'2025-12-01\'
1
--- Test 15: Reject DistributedSettings at CREATE
--- Test 16: Plain Distributed unaffected
--- Test 17: Value via SETTINGS
1
1
CREATE TABLE default.t_settings_only\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-09-01\'
--- Test 18: Conflicting types rejected
--- Test 19: Invalid SETTINGS value rejected at CREATE
--- Test 20: Typo in CREATE SETTINGS rejected
--- Test 21: Typo in ALTER MODIFY SETTING rejected
Loading
Loading