Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7710,6 +7710,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest'
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_s3_tables, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4)
)", EXPERIMENTAL) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"optimize_read_in_window_order", true, false, "Disable this logic by default."},
{"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."},
{"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"},
{"allow_experimental_database_s3_tables", false, false, "New setting"},
{"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."},
{"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"},
{"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."},
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ IMPLEMENT_SETTING_ENUM(
{"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE},
{"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE},
{"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE},
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}})
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST},
{"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}})

IMPLEMENT_SETTING_ENUM(
FileCachePolicy,
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t
ICEBERG_ONELAKE,
ICEBERG_BIGLAKE,
PAIMON_REST,
S3_TABLES,
};

DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType)
Expand Down
110 changes: 110 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Databases/DataLake/AWSV4Signer.h>

#include <Common/Exception.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/String.h>

#include <aws/core/auth/signer/AWSAuthV4Signer.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/memory/AWSMemory.h>

#include <sstream>
#include <utility>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
}

namespace DataLake
{
namespace
{

/// Translate a Poco HTTP method name (e.g. "GET") into the AWS SDK's
/// HttpMethod enum so the request can be rebuilt as an Aws::Http request.
/// Throws BAD_ARGUMENTS for any method the signer does not support.
Aws::Http::HttpMethod mapPocoMethodToAws(const String & method)
{
    using Aws::Http::HttpMethod;
    using Poco::Net::HTTPRequest;

    if (method == HTTPRequest::HTTP_GET)
        return HttpMethod::HTTP_GET;
    if (method == HTTPRequest::HTTP_POST)
        return HttpMethod::HTTP_POST;
    if (method == HTTPRequest::HTTP_PUT)
        return HttpMethod::HTTP_PUT;
    if (method == HTTPRequest::HTTP_DELETE)
        return HttpMethod::HTTP_DELETE;
    if (method == HTTPRequest::HTTP_HEAD)
        return HttpMethod::HTTP_HEAD;
    if (method == HTTPRequest::HTTP_PATCH)
        return HttpMethod::HTTP_PATCH;

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method);
}

}

void signRequestWithAWSV4(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Perhaps name it like signHeaders as the output of this method is a list of headers and not a request?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    if (!payload.empty())
    {
        auto body_stream = Aws::MakeShared<std::stringstream>("AWSV4Signer");
        body_stream->write(payload.data(), static_cast<std::streamsize>(payload.size()));
        body_stream->seekg(0);
        request.AddContentBody(body_stream);
    }

    static constexpr bool sign_body = true;
    if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body))

The request body is also signed

const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers)
{
const Aws::Http::URI aws_uri(uri.toString().c_str());
Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method));

for (const auto & h : extra_headers)
{
if (Poco::icompare(h.name, "authorization") == 0)
continue;
request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size()));
}

if (!payload.empty())
{
auto body_stream = Aws::MakeShared<std::stringstream>("AWSV4Signer");
body_stream->write(payload.data(), static_cast<std::streamsize>(payload.size()));
body_stream->seekg(0);
request.AddContentBody(body_stream);
}

static constexpr bool sign_body = true;
if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body))
throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed");

bool has_authorization = false;
for (const auto & [key, value] : request.GetHeaders())
{
if (Poco::icompare(key, "authorization") == 0 && !value.empty())
has_authorization = true;
}
if (!has_authorization)
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"AWS credentials are missing or incomplete; cannot sign S3 Tables REST request");

out_headers.clear();
for (const auto & [key, value] : request.GetHeaders())
{
if (Poco::icompare(key, "host") == 0)
continue;
out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size()));
}
}

}

#endif
34 changes: 34 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Core/Types.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/URI.h>

namespace Aws::Client
{
class AWSAuthV4Signer;
}

namespace DataLake
{

/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer.
/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts
/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI).
void signRequestWithAWSV4(
const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers);

}

#endif
54 changes: 51 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Databases/DataLake/PaimonRestCatalog.h>
#if USE_AWS_S3 && USE_SSL
#include <Databases/DataLake/S3TablesCatalog.h>
#endif
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
Expand Down Expand Up @@ -90,6 +93,7 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool allow_experimental_database_paimon_rest_catalog;
extern const SettingsBool allow_experimental_database_s3_tables;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
Expand Down Expand Up @@ -141,8 +145,20 @@ void DatabaseDataLake::validateSettings()
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. "
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES)
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");

if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS warehouse=<table_bucket_arn>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
{
Expand Down Expand Up @@ -336,6 +352,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
break;
}
case DB::DatabaseDataLakeCatalogType::S3_TABLES:
{
#if USE_AWS_S3 && USE_SSL
catalog_impl = std::make_shared<DataLake::S3TablesCatalog>(
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::region].value,
catalog_parameters,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL");
#endif
break;
}
}
return catalog_impl;
}
Expand Down Expand Up @@ -368,6 +401,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
case DatabaseDataLakeCatalogType::ICEBERG_HIVE:
case DatabaseDataLakeCatalogType::ICEBERG_REST:
case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE:
case DatabaseDataLakeCatalogType::S3_TABLES:
{
switch (type)
{
Expand Down Expand Up @@ -962,9 +996,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST
&& catalog_type != DatabaseDataLakeCatalogType::S3_TABLES)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only");
}

for (auto & engine_arg : engine_args)
Expand Down Expand Up @@ -1050,6 +1085,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
engine_func->name = "Paimon";
break;
}
case DatabaseDataLakeCatalogType::S3_TABLES:
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseDataLake with S3 Tables catalog is experimental. "
"To allow its usage, enable setting allow_experimental_database_s3_tables");
}

engine_func->name = "Iceberg";
break;
}
case DatabaseDataLakeCatalogType::NONE:
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ bool TableMetadata::hasStorageCredentials() const
return storage_credentials != nullptr;
}

/// Whether catalog-specific metadata was attached to this table's metadata.
bool TableMetadata::hasDataLakeSpecificProperties() const
{
    /// Explicit bool conversion of the optional — equivalent to has_value().
    return static_cast<bool>(data_lake_specific_metadata);
}

std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const
{
std::string metadata_location = iceberg_metadata_file_location;
Expand Down
Loading
Loading