fix(connectors): support default credential provider chain for iceberg sink#3045
fix(connectors): support default credential provider chain for iceberg sink#3045gomnitrix wants to merge 7 commits intoapache:masterfrom
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #3045 +/- ##
============================================
+ Coverage 70.52% 70.54% +0.01%
Complexity 943 943
============================================
Files 1115 1115
Lines 95388 95445 +57
Branches 72589 72646 +57
============================================
+ Hits 67275 67330 +55
+ Misses 25634 25633 -1
- Partials 2479 2482 +3
🚀 New features to boost your workflow:
|
1ae3328 to
b324786
Compare
|
@EdgarModesto23 would you mind checking this? |
|
@hubcio sure! |
| "s3.secret-access-key".to_string(), | ||
| config.store_secret_access_key.clone(), | ||
| ); | ||
| if let Some(access_key_id) = &config.store_access_key_id { |
There was a problem hiding this comment.
get_props_s3() handles each credential independently with separate if let Some(...) blocks. the partial-credential invariant (both-or-neither) is only enforced in sink.rs::open(), but init_props is pub - a future caller could bypass validation and get a silently half-configured props map. consider making init_props pub(crate) or adding both-or-neither validation here as defense-in-depth.
There was a problem hiding this comment.
I've added the both-or-neither validation to get_props_s3. Thanks for pointing out!
| async fn setup() -> Result<Self, TestBinaryError> { | ||
| let inner = IcebergPreCreatedFixture::setup().await?; | ||
| // Set credentials before test server initialization. | ||
| unsafe { |
There was a problem hiding this comment.
unsafe { std::env::set_var } without cleanup or safety justification. the tokio runtime is already running at this point, so concurrent env reads from other threads are UB. no Drop impl means these vars leak into subsequent tests.
the fix is straightforward: add AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY to the HashMap returned by connectors_runtime_envs() instead - the harness already passes that map to the child process via Command::envs() at connectors_runtime.rs:157, eliminating the unsafe entirely.
There was a problem hiding this comment.
Yes, this is a serious issue. I missed the concurrency risks while trying to figure out how the test environment handles variables. I'll spend more time digging into the implementation to avoid similar mistakes in the future. I've fixed it and thanks for the heads-up!
| error!( | ||
| "Partially configured Iceberg credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both." | ||
| ); | ||
| return Err(Error::InvalidConfig); |
There was a problem hiding this comment.
Error::InvalidConfig produces a bare "Invalid config" message. Error::InvalidConfigValue(String) exists (used by influxdb connectors) and carries a description - use it so the error is self-explanatory without needing to correlate with the error!() log above.
There was a problem hiding this comment.
I've switched to Error::InvalidConfigValue, I think it's now much cleaner!
| enabled = true | ||
| version = 0 | ||
| name = "Iceberg sink" | ||
| path = "../../target/release/libiggy_connector_iceberg_sink" |
There was a problem hiding this comment.
path points to target/release but ENV_SINK_PATH always overrides to debug at runtime. not broken, but misleading. this is the same pattern as the original config.toml so not introduced by this PR.
There was a problem hiding this comment.
I've updated the path to debug in this config to match the actual test environment and avoid confusion.
| use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes}; | ||
|
|
||
| #[test] | ||
| fn test_get_props_s3() { |
There was a problem hiding this comment.
test covers both-None and both-Some but not the mixed case (one Some, one None). if validation moves into get_props_s3 per the above comment, this test should verify the error path too.
There was a problem hiding this comment.
Updated the unit tests to cover the partial credential scenarios. Since adding the new cases made the test a bit long and bloated, I also split it up. Thanks!
| harness: &TestHarness, | ||
| fixture: IcebergEnvAuthFixture, | ||
| ) { | ||
| let client = harness.root_client().await.unwrap(); |
There was a problem hiding this comment.
bare .unwrap() calls here and at lines 220, 221, 227, 232 - the existing tests in this file consistently use .expect("...") with context messages.
There was a problem hiding this comment.
Fixed. I replaced all unwrap() calls with expect() to provide better error context.
Which issue does this PR close?
Closes #2911
Rationale
Production environments prefer the default credential provider chain over static keys, which the Iceberg Sink connector currently lacks support for.
What changed?
The Iceberg Sink connector previously failed to initialize if
store_access_key_idandstore_secret_access_keywere missing from the config, blocking the use of standard identity-based auth. This fix makes these config fields optional, allowing the underlying OpenDAL seamlessly fall back to the default credential provider chain. An isolated integration test was also added to verify this fallback behavior using environment variables.Local Execution
E2E Validation Results
The fallback mechanism has been verified through:
1. Automated Integration Test
Added a new test fixture:
IcebergEnvAuthFixtureand a corresponding integration test case:iceberg_sink_uses_default_credential_chain. It simulates the environment by omitting config credentials, injectingAWS_environment variables, and validating whether the test data successfully flushes to MinIO using the fallback chain.2. Manual Local Verification
I also successfully verified this end-to-end. Providing credentials exclusively via the environment triggered the fallback correctly and successfully routed Iggy messages to the catalog:
Click to view the local verification logs
AI Usage
iceberg_sink_uses_default_credential_chainintegration test.