Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/connectors/sinks/iceberg_sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
pub dynamic_routing: bool,
pub dynamic_route_field: String,
pub store_url: String,
pub store_access_key_id: String,
pub store_secret_access_key: String,
pub store_access_key_id: Option<String>,
pub store_secret_access_key: Option<String>,
pub store_region: String,
pub store_class: IcebergSinkStoreClass,
}
Expand Down
93 changes: 85 additions & 8 deletions core/connectors/sinks/iceberg_sink/src/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,91 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
let mut props: HashMap<String, String> = HashMap::new();
props.insert("s3.region".to_string(), config.store_region.clone());
props.insert(
"s3.access-key-id".to_string(),
config.store_access_key_id.clone(),
);
props.insert(
"s3.secret-access-key".to_string(),
config.store_secret_access_key.clone(),
);
props.insert("s3.endpoint".to_string(), config.store_url.clone());
match (&config.store_access_key_id, &config.store_secret_access_key) {
(Some(access_key_id), Some(secret_access_key)) => {
props.insert("s3.access-key-id".to_string(), access_key_id.clone());
props.insert(
"s3.secret-access-key".to_string(),
secret_access_key.clone(),
);
}
(None, None) => {}
_ => {
return Err(Error::InvalidConfigValue("Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned()));
}
}
Ok(props)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};

    /// Builds a minimal S3-backed sink config carrying the given optional
    /// credential pair; every other field is a fixed test default.
    fn config_with_creds(
        access_key_id: Option<&str>,
        secret_access_key: Option<&str>,
    ) -> IcebergSinkConfig {
        IcebergSinkConfig {
            tables: vec![],
            catalog_type: IcebergSinkTypes::REST,
            warehouse: "warehouse".to_string(),
            uri: "http://localhost:8181".to_string(),
            dynamic_routing: false,
            dynamic_route_field: "".to_string(),
            store_url: "http://localhost:9000".to_string(),
            store_access_key_id: access_key_id.map(str::to_string),
            store_secret_access_key: secret_access_key.map(str::to_string),
            store_region: "us-east-1".to_string(),
            store_class: IcebergSinkStoreClass::S3,
        }
    }

    #[test]
    fn test_get_props_s3_no_credentials() {
        let props = get_props_s3(&config_with_creds(None, None))
            .expect("Should succeed without credentials");
        assert_eq!(props.get("s3.region").map(String::as_str), Some("us-east-1"));
        assert_eq!(
            props.get("s3.endpoint").map(String::as_str),
            Some("http://localhost:9000")
        );
        // Neither credential property may appear when none are configured.
        assert!(!props.contains_key("s3.access-key-id"));
        assert!(!props.contains_key("s3.secret-access-key"));
    }

    #[test]
    fn test_get_props_s3_full_credentials() {
        let props = get_props_s3(&config_with_creds(Some("admin"), Some("password")))
            .expect("Should succeed with full credentials");
        assert_eq!(props.get("s3.region").map(String::as_str), Some("us-east-1"));
        assert_eq!(
            props.get("s3.endpoint").map(String::as_str),
            Some("http://localhost:9000")
        );
        assert_eq!(
            props.get("s3.access-key-id").map(String::as_str),
            Some("admin")
        );
        assert_eq!(
            props.get("s3.secret-access-key").map(String::as_str),
            Some("password")
        );
    }

    #[test]
    fn test_get_props_s3_partial_access_key() {
        let result = get_props_s3(&config_with_creds(Some("admin"), None));
        assert!(
            result.is_err(),
            "Partial credentials (only access_key_id) should be rejected"
        );
    }

    #[test]
    fn test_get_props_s3_partial_secret_key() {
        let result = get_props_s3(&config_with_creds(None, Some("password")));
        assert!(
            result.is_err(),
            "Partial credentials (only secret_access_key) should be rejected"
        );
    }
}
41 changes: 25 additions & 16 deletions core/connectors/sinks/iceberg_sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,31 @@ use tracing::{debug, error, info};
#[async_trait]
impl Sink for IcebergSink {
async fn open(&mut self) -> Result<(), Error> {
let redacted_store_key = self
.config
.store_access_key_id
.chars()
.take(3)
.collect::<String>();
let redacted_store_secret = self
.config
.store_secret_access_key
.chars()
.take(3)
.collect::<String>();
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
self.id, self.config.uri
);
match (
&self.config.store_access_key_id,
&self.config.store_secret_access_key,
) {
(Some(store_access_key_id), Some(store_secret_access_key)) => {
let redacted_store_key = store_access_key_id.chars().take(3).collect::<String>();
let redacted_store_secret =
store_secret_access_key.chars().take(3).collect::<String>();
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
self.id, self.config.uri
);
}
(None, None) => {
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain",
self.id, self.config.uri
);
}
_ => {
return Err(Error::InvalidConfigValue(
"Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned(),
));
}
}

info!(
"Configuring Iceberg catalog with the following config:\n-region: {}\n-url: {}\n-store class: {}\n-catalog type: {}\n",
Expand Down
43 changes: 43 additions & 0 deletions core/integration/tests/connectors/fixtures/iceberg/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
pub const ENV_SINK_STORE_REGION: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";

pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";

pub struct MinioContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
Expand Down Expand Up @@ -532,3 +535,43 @@ impl TestFixture for IcebergPreCreatedFixture {
self.inner.connectors_runtime_envs()
}
}

