diff --git a/docs/en/engines/table-engines/special/hybrid.md b/docs/en/engines/table-engines/special/hybrid.md index 12df6cd859b8..81dd4d67e364 100644 --- a/docs/en/engines/table-engines/special/hybrid.md +++ b/docs/en/engines/table-engines/special/hybrid.md @@ -20,7 +20,7 @@ Typical use cases include: - Tiered storage, for example fresh data on a local cluster and historical data in S3. - Gradual roll-outs where only a subset of rows should be served from a new backend. -By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. +By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. To move the boundary at runtime without recreating the table, use [`hybridParam()`](#dynamic-watermarks-with-hybridparam) placeholders in predicates. ## Enable the engine @@ -58,6 +58,57 @@ You must pass at least two arguments – the first table function and its predic - `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources. - Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas. +## Dynamic watermarks with `hybridParam()` + +Hard-coded date literals in predicates work, but changing the boundary requires recreating the table. `hybridParam()` lets you embed a named, typed placeholder in any predicate and manage its value through ordinary engine `SETTINGS`: + +```sql +CREATE TABLE tiered +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; +``` + +`hybridParam(name, type)` takes exactly two string-literal arguments: + +| Argument | Description | +|----------|-------------| +| `name` | Must start with `hybrid_watermark_`. This is the setting name used in `SETTINGS` and `ALTER`. | +| `type` | A ClickHouse type name (`DateTime`, `Date`, `UInt64`, etc.). The engine validates and deserializes the setting value through this type at CREATE and ALTER time. | + +Every `hybridParam()` used in predicates must have a corresponding value in the `SETTINGS` clause. The engine rejects the `CREATE` if any declared watermark name is missing from `SETTINGS`, or if the value cannot be parsed as the declared type. + +The same watermark name can appear in multiple predicates (e.g. in both the hot and cold segments). All occurrences must declare the same type. + +### Moving the boundary at runtime + +Use `ALTER TABLE ... MODIFY SETTING` to change a watermark without recreating the table: + +```sql +ALTER TABLE tiered MODIFY SETTING hybrid_watermark_hot = '2025-10-01'; +``` + +The new value takes effect for all subsequent queries immediately. The update is persisted in table metadata and survives `DETACH`/`ATTACH` and server restarts. + +Multiple watermarks can be updated in a single statement: + +```sql +ALTER TABLE tiered MODIFY SETTING + hybrid_watermark_hot = '2025-11-01', + hybrid_watermark_cold = '2025-08-01'; +``` + +### Restrictions + +- Only `hybrid_watermark_*` settings are accepted on Hybrid tables. Regular `DistributedSettings` (e.g. `bytes_to_delay_insert`) are rejected. +- `ALTER TABLE ... RESET SETTING` is not supported on Hybrid tables. Use `MODIFY SETTING` to change a watermark value. +- Watermark names in `SETTINGS` and `ALTER` must exactly match a `hybridParam()` declared in the predicates. Typos are rejected. + ## Example: local cluster plus S3 historical tier The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files. diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 5d8c06418467..2d6c0255ef14 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -27,6 +27,7 @@ #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -109,6 +111,8 @@ #include #include +#include +#include #include #include #include @@ -1225,6 +1229,48 @@ void StorageDistributed::read( LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); }; + auto watermark_snapshot = hybrid_watermark_params.get(); + + auto substitute_watermarks = [&](ASTPtr predicate_ast) -> ASTPtr + { + if (!predicate_ast) + return predicate_ast; + predicate_ast = predicate_ast->clone(); + + std::function replace_hybrid_params = [&](ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments->as(); + const auto & param_name = arg_list->children[0]->as()->value.safeGet(); + const auto & type_name = arg_list->children[1]->as()->value.safeGet(); + + if (!watermark_snapshot) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto it = watermark_snapshot->find(param_name); + if (it == watermark_snapshot->end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto col = data_type->createColumn(); + ReadBufferFromString buf(it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + return; + } + + for (auto & child : node->children) + replace_hybrid_params(child); + }; + replace_hybrid_params(predicate_ast); + return predicate_ast; + }; + if (settings[Setting::allow_experimental_analyzer]) { StorageID remote_storage_id = StorageID::createEmpty(); @@ -1235,7 +1281,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, remote_table_function_ptr, - base_segment_predicate); + substitute_watermarks(base_segment_predicate)); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. @@ -1261,7 +1307,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), segment.storage_id ? nullptr : segment.table_function_ast, - segment.predicate_ast); + substitute_watermarks(segment.predicate_ast)); additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); additional_query_info.query_tree = std::move(additional_query_tree); @@ -1282,7 +1328,7 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, remote_database, remote_table, remote_table_function_ptr, - base_segment_predicate); + substitute_watermarks(base_segment_predicate)); log_rewritten_query(base_target, modified_query_info.query); if (!segments.empty()) @@ -1291,20 +1337,21 @@ void StorageDistributed::read( { SelectQueryInfo additional_query_info = query_info; + ASTPtr resolved_predicate = substitute_watermarks(segment.predicate_ast); if (segment.storage_id) { additional_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, additional_query_info.query, segment.storage_id->database_name, segment.storage_id->table_name, nullptr, - segment.predicate_ast); + resolved_predicate); } else { additional_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, additional_query_info.query, "", "", segment.table_function_ast, - segment.predicate_ast); + resolved_predicate); } log_rewritten_query(describe_segment_target(segment), additional_query_info.query); @@ -1701,11 +1748,92 @@ std::optional StorageDistributed::distributedWrite(const ASTInser } +/// Extract declared hybridParam types from all Hybrid predicate ASTs. +static std::unordered_map collectHybridParamTypes( + const ASTPtr & base_predicate, const std::vector & segs) +{ + std::unordered_map result; + std::function walk = [&](const ASTPtr & node) + { + if (!node) return; + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (arg_list && arg_list->children.size() >= 2) + { + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (name_lit && type_lit) + result.emplace(name_lit->value.safeGet(), type_lit->value.safeGet()); + } + return; + } + for (const auto & child : node->children) + walk(child); + }; + walk(base_predicate); + for (const auto & seg : segs) + walk(seg.predicate_ast); + return result; +} + void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const { std::optional name_deps{}; for (const auto & command : commands) { + if (command.type == AlterCommand::Type::MODIFY_SETTING) + { + if (getName() != "Hybrid") + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "Alter of type '{}' is not supported by storage {}", command.type, getName()); + + auto declared_types = collectHybridParamTypes(base_segment_predicate, segments); + + for (const auto & change : command.settings_changes) + { + if (!change.name.starts_with(HYBRID_WATERMARK_PREFIX)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "ALTER MODIFY SETTING on Hybrid tables currently only supports " + "'hybrid_watermark_*' settings, got '{}'", change.name); + + auto type_it = declared_types.find(change.name); + if (type_it == declared_types.end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "ALTER MODIFY SETTING name '{}' does not match any declared hybridParam(); " + "check for typos in the watermark name", + change.name); + + const auto & type_name = type_it->second; + auto value_str = convertFieldToString(change.value); + auto data_type = DataTypeFactory::instance().get(type_name); + try + { + auto col = data_type->createColumn(); + ReadBufferFromString buf(value_str); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "ALTER MODIFY SETTING value for '{}' is not valid for declared type '{}': {}", + change.name, type_name, e.message()); + } + } + continue; + } + + if (command.type == AlterCommand::Type::RESET_SETTING) + { + if (getName() == "Hybrid") + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "ALTER RESET SETTING is not supported on Hybrid tables " + "(use MODIFY SETTING to change the watermark value instead)"); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "Alter of type '{}' is not supported by storage {}", command.type, getName()); + } + if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN && command.type != AlterCommand::Type::RENAME_COLUMN && command.type != AlterCommand::Type::COMMENT_TABLE) @@ -1713,7 +1841,7 @@ void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, Co throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}", command.type, getName()); - if (command.type == AlterCommand::DROP_COLUMN && !command.clear) + if (command.type == AlterCommand::Type::DROP_COLUMN && !command.clear) { if (!name_deps) name_deps = getDependentViewsByColumn(local_context); @@ -1739,10 +1867,39 @@ void StorageDistributed::alter(const AlterCommands & params, ContextPtr local_co checkAlterIsPossible(params, local_context); StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); params.apply(new_metadata, local_context); - DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata, /*validate_new_create_query=*/true); + + DatabaseCatalog::instance() + .getDatabase(table_id.database_name) + ->alterTable(local_context, table_id, new_metadata, /*validate_new_create_query=*/true); + + /// Publish Hybrid watermark snapshot before metadata so concurrent + /// readers never observe new metadata with stale watermark values. + if (getName() == "Hybrid" && new_metadata.settings_changes) + { + SettingsChanges changes_copy = + new_metadata.settings_changes->as().changes; + loadHybridWatermarkParams(changes_copy); + } + setInMemoryMetadata(new_metadata); } +void StorageDistributed::loadHybridWatermarkParams(SettingsChanges & changes) +{ + auto new_params = std::make_unique(); + SettingsChanges remaining; + remaining.reserve(changes.size()); + for (auto & change : changes) + { + if (change.name.starts_with(HYBRID_WATERMARK_PREFIX)) + (*new_params)[change.name] = convertFieldToString(change.value); + else + remaining.push_back(std::move(change)); + } + changes = std::move(remaining); + hybrid_watermark_params.set(std::move(new_params)); +} + void StorageDistributed::initializeFromDisk() { if (!storage_policy) @@ -2585,12 +2742,92 @@ void registerStorageHybrid(StorageFactory & factory) "TableFunctionRemote did not return a StorageDistributed or StorageProxy, got: {}", actual_type); } + /// Declared types per watermark name — enforces a single type contract. + std::unordered_map hybridparam_declared_types; + /// Effective watermark values from SETTINGS, keyed by name. + std::unordered_map effective_watermark_values; + + /// First pass: collect declared hybridParam() names and types from a predicate AST. + std::function collect_hybrid_params = [&](const ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (!arg_list || arg_list->children.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() requires exactly 2 arguments: (name, type)"); + + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (!name_lit || name_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() first argument (name) must be a string literal"); + if (!type_lit || type_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() second argument (type) must be a string literal"); + + const auto & param_name = name_lit->value.safeGet(); + const auto & type_name = type_lit->value.safeGet(); + + if (!param_name.starts_with(StorageDistributed::HYBRID_WATERMARK_PREFIX)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() name '{}' must start with '{}'; " + "only watermark parameters are supported", + param_name, String(StorageDistributed::HYBRID_WATERMARK_PREFIX)); + + auto [it, inserted] = hybridparam_declared_types.emplace(param_name, type_name); + if (!inserted && it->second != type_name) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() type conflict for '{}': " + "'{}' vs '{}'; all occurrences must declare the same type", + param_name, it->second, type_name); + return; + } + + for (const auto & child : node->children) + collect_hybrid_params(child); + }; + + /// Second pass: substitute hybridParam() with effective values and run ExpressionAnalyzer. auto validate_predicate = [&](ASTPtr & predicate, size_t argument_index) { + ASTPtr predicate_for_validation = predicate->clone(); + + std::function substitute_params = [&](ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments->as(); + const auto & param_name = arg_list->children[0]->as()->value.safeGet(); + const auto & type_name = arg_list->children[1]->as()->value.safeGet(); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto val_it = effective_watermark_values.find(param_name); + if (val_it != effective_watermark_values.end()) + { + auto col = data_type->createColumn(); + ReadBufferFromString buf(val_it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + } + else + { + auto col = data_type->createColumn(); + col->insertDefault(); + node = make_intrusive((*col)[0]); + } + return; + } + + for (auto & child : node->children) + substitute_params(child); + }; + substitute_params(predicate_for_validation); + try { - auto syntax_result = TreeRewriter(local_context).analyze(predicate, physical_columns); - ExpressionAnalyzer(predicate, syntax_result, local_context).getActions(true); + auto syntax_result = TreeRewriter(local_context).analyze(predicate_for_validation, physical_columns); + ExpressionAnalyzer(predicate_for_validation, syntax_result, local_context).getActions(true); } catch (const Exception & e) { @@ -2600,7 +2837,7 @@ void registerStorageHybrid(StorageFactory & factory) }; ASTPtr second_arg = engine_args[1]; - validate_predicate(second_arg, 1); + collect_hybrid_params(second_arg); distributed_storage->setBaseSegmentPredicate(second_arg); // Parse additional table function pairs (if any) @@ -2614,7 +2851,7 @@ void registerStorageHybrid(StorageFactory & factory) ASTPtr table_function_ast = engine_args[i]; ASTPtr predicate_ast = engine_args[i + 1]; - validate_predicate(predicate_ast, i + 1); + collect_hybrid_params(predicate_ast); // Validate table function or table identifier if (const auto * func = table_function_ast->as()) @@ -2745,13 +2982,94 @@ void registerStorageHybrid(StorageFactory & factory) distributed_storage->setCachedColumnsToCast(ColumnsDescription(cast_cols)); } + /// Validate SETTINGS and build effective watermark values map. + if (args.storage_def->settings) + { + for (const auto & change : + args.storage_def->settings->as().changes) + { + if (!change.name.starts_with(StorageDistributed::HYBRID_WATERMARK_PREFIX)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid tables only support 'hybrid_watermark_*' engine settings, " + "got '{}'", change.name); + + auto type_it = hybridparam_declared_types.find(change.name); + if (type_it == hybridparam_declared_types.end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid SETTINGS name '{}' does not match any declared hybridParam(); " + "check for typos in the watermark name", + change.name); + + const auto & type_name = type_it->second; + auto value_str = convertFieldToString(change.value); + auto data_type = DataTypeFactory::instance().get(type_name); + try + { + auto col = data_type->createColumn(); + ReadBufferFromString buf(value_str); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "SETTINGS value for '{}' is not valid for declared type '{}': {}", + change.name, type_name, e.message()); + } + + effective_watermark_values[change.name] = value_str; + } + } + + /// Every declared hybridParam() must have a value in SETTINGS. + for (const auto & [name, type_name] : hybridparam_declared_types) + { + if (!effective_watermark_values.contains(name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam('{}', '{}') has no value; " + "add SETTINGS {} = '...' to the CREATE TABLE statement", + name, type_name, name); + } + + /// Now validate all predicates with effective SETTINGS values substituted. + validate_predicate(second_arg, 1); + for (size_t i = 2; i < engine_args.size(); i += 2) + validate_predicate(engine_args[i + 1], i + 1); + + /// Build watermark changes from effective values and load into runtime. + SettingsChanges watermark_changes; + for (const auto & [name, value] : effective_watermark_values) + watermark_changes.push_back({name, value}); + distributed_storage->loadHybridWatermarkParams(watermark_changes); + + /// Rebuild the SETTINGS AST from the runtime watermark snapshot so metadata is authoritative. + auto settings_ast = make_intrusive(); + settings_ast->is_standalone = false; + auto snapshot = distributed_storage->getHybridWatermarkParams(); + if (snapshot) + for (const auto & [name, value] : *snapshot) + settings_ast->changes.push_back({name, value}); + + if (!settings_ast->changes.empty()) + { + ASTPtr settings_ptr = settings_ast; + args.storage_def->set(args.storage_def->settings, settings_ptr); + + StorageInMemoryMetadata metadata = distributed_storage->getInMemoryMetadata(); + metadata.setSettingsChanges(args.storage_def->settings->clone()); + distributed_storage->setInMemoryMetadata(metadata); + } + return distributed_storage; }, { - .supports_settings = false, + .supports_settings = true, .supports_parallel_insert = true, .supports_schema_inference = true, .source_access_type = AccessTypeObjects::Source::REMOTE, + .has_builtin_setting_fn = [](std::string_view name) -> bool + { + return name.starts_with(StorageDistributed::HYBRID_WATERMARK_PREFIX); + }, }); } diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index d3906a6dd8d9..138a49d9c992 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -4,10 +4,12 @@ #include #include #include +#include #include #include #include +#include #include namespace DB @@ -171,6 +173,15 @@ class StorageDistributed final : public IStorage, WithContext void setHybridLayout(std::vector segments_); void setCachedColumnsToCast(ColumnsDescription columns); + using WatermarkParams = std::map; + + static constexpr std::string_view HYBRID_WATERMARK_PREFIX = "hybrid_watermark_"; + + MultiVersion::Version getHybridWatermarkParams() const + { return hybrid_watermark_params.get(); } + + void loadHybridWatermarkParams(SettingsChanges & changes); + /// Getter methods for ClusterProxy::executeQuery StorageID getRemoteStorageID() const { return remote_storage; } ColumnsDescription getColumnsToCast() const; @@ -314,6 +325,8 @@ class StorageDistributed final : public IStorage, WithContext bool is_remote_function; + MultiVersion hybrid_watermark_params; + /// Additional filter expression for Hybrid engine ASTPtr base_segment_predicate; diff --git a/tests/integration/test_hybrid_watermarks/__init__.py b/tests/integration/test_hybrid_watermarks/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_hybrid_watermarks/configs/remote_servers.xml b/tests/integration/test_hybrid_watermarks/configs/remote_servers.xml new file mode 100644 index 000000000000..68b420f36b4d --- /dev/null +++ b/tests/integration/test_hybrid_watermarks/configs/remote_servers.xml @@ -0,0 +1,18 @@ + + + + + + node1 + 9000 + + + + + node2 + 9000 + + + + + diff --git a/tests/integration/test_hybrid_watermarks/test.py b/tests/integration/test_hybrid_watermarks/test.py new file mode 100644 index 000000000000..4e80a5988494 --- /dev/null +++ b/tests/integration/test_hybrid_watermarks/test.py @@ -0,0 +1,170 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance("node1", main_configs=["configs/remote_servers.xml"]) +node2 = cluster.add_instance("node2", main_configs=["configs/remote_servers.xml"]) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + for node in (node1, node2): + node.query("SET allow_experimental_hybrid_table = 1") + node.query( + "CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts" + ) + node.query( + "CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts" + ) + + node1.query( + "INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2)" + ) + node1.query( + "INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4)" + ) + node2.query( + "INSERT INTO local_hot VALUES ('2025-10-20', 10), ('2025-12-01', 20)" + ) + node2.query( + "INSERT INTO local_cold VALUES ('2025-07-01', 30), ('2025-05-01', 40)" + ) + + yield cluster + finally: + cluster.shutdown() + + +def q(node, sql): + return node.query(sql, settings={"allow_experimental_hybrid_table": 1}).strip() + + +def test_create_and_alter_watermark(started_cluster): + """Create a Hybrid table, verify routing, ALTER the watermark, verify again.""" + try: + q( + node1, + """ + CREATE TABLE hybrid_t + ENGINE = Hybrid( + remote('node1:9000', default, local_hot), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('node1:9000', default, local_cold), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') + ) + SETTINGS hybrid_watermark_hot = '2025-09-01' + AS local_hot + """, + ) + + assert q(node1, "SELECT count() FROM hybrid_t") == "4" + assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-10-15'") == "1" + assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-08-01'") == "1" + + show = q(node1, "SHOW CREATE TABLE hybrid_t") + assert "hybrid_watermark_hot = \\'2025-09-01\\'" in show + + q(node1, "ALTER TABLE hybrid_t MODIFY SETTING hybrid_watermark_hot = '2025-10-01'") + + assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-10-15'") == "1" + assert q(node1, "SELECT count() FROM hybrid_t WHERE ts = '2025-08-01'") == "1" + + show = q(node1, "SHOW CREATE TABLE hybrid_t") + assert "hybrid_watermark_hot = \\'2025-10-01\\'" in show + finally: + q(node1, "DROP TABLE IF EXISTS hybrid_t SYNC") + + +def test_detach_attach_persistence(started_cluster): + """Watermark value survives DETACH/ATTACH cycle.""" + try: + q( + node1, + """ + CREATE TABLE hybrid_persist + ENGINE = Hybrid( + remote('node1:9000', default, local_hot), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('node1:9000', default, local_cold), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') + ) + SETTINGS hybrid_watermark_hot = '2025-09-01' + AS local_hot + """, + ) + + q(node1, "ALTER TABLE hybrid_persist MODIFY SETTING hybrid_watermark_hot = '2025-10-01'") + + q(node1, "DETACH TABLE hybrid_persist") + q(node1, "ATTACH TABLE hybrid_persist") + + show = q(node1, "SHOW CREATE TABLE hybrid_persist") + assert "hybrid_watermark_hot = \\'2025-10-01\\'" in show + + assert q(node1, "SELECT count() FROM hybrid_persist") == "4" + finally: + q(node1, "DROP TABLE IF EXISTS hybrid_persist SYNC") + + +def test_cross_node_create_and_alter(started_cluster): + """Hybrid table spanning two physical nodes via remote(), with ALTER.""" + try: + q( + node1, + """ + CREATE TABLE hybrid_cross + ENGINE = Hybrid( + remote('node1:9000', default, local_hot), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('node2:9000', default, local_cold), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') + ) + SETTINGS hybrid_watermark_hot = '2025-09-01' + AS local_hot + """, + ) + + assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts > '2025-09-01'") == "2" + assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts <= '2025-09-01'") == "2" + assert q(node1, "SELECT count() FROM hybrid_cross") == "4" + + q(node1, "ALTER TABLE hybrid_cross MODIFY SETTING hybrid_watermark_hot = '2025-06-01'") + + assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts > '2025-06-01'") == "2" + assert q(node1, "SELECT count() FROM hybrid_cross WHERE ts <= '2025-06-01'") == "1" + finally: + q(node1, "DROP TABLE IF EXISTS hybrid_cross SYNC") + + +def test_cluster_table_function(started_cluster): + """Hybrid using cluster() table function instead of remote().""" + try: + q( + node1, + """ + CREATE TABLE hybrid_cluster + ENGINE = Hybrid( + cluster('test_cluster', default, local_hot), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + cluster('test_cluster', default, local_cold), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') + ) + SETTINGS hybrid_watermark_hot = '2025-09-01' + AS local_hot + """, + ) + + total = q(node1, "SELECT count() FROM hybrid_cluster") + assert int(total) == 8 + + q(node1, "ALTER TABLE hybrid_cluster MODIFY SETTING hybrid_watermark_hot = '2025-10-01'") + + hot = q(node1, "SELECT count() FROM hybrid_cluster WHERE ts > '2025-10-01'") + assert int(hot) > 0 + finally: + q(node1, "DROP TABLE IF EXISTS hybrid_cluster SYNC") diff --git a/tests/queries/0_stateless/03645_hybrid_watermarks.reference b/tests/queries/0_stateless/03645_hybrid_watermarks.reference new file mode 100644 index 000000000000..144780c0d6a4 --- /dev/null +++ b/tests/queries/0_stateless/03645_hybrid_watermarks.reference @@ -0,0 +1,33 @@ +--- Test 1: CREATE with watermarks +--- Test 2: SHOW CREATE TABLE +CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-09-01\' +--- Test 3: First query after CREATE +1 +1 +--- Test 4: ALTER watermark +--- Test 5: Query with updated boundary +1 +--- Test 6: SHOW CREATE after ALTER +CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-10-01\' +--- Test 7: DETACH/ATTACH persistence +CREATE TABLE default.t\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-10-01\' +--- Test 8: Three segments, two watermarks +CREATE TABLE default.t3\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_warm\'), (ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\')) AND (ts > hybridParam(\'hybrid_watermark_cold\', \'DateTime\')), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_cold\', \'DateTime\'))\nSETTINGS hybrid_watermark_cold = \'2025-07-01\', hybrid_watermark_hot = \'2025-10-01\' +--- Test 9: Reject non-watermark parameter +--- Test 10: Missing watermark SETTINGS rejected at CREATE +--- Test 11: Invalid typed value +--- Test 12: Reject non-watermark MODIFY SETTING +--- Test 13: Reject RESET SETTING on Hybrid +--- Test 14: Alter one preserves the other +CREATE TABLE default.t3\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_warm\'), (ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\')) AND (ts > hybridParam(\'hybrid_watermark_cold\', \'DateTime\')), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_cold\', \'DateTime\'))\nSETTINGS hybrid_watermark_cold = \'2025-08-01\', hybrid_watermark_hot = \'2025-12-01\' +1 +--- Test 15: Reject DistributedSettings at CREATE +--- Test 16: Plain Distributed unaffected +--- Test 17: Value via SETTINGS +1 +1 +CREATE TABLE default.t_settings_only\n(\n `ts` DateTime,\n `value` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'default\', \'local_hot\'), ts > hybridParam(\'hybrid_watermark_hot\', \'DateTime\'), remote(\'localhost:9000\', currentDatabase(), \'local_cold\'), ts <= hybridParam(\'hybrid_watermark_hot\', \'DateTime\'))\nSETTINGS hybrid_watermark_hot = \'2025-09-01\' +--- Test 18: Conflicting types rejected +--- Test 19: Invalid SETTINGS value rejected at CREATE +--- Test 20: Typo in CREATE SETTINGS rejected +--- Test 21: Typo in ALTER MODIFY SETTING rejected diff --git a/tests/queries/0_stateless/03645_hybrid_watermarks.sql b/tests/queries/0_stateless/03645_hybrid_watermarks.sql new file mode 100644 index 000000000000..dee569f887ce --- /dev/null +++ b/tests/queries/0_stateless/03645_hybrid_watermarks.sql @@ -0,0 +1,259 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: requires remote() table function + +SET allow_experimental_hybrid_table = 1; + +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; +DROP TABLE IF EXISTS t SYNC; +DROP TABLE IF EXISTS t3 SYNC; +DROP TABLE IF EXISTS t_bad_param SYNC; +DROP TABLE IF EXISTS t_missing SYNC; +DROP TABLE IF EXISTS t_dist_setting SYNC; +DROP TABLE IF EXISTS t_settings_only SYNC; +DROP TABLE IF EXISTS t_conflict SYNC; +DROP TABLE IF EXISTS dist SYNC; + +CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_warm (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; + +INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2); +INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4); +INSERT INTO local_warm VALUES ('2025-09-01', 5), ('2025-09-15', 6); + +-- ===================================================== +-- 1. CREATE with watermarks — basic two-segment case +-- ===================================================== +SELECT '--- Test 1: CREATE with watermarks'; +CREATE TABLE t +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +-- ===================================================== +-- 2. Verify SHOW CREATE TABLE shows template + setting +-- ===================================================== +SELECT '--- Test 2: SHOW CREATE TABLE'; +SHOW CREATE TABLE t; + +-- ===================================================== +-- 3. First query after CREATE works +-- ===================================================== +SELECT '--- Test 3: First query after CREATE'; +SELECT count() FROM t WHERE ts = '2025-10-15'; +SELECT count() FROM t WHERE ts = '2025-08-01'; + +-- ===================================================== +-- 4. Move the watermark +-- ===================================================== +SELECT '--- Test 4: ALTER watermark'; +ALTER TABLE t MODIFY SETTING hybrid_watermark_hot = '2025-10-01'; + +-- ===================================================== +-- 5. New queries use updated boundary +-- ===================================================== +SELECT '--- Test 5: Query with updated boundary'; +SELECT count() FROM t WHERE ts = '2025-10-15'; + +-- ===================================================== +-- 6. SHOW CREATE TABLE reflects the update +-- ===================================================== +SELECT '--- Test 6: SHOW CREATE after ALTER'; +SHOW CREATE TABLE t; + +-- ===================================================== +-- 7. Restart persistence (DETACH / ATTACH) +-- ===================================================== +SELECT '--- Test 7: DETACH/ATTACH persistence'; +DETACH TABLE t; +ATTACH TABLE t; +SHOW CREATE TABLE t; + +-- ===================================================== +-- 8. Three segments, two independent watermarks +-- ===================================================== +SELECT '--- Test 8: Three segments, two watermarks'; +CREATE TABLE t3 +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_warm'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') + AND ts > hybridParam('hybrid_watermark_cold', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_cold', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-10-01', hybrid_watermark_cold = '2025-01-01' +AS local_hot; + +ALTER TABLE t3 MODIFY SETTING hybrid_watermark_cold = '2025-07-01'; +SHOW CREATE TABLE t3; + +ALTER TABLE t3 MODIFY SETTING + hybrid_watermark_hot = '2025-11-01', + hybrid_watermark_cold = '2025-08-01'; + +-- ===================================================== +-- 9. Non-watermark parameter name is rejected at CREATE +-- ===================================================== +SELECT '--- Test 9: Reject non-watermark parameter'; +CREATE TABLE t_bad_param +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('foo', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('foo', 'DateTime') +) +SETTINGS foo = '2025-09-01' +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 10. Missing watermark SETTINGS — CREATE fails +-- ===================================================== +SELECT '--- Test 10: Missing watermark SETTINGS rejected at CREATE'; +CREATE TABLE t_missing +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_nope', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_nope', 'DateTime') +) +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 11. Invalid typed value — rejected at ALTER time +-- ===================================================== +SELECT '--- Test 11: Invalid typed value'; +ALTER TABLE t MODIFY SETTING hybrid_watermark_hot = 'not-a-date'; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 12. Non-watermark MODIFY SETTING is rejected on Hybrid +-- ===================================================== +SELECT '--- Test 12: Reject non-watermark MODIFY SETTING'; +ALTER TABLE t MODIFY SETTING bytes_to_delay_insert = 100; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 13. RESET SETTING is rejected on Hybrid +-- ===================================================== +SELECT '--- Test 13: Reject RESET SETTING on Hybrid'; +ALTER TABLE t RESET SETTING hybrid_watermark_hot; -- { serverError BAD_ARGUMENTS } +ALTER TABLE t3 RESET SETTING bytes_to_delay_insert; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 14. Alter one watermark preserves the other (metadata seeding) +-- ===================================================== +SELECT '--- Test 14: Alter one preserves the other'; +ALTER TABLE t3 MODIFY SETTING hybrid_watermark_hot = '2025-12-01'; +SHOW CREATE TABLE t3; +SELECT count() FROM t3 WHERE ts = '2025-06-15'; + +-- ===================================================== +-- 15. DistributedSettings are not accepted on Hybrid at CREATE +-- ===================================================== +SELECT '--- Test 15: Reject DistributedSettings at CREATE'; +CREATE TABLE t_dist_setting +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS + hybrid_watermark_hot = '2025-09-01', + bytes_to_delay_insert = 100 +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 16. Plain Distributed table is unaffected +-- ===================================================== +SELECT '--- Test 16: Plain Distributed unaffected'; +CREATE TABLE dist AS remote('localhost:9000', currentDatabase(), 'local_hot'); +ALTER TABLE dist MODIFY SETTING hybrid_watermark_hot = '2025-10-01'; -- { serverError NOT_IMPLEMENTED } + +-- ===================================================== +-- 17. Value provided via SETTINGS +-- ===================================================== +SELECT '--- Test 17: Value via SETTINGS'; +CREATE TABLE t_settings_only +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +SELECT count() FROM t_settings_only WHERE ts = '2025-10-15'; +SELECT count() FROM t_settings_only WHERE ts = '2025-08-01'; +SHOW CREATE TABLE t_settings_only; + +-- ===================================================== +-- 18. Conflicting types for same watermark name rejected +-- ===================================================== +SELECT '--- Test 18: Conflicting types rejected'; +CREATE TABLE t_conflict +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'UInt64') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 19. Invalid SETTINGS value rejected at CREATE +-- ===================================================== +SELECT '--- Test 19: Invalid SETTINGS value rejected at CREATE'; +CREATE TABLE t_conflict +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = 'not-a-date' +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 20. Typo'd watermark name rejected at CREATE SETTINGS +-- ===================================================== +SELECT '--- Test 20: Typo in CREATE SETTINGS rejected'; +CREATE TABLE t_conflict +ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('localhost:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hott = '2025-09-01' +AS local_hot; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- 21. Typo'd watermark name rejected at ALTER +-- ===================================================== +SELECT '--- Test 21: Typo in ALTER MODIFY SETTING rejected'; +ALTER TABLE t MODIFY SETTING hybrid_watermark_hott = '2025-10-01'; -- { serverError BAD_ARGUMENTS } + +-- ===================================================== +-- Cleanup +-- ===================================================== +DROP TABLE IF EXISTS t SYNC; +DROP TABLE IF EXISTS t3 SYNC; +DROP TABLE IF EXISTS t_bad_param SYNC; +DROP TABLE IF EXISTS t_missing SYNC; +DROP TABLE IF EXISTS t_dist_setting SYNC; +DROP TABLE IF EXISTS t_settings_only SYNC; +DROP TABLE IF EXISTS t_conflict SYNC; +DROP TABLE IF EXISTS dist SYNC; +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC;