diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index ab20ef00e452..6ba195bfa8c7 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7710,6 +7710,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'. )", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"( Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest' +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_database_s3_tables, false, R"( +Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4) )", EXPERIMENTAL) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 643b5ccbb7cf..9cb8af42703d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -63,6 +63,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"optimize_read_in_window_order", true, false, "Disable this logic by default."}, {"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."}, {"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"}, + {"allow_experimental_database_s3_tables", false, false, "New setting"}, {"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."}, {"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"}, {"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 19d47df4a88d..2c9cc986e4f2 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -346,7 +346,8 @@ 
IMPLEMENT_SETTING_ENUM( {"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE}, {"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE}, {"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE}, - {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}}) + {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}, + {"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}}) IMPLEMENT_SETTING_ENUM( FileCachePolicy, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 05c3b26340f4..8160bf4ee9df 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -439,6 +439,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t ICEBERG_ONELAKE, ICEBERG_BIGLAKE, PAIMON_REST, + S3_TABLES, }; DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType) diff --git a/src/Databases/DataLake/AWSV4Signer.cpp b/src/Databases/DataLake/AWSV4Signer.cpp new file mode 100644 index 000000000000..f5bd8c4bc6ed --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.cpp @@ -0,0 +1,110 @@ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int S3_ERROR; +} +} + +namespace DataLake +{ +namespace +{ + +Aws::Http::HttpMethod mapPocoMethodToAws(const String & method) +{ + using Aws::Http::HttpMethod; + using Poco::Net::HTTPRequest; + + static const std::pair supported_methods[] = { + {HTTPRequest::HTTP_GET, HttpMethod::HTTP_GET}, + {HTTPRequest::HTTP_POST, HttpMethod::HTTP_POST}, + {HTTPRequest::HTTP_PUT, HttpMethod::HTTP_PUT}, + {HTTPRequest::HTTP_DELETE, HttpMethod::HTTP_DELETE}, + {HTTPRequest::HTTP_HEAD, HttpMethod::HTTP_HEAD}, + {HTTPRequest::HTTP_PATCH, HttpMethod::HTTP_PATCH}, + }; + + for (const auto & [poco_method, aws_method] : supported_methods) + if (method == poco_method) + return aws_method; + + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP 
method for AWS SigV4 signing: {}", method); +} + +} + +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers) +{ + const Aws::Http::URI aws_uri(uri.toString().c_str()); + Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method)); + + for (const auto & h : extra_headers) + { + if (Poco::icompare(h.name, "authorization") == 0) + continue; + request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size())); + } + + if (!payload.empty()) + { + auto body_stream = Aws::MakeShared("AWSV4Signer"); + body_stream->write(payload.data(), static_cast(payload.size())); + body_stream->seekg(0); + request.AddContentBody(body_stream); + } + + static constexpr bool sign_body = true; + if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body)) + throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed"); + + bool has_authorization = false; + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "authorization") == 0 && !value.empty()) + has_authorization = true; + } + if (!has_authorization) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request"); + + out_headers.clear(); + for (const auto & [key, value] : request.GetHeaders()) + { + if (Poco::icompare(key, "host") == 0) + continue; + out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size())); + } +} + +} + +#endif diff --git a/src/Databases/DataLake/AWSV4Signer.h b/src/Databases/DataLake/AWSV4Signer.h new file mode 100644 index 000000000000..cdc42adaca5f --- /dev/null +++ b/src/Databases/DataLake/AWSV4Signer.h @@ -0,0 +1,34 @@ +#pragma once + +#include 
"config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include +#include + +namespace Aws::Client +{ +class AWSAuthV4Signer; +} + +namespace DataLake +{ + +/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer. +/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts +/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI). +void signRequestWithAWSV4( + const String & method, + const Poco::URI & uri, + const DB::HTTPHeaderEntries & extra_headers, + const String & payload, + Aws::Client::AWSAuthV4Signer & signer, + const String & region, + const String & service, + DB::HTTPHeaderEntries & out_headers); + +} + +#endif diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 5bee6258e1c6..a48c8b529292 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -27,6 +27,9 @@ #include #include #include +#if USE_AWS_S3 && USE_SSL +#include +#endif #include #include @@ -90,6 +93,7 @@ namespace Setting extern const SettingsBool allow_experimental_database_glue_catalog; extern const SettingsBool allow_experimental_database_hms_catalog; extern const SettingsBool allow_experimental_database_paimon_rest_catalog; + extern const SettingsBool allow_experimental_database_s3_tables; extern const SettingsBool use_hive_partitioning; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; @@ -141,8 +145,20 @@ void DatabaseDataLake::validateSettings() { if (settings[DatabaseDataLakeSetting::region].value.empty()) throw Exception( - ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. " + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. 
" + "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); + } + else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES) + { + if (settings[DatabaseDataLakeSetting::region].value.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. " "Please specify 'SETTINGS region=' in the CREATE DATABASE query"); + + if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. " + "Please specify 'SETTINGS warehouse=' in the CREATE DATABASE query"); } else if (settings[DatabaseDataLakeSetting::warehouse].value.empty()) { @@ -336,6 +352,23 @@ std::shared_ptr DatabaseDataLake::getCatalog() const } break; } + case DB::DatabaseDataLakeCatalogType::S3_TABLES: + { +#if USE_AWS_S3 && USE_SSL + catalog_impl = std::make_shared( + settings[DatabaseDataLakeSetting::warehouse].value, + url, + settings[DatabaseDataLakeSetting::region].value, + catalog_parameters, + settings[DatabaseDataLakeSetting::namespaces].value, + Context::getGlobalContextInstance()); +#else + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL"); +#endif + break; + } } return catalog_impl; } @@ -368,6 +401,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( case DatabaseDataLakeCatalogType::ICEBERG_HIVE: case DatabaseDataLakeCatalogType::ICEBERG_REST: case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: + case DatabaseDataLakeCatalogType::S3_TABLES: { switch (type) { @@ -962,9 +996,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); } - if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST) + if (database_engine_name == 
"Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST + && catalog_type != DatabaseDataLakeCatalogType::S3_TABLES) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only"); } for (auto & engine_arg : engine_args) @@ -1050,6 +1085,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory) engine_func->name = "Paimon"; break; } + case DatabaseDataLakeCatalogType::S3_TABLES: + { + if (!args.create_query.attach + && !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "DatabaseDataLake with S3 Tables catalog is experimental. " + "To allow its usage, enable setting allow_experimental_database_s3_tables"); + } + + engine_func->name = "Iceberg"; + break; + } case DatabaseDataLakeCatalogType::NONE: break; } diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index e2170c038e52..a135580f29b6 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -257,6 +257,11 @@ bool TableMetadata::hasStorageCredentials() const return storage_credentials != nullptr; } +bool TableMetadata::hasDataLakeSpecificProperties() const +{ + return data_lake_specific_metadata.has_value(); +} + std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const { std::string metadata_location = iceberg_metadata_file_location; diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index fb7031e3d052..b51a13bfeef2 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -218,7 +218,12 @@ void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Pt result.default_base_location = object->get("default-base-location").extract(); } 
-DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries RestCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { /// Option 1: user specified auth header manually. /// Header has format: 'Authorization: '. @@ -348,7 +353,12 @@ OneLakeCatalog::OneLakeCatalog( config = loadConfig(); } -DB::HTTPHeaderEntries OneLakeCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries OneLakeCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { /// User provided grant_type, client_id and client_secret. /// We would make OAuthClientCredentialsRequest @@ -477,7 +487,12 @@ BigLakeCatalog::BigLakeCatalog( config = loadConfig(); } -DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const +DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders( + bool update_token, + const String & /*method*/, + const Poco::URI & /*url*/, + const DB::HTTPHeaderEntries & /*extra_headers*/, + const String & /*body*/) const { if (!google_project_id.empty() || !google_adc_client_id.empty()) { @@ -711,7 +726,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( auto create_buffer = [&](bool update_token) { - auto result_headers = getAuthHeaders(update_token); + auto result_headers = getAuthHeaders(update_token, Poco::Net::HTTPRequest::HTTP_GET, url, headers, {}); std::move(headers.begin(), headers.end(), std::back_inserter(result_headers)); return DB::BuilderRWBufferFromHTTP(url) @@ -1187,9 +1202,6 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r request_body->stringify(oss); const std::string body_str = DB::removeEscapedSlashes(oss.str()); - DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true); - 
headers.emplace_back("Content-Type", "application/json"); - const auto & context = getContext(); DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; @@ -1203,6 +1215,12 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r /// enable_url_encoding=false to allow use tables with encoded sequences in names like 'foo%2Fbar' Poco::URI url(endpoint, /* enable_url_encoding */ false); + + DB::HTTPHeaderEntries extra_headers; + extra_headers.emplace_back("Content-Type", "application/json"); + + DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true, method, url, extra_headers, body_str); + headers.emplace_back("Content-Type", "application/json"); auto wb = DB::BuilderRWBufferFromHTTP(url) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withMethod(method) @@ -1223,7 +1241,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, const String & location) const { - const std::string endpoint = fmt::format("{}/namespaces", base_url); + const std::string endpoint = base_url / config.prefix / "namespaces"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; { @@ -1255,7 +1273,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl createNamespaceIfNotExists(namespace_name, metadata_content->getValue("location")); - const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name); + const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; request_body->set("name", table_name); @@ -1292,7 +1310,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr 
new_snapshot) const { - const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name); + const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; { @@ -1362,7 +1380,9 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_ "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); - const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name); + const std::string endpoint + = (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string() + + "?purgeRequested=False"; Poco::JSON::Object::Ptr request_body = nullptr; try diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 17170436898d..fe473c3ccf87 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -179,7 +179,12 @@ class RestCatalog : public ICatalog, public DB::WithContext TableMetadata & result) const; Config loadConfig(); - virtual DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const; + virtual DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const; AccessToken retrieveAccessTokenOAuth() const; static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); @@ -210,7 +215,12 @@ class OneLakeCatalog : public RestCatalog return DB::DatabaseDataLakeCatalogType::ICEBERG_ONELAKE; } - DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const override; + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const 
DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; String getTenantId() const { return tenant_id; } @@ -241,7 +251,12 @@ class BigLakeCatalog : public RestCatalog return DB::DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE; } - DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const override; + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; private: const std::string google_project_id; diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp new file mode 100644 index 000000000000..21f3bab6785d --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -0,0 +1,256 @@ +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace DB::ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int DATALAKE_DATABASE_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; +} + +namespace DB::Setting +{ + extern const SettingsUInt64 s3_max_connections; + extern const SettingsUInt64 s3_max_redirects; + extern const SettingsUInt64 s3_retry_attempts; + extern const SettingsBool s3_slow_all_threads_after_network_error; + extern const SettingsBool enable_s3_requests_logging; + extern const SettingsUInt64 s3_connect_timeout_ms; + extern const SettingsUInt64 s3_request_timeout_ms; +} + +namespace DB::ServerSetting +{ + extern const ServerSettingsUInt64 s3_max_redirects; + extern const ServerSettingsUInt64 s3_retry_attempts; +} + +namespace DataLake +{ + +S3TablesCatalog::S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const 
CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_) + : RestCatalog(warehouse_, base_url_, "", "", false, namespaces_, context_) + , region(region_) + , storage_endpoint(catalog_settings_.storage_endpoint) + , signing_service("s3tables") +{ + if (region.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "S3 Tables catalog requires non-empty `region` setting"); + + DB::S3::CredentialsConfiguration creds_config; + creds_config.use_environment_credentials = true; + creds_config.role_arn = catalog_settings_.aws_role_arn; + creds_config.role_session_name = catalog_settings_.aws_role_session_name; + + const auto & server_settings = getContext()->getGlobalContext()->getServerSettings(); + const DB::Settings & global_settings = getContext()->getGlobalContext()->getSettingsRef(); + + int s3_max_redirects = static_cast(server_settings[DB::ServerSetting::s3_max_redirects]); + if (global_settings.isChanged("s3_max_redirects")) + s3_max_redirects = static_cast(global_settings[DB::Setting::s3_max_redirects]); + + int s3_retry_attempts = static_cast(server_settings[DB::ServerSetting::s3_retry_attempts]); + if (global_settings.isChanged("s3_retry_attempts")) + s3_retry_attempts = static_cast(global_settings[DB::Setting::s3_retry_attempts]); + + bool s3_slow_all_threads_after_network_error = global_settings[DB::Setting::s3_slow_all_threads_after_network_error]; + bool s3_slow_all_threads_after_retryable_error = false; + bool enable_s3_requests_logging = global_settings[DB::Setting::enable_s3_requests_logging]; + + DB::S3::PocoHTTPClientConfiguration poco_config = DB::S3::ClientFactory::instance().createClientConfiguration( + region, + getContext()->getRemoteHostFilter(), + s3_max_redirects, + DB::S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = static_cast(s3_retry_attempts)}, + s3_slow_all_threads_after_network_error, + s3_slow_all_threads_after_retryable_error, + enable_s3_requests_logging, + /* for_disk_s3 = */ 
false, + /* opt_disk_name = */ {}, + /* request_throttler = */ {}); + + Aws::Auth::AWSCredentials credentials(catalog_settings_.aws_access_key_id, catalog_settings_.aws_secret_access_key); + credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config); + + signer = std::make_unique( + credentials_provider, + "s3tables", + Aws::String(region.data(), region.size()), + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always, + /* urlEscapePath = */ false); + + config = loadConfig(); + + if (config.prefix.empty()) + { + String encoded_warehouse; + Poco::URI::encode(warehouse_, "", encoded_warehouse); + config.prefix = encoded_warehouse; + } +} + +/// S3 Tables only supports a single level of namespaces (no nesting), +/// so we use flat getNamespaces() instead of the base class's getNamespacesRecursive(). +DB::Names S3TablesCatalog::getTables() const +{ + auto namespaces = getNamespaces(""); + + auto & pool = getContext()->getIcebergCatalogThreadpool(); + DB::ThreadPoolCallbackRunnerLocal runner(pool, DB::ThreadName::DATALAKE_REST_CATALOG); + + DB::Names tables; + std::mutex mutex; + for (const auto & ns : namespaces) + { + if (!allowed_namespaces.isNamespaceAllowed(ns, /*nested*/ false)) + continue; + runner.enqueueAndKeepTrack( + [&, ns] + { + auto tables_in_ns = RestCatalog::getTables(ns); + std::lock_guard lock(mutex); + std::move(tables_in_ns.begin(), tables_in_ns.end(), std::back_inserter(tables)); + }); + } + runner.waitForAllToFinishAndRethrowFirstError(); + return tables; +} + +bool S3TablesCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const +{ + if (!RestCatalog::tryGetTableMetadata(namespace_name, table_name, context_, result)) + return false; + + if (!result.requiresCredentials()) + return true; + + bool need_credentials = true; + if (const auto storage_credentials = result.getStorageCredentials()) + { + auto creds 
= std::dynamic_pointer_cast(storage_credentials); + if (creds && !creds->isEmpty()) + need_credentials = false; + } + + if (need_credentials) + { + LOG_DEBUG(log, "S3 Tables: no vended credentials for {}.{}, injecting catalog IAM credentials", namespace_name, table_name); + auto aws_creds = credentials_provider->GetAWSCredentials(); + result.setStorageCredentials(std::make_shared( + aws_creds.GetAWSAccessKeyId(), aws_creds.GetAWSSecretKey(), aws_creds.GetSessionToken())); + } + + if (result.getEndpoint().empty()) + { + String endpoint = storage_endpoint.empty() + ? DB::S3::resolveS3Endpoint(region) + : storage_endpoint; + LOG_DEBUG(log, "S3 Tables: no endpoint for {}.{}, injecting: {}", namespace_name, table_name, endpoint); + result.setEndpoint(endpoint); + } + + if (auto props = result.getDataLakeSpecificProperties(); + props && !props->iceberg_metadata_file_location.empty()) + { + const String & loc = props->iceberg_metadata_file_location; + auto scheme_end = loc.find("://"); + if (scheme_end != String::npos) + { + auto path_start = loc.find('/', scheme_end + 3); + if (path_start != String::npos) + props->iceberg_metadata_file_location = loc.substr(path_start + 1); + } + result.setDataLakeSpecificProperties(std::move(props)); + } + + return true; +} + +void S3TablesCatalog::dropTable(const String & namespace_name, const String & table_name) const +{ + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + + const std::string endpoint + = (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string() + + "?purgeRequested=True"; + + Poco::JSON::Object::Ptr request_body = nullptr; + try + { + sendRequest(endpoint, request_body, Poco::Net::HTTPRequest::HTTP_DELETE, true); + } + catch (const DB::HTTPException & ex) + { + if 
(ex.getHTTPStatus() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) + // 404 is returned by the API when the table does not exist + LOG_DEBUG(log, "S3 Tables: table {}.{} already does not exist (404 on purge-delete)", namespace_name, table_name); + else + throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Failed to drop table {}", ex.displayText()); + } +} + +DB::HTTPHeaderEntries S3TablesCatalog::getAuthHeaders( + bool /*update_token*/, + const String & method, + const Poco::URI & url, + const DB::HTTPHeaderEntries & extra_headers, + const String & body) const +{ + DB::HTTPHeaderEntries all_signed; + signRequestWithAWSV4(method, url, extra_headers, body, *signer, region, signing_service, all_signed); + + // signRequestWithAWSV4 returns both input extra_headers and signer-added auth + // headers. Only return the auth portion (authorization, x-amz-*); the caller + // appends the original request headers separately. + DB::HTTPHeaderEntries auth_headers; + for (auto & h : all_signed) + { + if (h.name == "authorization" || h.name.starts_with("x-amz-")) + auth_headers.push_back(std::move(h)); + } + return auth_headers; +} + +} + +#endif diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h new file mode 100644 index 000000000000..f328e9eb87c8 --- /dev/null +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -0,0 +1,65 @@ +#pragma once + +#include "config.h" + +#if USE_AVRO && USE_SSL && USE_AWS_S3 + +#include +#include + +#include + +#include + +namespace Aws::Auth +{ +class AWSCredentialsProvider; +} + +namespace DataLake +{ + +/// Iceberg REST catalog for Amazon S3 Tables (SigV4, signing name `s3tables`). 
+/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-open-source.html +class S3TablesCatalog final : public RestCatalog +{ +public: + S3TablesCatalog( + const String & warehouse_, + const String & base_url_, + const String & region_, + const DataLake::CatalogSettings & catalog_settings_, + const String & namespaces_, + DB::ContextPtr context_); + + DB::DatabaseDataLakeCatalogType getCatalogType() const override { return DB::DatabaseDataLakeCatalogType::S3_TABLES; } + + DB::Names getTables() const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const override; + + void dropTable(const String & namespace_name, const String & table_name) const override; + +protected: + DB::HTTPHeaderEntries getAuthHeaders( + bool update_token, + const String & method = {}, + const Poco::URI & url = {}, + const DB::HTTPHeaderEntries & extra_headers = {}, + const String & body = {}) const override; + +private: + const String region; + const String storage_endpoint; + const String signing_service; + std::shared_ptr credentials_provider; + std::unique_ptr signer; +}; + +} + +#endif diff --git a/src/Databases/DataLake/StorageCredentials.h b/src/Databases/DataLake/StorageCredentials.h index ab09e3420889..5e261b9bb3a4 100644 --- a/src/Databases/DataLake/StorageCredentials.h +++ b/src/Databases/DataLake/StorageCredentials.h @@ -31,6 +31,8 @@ class S3Credentials final : public IStorageCredentials , session_token(session_token_) {} + bool isEmpty() const { return access_key_id.empty() || secret_access_key.empty(); } + void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override { if (engine_args.size() != 1) diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index 2ef37686b3d2..64bac904b3da 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ 
b/src/Databases/enableAllExperimentalSettings.cpp @@ -64,6 +64,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_dynamic_type_in_join_keys", 1); context->setSetting("allow_experimental_alias_table_engine", 1); context->setSetting("allow_experimental_database_paimon_rest_catalog", 1); + context->setSetting("allow_experimental_database_s3_tables", 1); context->setSetting("allow_experimental_object_storage_queue_hive_partitioning", 1); /// clickhouse-private settings diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index 5ab8e5cfd724..4f6e2f73cc79 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -8,6 +8,8 @@ #include #include +#include + #include #include @@ -302,6 +304,19 @@ void URI::validateKey(const String & key, const Poco::URI & uri) } } +std::string resolveS3Endpoint(const std::string & region) +{ + Aws::S3::Endpoint::S3EndpointProvider provider; + provider.AccessBuiltInParameters().SetStringParameter("Region", Aws::String(region)); + auto outcome = provider.ResolveEndpoint({}); + if (outcome.IsSuccess()) + { + auto uri = outcome.GetResult().GetURI(); + return uri.GetURIString(); + } + return "https://s3." + region + ".amazonaws.com"; +} + } } diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index fd45baa39774..9710e640ce66 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -50,6 +50,10 @@ struct URI static bool isAWSRegion(std::string_view region); }; +/// Resolve the S3 endpoint URL for a given AWS region using the SDK's +/// Smithy endpoint rules (handles all partitions: standard, China, GovCloud, etc.). 
+std::string resolveS3Endpoint(const std::string & region); + } #endif diff --git a/src/IO/S3/tests/gtest_s3_uri.cpp b/src/IO/S3/tests/gtest_s3_uri.cpp index a429c645fca3..5c0d3debc0c0 100644 --- a/src/IO/S3/tests/gtest_s3_uri.cpp +++ b/src/IO/S3/tests/gtest_s3_uri.cpp @@ -38,4 +38,23 @@ TEST(IOTestS3URI, PathStyleWithKey) ASSERT_EQ(uri_with_no_key_and_with_slash.key, "key/key/key/key"); } +TEST(IOTestS3URI, ResolveS3Endpoint) +{ + using namespace DB; + + ASSERT_EQ(S3::resolveS3Endpoint("us-east-1"), + "https://s3.us-east-1.amazonaws.com"); + ASSERT_EQ(S3::resolveS3Endpoint("eu-west-1"), + "https://s3.eu-west-1.amazonaws.com"); + + auto cn_north = S3::resolveS3Endpoint("cn-north-1"); + ASSERT_TRUE(cn_north.ends_with(".amazonaws.com.cn")) + << "China region should resolve to .amazonaws.com.cn suffix, got: " << cn_north; + ASSERT_TRUE(cn_north.find("cn-north-1") != std::string::npos) + << "Got: " << cn_north; + + ASSERT_EQ(S3::resolveS3Endpoint("us-gov-west-1"), + "https://s3.us-gov-west-1.amazonaws.com"); +} + #endif