/// Fixture that reuses [`IcebergPreCreatedFixture`] but removes the explicit
/// store credentials from the connectors runtime environment, forcing the
/// Iceberg sink to authenticate through the default credential provider chain
/// (standard `AWS_*` environment variables) instead of plugin config.
pub struct IcebergEnvAuthFixture {
    /// Underlying fixture that provisions the catalog, store, and table.
    inner: IcebergPreCreatedFixture,
}

/// Delegates all catalog access to the wrapped pre-created fixture; only the
/// runtime environment this fixture exports differs.
impl IcebergOps for IcebergEnvAuthFixture {
    // Delegate: same catalog endpoint as the inner fixture.
    fn catalog_url(&self) -> &str {
        self.inner.catalog_url()
    }

    // Delegate: share the inner fixture's HTTP client.
    fn http_client(&self) -> &HttpClient {
        self.inner.http_client()
    }
}

#[async_trait]
impl TestFixture for IcebergEnvAuthFixture {
    /// Sets up the wrapped fixture (catalog, object store, pre-created table).
    async fn setup() -> Result<Self, TestBinaryError> {
        Ok(Self {
            inner: IcebergPreCreatedFixture::setup().await?,
        })
    }

    /// Returns the inner fixture's runtime env with explicit store credentials
    /// stripped and standard AWS variables injected in their place.
    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
        let mut envs = self.inner.connectors_runtime_envs();
        // Remove the explicit credentials injected by the underlying fixture.
        // This forces the Iceberg Sink to use the default credential provider chain instead of explicit config.
        for key in [ENV_SINK_STORE_ACCESS_KEY, ENV_SINK_STORE_SECRET] {
            envs.remove(key);
        }
        // Inject standard AWS env vars to test the default credential provider chain.
        envs.extend([
            (
                ENV_AWS_ACCESS_KEY_ID.to_string(),
                MINIO_ACCESS_KEY.to_string(),
            ),
            (
                ENV_AWS_SECRET_ACCESS_KEY.to_string(),
                MINIO_SECRET_KEY.to_string(),
            ),
        ]);
        envs
    }
}
4 changes: 3 additions & 1 deletion core/integration/tests/connectors/fixtures/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@

mod container;

pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
pub use container::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
4 changes: 3 additions & 1 deletion core/integration/tests/connectors/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ pub use http::{
HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, HttpSinkMultiTopicFixture,
HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture,
};
pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
pub use iceberg::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
pub use influxdb::{
InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, InfluxDbSinkNoMetadataFixture,
InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Connector registration: loads the Iceberg sink plugin binary built by the workspace.
type = "sink"
key = "iceberg"
enabled = true
version = 0
name = "Iceberg sink"
path = "../../target/debug/libiggy_connector_iceberg_sink"
verbose = true

# Source stream/topic the sink consumes from during the integration test.
[[streams]]
stream = "test_stream"
topics = ["test_topic"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "iceberg_sink_connector"

# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'.
# It is used exclusively by integration tests to test the default credential provider chain fallback behavior.
[plugin_config]
tables = ["test.messages"]
catalog_type = "rest"
warehouse = "warehouse"
uri = "http://localhost:8181"
dynamic_routing = false
dynamic_route_field = ""
store_url = "http://localhost:9000"
store_region = "us-east-1"
store_class = "s3"
54 changes: 53 additions & 1 deletion core/integration/tests/connectors/iceberg/iceberg_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::connectors::create_test_messages;
use crate::connectors::fixtures::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
use bytes::Bytes;
use iggy::prelude::{IggyMessage, Partitioning};
Expand Down Expand Up @@ -207,3 +207,55 @@ async fn iceberg_sink_handles_bulk_messages(
assert_eq!(sinks.len(), 1);
assert!(sinks[0].last_error.is_none());
}

#[iggy_harness(
server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")),
seed = seeds::connector_stream
)]
async fn iceberg_sink_uses_default_credential_chain(
harness: &TestHarness,
fixture: IcebergEnvAuthFixture,
) {
let client = harness
.root_client()
.await
.expect("Failed to get root client");
let stream_id: iggy_common::Identifier =
seeds::names::STREAM.try_into().expect("Invalid stream id");
let topic_id: iggy_common::Identifier =
seeds::names::TOPIC.try_into().expect("Invalid topic id");
let test_messages = crate::connectors::create_test_messages(5);
let mut messages: Vec<IggyMessage> = test_messages
.iter()
.enumerate()
.map(|(i, msg)| {
let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
IggyMessage::builder()
.id((i + 1) as u128)
.payload(bytes::Bytes::from(payload))
.build()
.expect("Failed to build message")
})
.collect();
client
.send_messages(
&stream_id,
&topic_id,
&Partitioning::partition_id(0),
&mut messages,
)
.await
.expect("Failed to send fake messages");
let snapshot_count = fixture
.wait_for_snapshots(
DEFAULT_NAMESPACE,
DEFAULT_TABLE,
1,
SNAPSHOT_POLL_ATTEMPTS,
SNAPSHOT_POLL_INTERVAL_MS,
)
.await
.expect("Data should be written to Iceberg table");
assert!(snapshot_count >= 1);
drop(fixture);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[connectors]
config_type = "local"
config_dir = "tests/connectors/iceberg/default_credentials_config"
Loading