Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/connectors/sinks/iceberg_sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
pub dynamic_routing: bool,
pub dynamic_route_field: String,
pub store_url: String,
pub store_access_key_id: String,
pub store_secret_access_key: String,
pub store_access_key_id: Option<String>,
pub store_secret_access_key: Option<String>,
pub store_region: String,
pub store_class: IcebergSinkStoreClass,
}
Expand Down
51 changes: 43 additions & 8 deletions core/connectors/sinks/iceberg_sink/src/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,49 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
let mut props: HashMap<String, String> = HashMap::new();
props.insert("s3.region".to_string(), config.store_region.clone());
props.insert(
"s3.access-key-id".to_string(),
config.store_access_key_id.clone(),
);
props.insert(
"s3.secret-access-key".to_string(),
config.store_secret_access_key.clone(),
);
if let Some(access_key_id) = &config.store_access_key_id {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_props_s3() handles each credential independently with separate if let Some(...) blocks. the partial-credential invariant (both-or-neither) is only enforced in sink.rs::open(), but init_props is pub - a future caller could bypass validation and get a silently half-configured props map. consider making init_props pub(crate) or adding both-or-neither validation here as defense-in-depth.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the both-or-neither validation to get_props_s3. Thanks for pointing out!

props.insert("s3.access-key-id".to_string(), access_key_id.clone());
}
if let Some(secret_access_key) = &config.store_secret_access_key {
props.insert(
"s3.secret-access-key".to_string(),
secret_access_key.clone(),
);
}
props.insert("s3.endpoint".to_string(), config.store_url.clone());
Ok(props)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};

    /// Verifies that `get_props_s3` omits the S3 credential keys when no
    /// explicit credentials are configured, and forwards both keys verbatim
    /// when both are provided.
    #[test]
    fn test_get_props_s3() {
        let mut config = IcebergSinkConfig {
            tables: vec![],
            catalog_type: IcebergSinkTypes::REST,
            warehouse: "warehouse".to_string(),
            uri: "http://localhost:8181".to_string(),
            dynamic_routing: false,
            dynamic_route_field: "".to_string(),
            store_url: "http://localhost:9000".to_string(),
            store_access_key_id: None,
            store_secret_access_key: None,
            store_region: "us-east-1".to_string(),
            store_class: IcebergSinkStoreClass::S3,
        };

        // No explicit credentials: credential keys must be absent so the
        // default credential provider chain is used downstream.
        let props_none = get_props_s3(&config).expect("Should return S3 properties");
        assert!(!props_none.contains_key("s3.access-key-id"));
        assert!(!props_none.contains_key("s3.secret-access-key"));

        // Both credentials present: both keys must be forwarded verbatim.
        config.store_access_key_id = Some("admin".to_string());
        config.store_secret_access_key = Some("password".to_string());

        let props_some = get_props_s3(&config).expect("Should return S3 properties");
        assert_eq!(props_some.get("s3.access-key-id").unwrap(), "admin");
        assert_eq!(props_some.get("s3.secret-access-key").unwrap(), "password");
    }
}
42 changes: 26 additions & 16 deletions core/connectors/sinks/iceberg_sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,32 @@ use tracing::{debug, error, info};
#[async_trait]
impl Sink for IcebergSink {
async fn open(&mut self) -> Result<(), Error> {
let redacted_store_key = self
.config
.store_access_key_id
.chars()
.take(3)
.collect::<String>();
let redacted_store_secret = self
.config
.store_secret_access_key
.chars()
.take(3)
.collect::<String>();
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
self.id, self.config.uri
);
match (
&self.config.store_access_key_id,
&self.config.store_secret_access_key,
) {
(Some(store_access_key_id), Some(store_secret_access_key)) => {
let redacted_store_key = store_access_key_id.chars().take(3).collect::<String>();
let redacted_store_secret =
store_secret_access_key.chars().take(3).collect::<String>();
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
self.id, self.config.uri
);
}
(None, None) => {
info!(
"Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain",
self.id, self.config.uri
);
}
_ => {
error!(
"Partially configured Iceberg credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both."
);
return Err(Error::InvalidConfig);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error::InvalidConfig produces a bare "Invalid config" message. Error::InvalidConfigValue(String) exists (used by influxdb connectors) and carries a description - use it so the error is self-explanatory without needing to correlate with the error!() log above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched to Error::InvalidConfigValue, I think it's now much cleaner!

}
}

info!(
"Configuring Iceberg catalog with the following config:\n-region: {}\n-url: {}\n-store class: {}\n-catalog type: {}\n",
Expand Down
39 changes: 39 additions & 0 deletions core/integration/tests/connectors/fixtures/iceberg/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
pub const ENV_SINK_STORE_REGION: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";

// Standard AWS SDK environment variable names, used by tests that exercise
// the default credential provider chain fallback.
pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";

pub struct MinioContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
Expand Down Expand Up @@ -532,3 +535,39 @@ impl TestFixture for IcebergPreCreatedFixture {
self.inner.connectors_runtime_envs()
}
}

/// Test fixture that exercises the Iceberg sink's fallback to the default
/// AWS credential provider chain: it wraps the pre-created fixture and
/// strips the explicit credential env vars from the connectors runtime
/// environment.
pub struct IcebergEnvAuthFixture {
    // Wrapped fixture providing the actual Minio/Iceberg containers.
    inner: IcebergPreCreatedFixture,
}

/// Pure delegation: all catalog operations are forwarded to the wrapped
/// `IcebergPreCreatedFixture`.
impl IcebergOps for IcebergEnvAuthFixture {
    // Catalog REST endpoint of the underlying fixture.
    fn catalog_url(&self) -> &str {
        self.inner.catalog_url()
    }

    // HTTP client of the underlying fixture.
    fn http_client(&self) -> &HttpClient {
        self.inner.http_client()
    }
}

#[async_trait]
impl TestFixture for IcebergEnvAuthFixture {
async fn setup() -> Result<Self, TestBinaryError> {
let inner = IcebergPreCreatedFixture::setup().await?;
// Set credentials before test server initialization.
unsafe {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe { std::env::set_var } without cleanup or safety justification. the tokio runtime is already running at this point, so concurrent env reads from other threads are UB. no Drop impl means these vars leak into subsequent tests.

the fix is straightforward: add AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY to the HashMap returned by connectors_runtime_envs() instead - the harness already passes that map to the child process via Command::envs() at connectors_runtime.rs:157, eliminating the unsafe entirely.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a serious issue. I missed the concurrency risks while trying to figure out how the test environment handles variables. I'll spend more time digging into the implementation to avoid similar mistakes in the future. I've fixed it and thanks for the heads-up!

std::env::set_var(ENV_AWS_ACCESS_KEY_ID, MINIO_ACCESS_KEY);
std::env::set_var(ENV_AWS_SECRET_ACCESS_KEY, MINIO_SECRET_KEY);
}
Ok(Self { inner })
}

fn connectors_runtime_envs(&self) -> HashMap<String, String> {
let mut envs = self.inner.connectors_runtime_envs();
// Remove the explicit credentials injected by the underlying fixture.
// This forces the Iceberg Sink to use the default credential provider chain instead of explicit config.
envs.remove(ENV_SINK_STORE_ACCESS_KEY);
envs.remove(ENV_SINK_STORE_SECRET);
envs
}
}
4 changes: 3 additions & 1 deletion core/integration/tests/connectors/fixtures/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@

mod container;

pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
pub use container::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
4 changes: 3 additions & 1 deletion core/integration/tests/connectors/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ pub use http::{
HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, HttpSinkMultiTopicFixture,
HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture,
};
pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
pub use iceberg::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
pub use influxdb::{
InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, InfluxDbSinkNoMetadataFixture,
InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "sink"
key = "iceberg"
enabled = true
version = 0
name = "Iceberg sink"
path = "../../target/release/libiggy_connector_iceberg_sink"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

path points to target/release but ENV_SINK_PATH always overrides to debug at runtime. not broken, but misleading. this is the same pattern as the original config.toml so not introduced by this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the path to debug in this config to match the actual test environment and avoid confusion.

verbose = true

[[streams]]
stream = "test_stream"
topics = ["test_topic"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "iceberg_sink_connector"

# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'.
# It is used exclusively by integration tests to test the default credential provider chain fallback behavior.
[plugin_config]
tables = ["test.messages"]
catalog_type = "rest"
warehouse = "warehouse"
uri = "http://localhost:8181"
dynamic_routing = false
dynamic_route_field = ""
store_url = "http://localhost:9000"
store_region = "us-east-1"
store_class = "s3"
49 changes: 48 additions & 1 deletion core/integration/tests/connectors/iceberg/iceberg_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::connectors::create_test_messages;
use crate::connectors::fixtures::{
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
};
use bytes::Bytes;
use iggy::prelude::{IggyMessage, Partitioning};
Expand Down Expand Up @@ -207,3 +207,50 @@ async fn iceberg_sink_handles_bulk_messages(
assert_eq!(sinks.len(), 1);
assert!(sinks[0].last_error.is_none());
}

#[iggy_harness(
server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")),
seed = seeds::connector_stream
)]
async fn iceberg_sink_uses_default_credential_chain(
harness: &TestHarness,
fixture: IcebergEnvAuthFixture,
) {
let client = harness.root_client().await.unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bare .unwrap() calls here and at lines 220, 221, 227, 232 - the existing tests in this file consistently use .expect("...") with context messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. I replaced all unwrap() calls with expect() to provide better error context.

let stream_id: iggy_common::Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: iggy_common::Identifier = seeds::names::TOPIC.try_into().unwrap();
let test_messages = crate::connectors::create_test_messages(5);
let mut messages: Vec<IggyMessage> = test_messages
.iter()
.enumerate()
.map(|(i, msg)| {
let payload = serde_json::to_vec(msg).unwrap();
IggyMessage::builder()
.id((i + 1) as u128)
.payload(bytes::Bytes::from(payload))
.build()
.unwrap()
})
.collect();
client
.send_messages(
&stream_id,
&topic_id,
&Partitioning::partition_id(0),
&mut messages,
)
.await
.expect("Failed to send fake messages");
let snapshot_count = fixture
.wait_for_snapshots(
DEFAULT_NAMESPACE,
DEFAULT_TABLE,
1,
SNAPSHOT_POLL_ATTEMPTS,
SNAPSHOT_POLL_INTERVAL_MS,
)
.await
.expect("Data should be written to Iceberg table");
assert!(snapshot_count >= 1);
drop(fixture);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[connectors]
config_type = "local"
config_dir = "tests/connectors/iceberg/default_credentials_config"
Loading