feat(kafka): add SASL OAUTHBEARER method=default RFC 6749 token fetch callback#25343
dvilaverde wants to merge 5 commits into vectordotdev:master
Conversation
…6749

Implements a generate_oauth_token callback on KafkaStatisticsContext that fires when sasl.oauthbearer.token.endpoint.url is set and sasl.oauthbearer.method is absent or "default". Posts to the token endpoint per RFC 6749 §4.4, parses the response per §5.1, and returns an OAuthToken to librdkafka.

Extra name=value pairs in sasl.oauthbearer.config are merged into the POST body, allowing grant_type overrides (e.g. authorization_code with a PAC). extension_* pairs are silently dropped (SASL broker extensions, not HTTP params). When method=oidc, librdkafka's built-in handler runs unchanged.

The sink healthcheck switches from BaseProducer to FutureProducer so the background thread processes OAUTHBEARER refresh events during fetch_metadata.
Thank you for your contribution! Before we can merge this PR, please sign our Contributor License Agreement. To sign, copy and post the phrase below as a new comment on this PR.

I have read the CLA Document and I hereby sign the CLA

1 out of 2 committers have signed the CLA.

I have read the CLA Document and I hereby sign the CLA

recheck
💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 37ae874790
```rust
}

impl ClientContext for KafkaStatisticsContext {
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;
```
Gate OAuth token callback to default method only
Setting ENABLE_REFRESH_OAUTH_TOKEN to true unconditionally forces librdkafka to call Vector's generate_oauth_token for every OAUTHBEARER client, bypassing librdkafka's built-in handlers. In this commit, extract_oauthbearer_config returns None for sasl.oauthbearer.method=oidc (and when no token endpoint is configured), so generate_oauth_token immediately errors on self.oauthbearer.as_ref().ok_or(...). This turns previously working OIDC/default flows into authentication failures instead of preserving existing behavior.
…ectordotdev#25057)

* feat(sinks): introduce a configurable retry strategy for http
* feat(sinks): wire the RetryStrategy with the HttpRetryLogic
* feat(sinks): wire RetryStrategy with all HttpStatusRetryLogic and all affected sinks
* docs(sinks): http example config with custom retry strategy
* feat(http sink): add integration test for custom retry strategy
* docs(sinks): add changelog fragment
* fix(http sink): update is_retriable_error logic to respect retry strategy
* fix(http sink): add handling for NOT_IMPLEMENTED status in retry strategy
* chore(sinks): format code
* feat(sinks): implement timeout retry logic and add tests for non-retriable timeouts
* docs(sinks): clarify that success status codes are not retried on RetryAll strategy
* feat(sinks): improve typesafety of RetryStrategy StatusCode
* docs(sinks http): enhance transport error classification for HTTP sinks with shared retry logic
* feat(sinks http): add debug logging for unsuccessful HTTP responses in retry logic
* refactor(http sink): avoid heap allocation on the happy path
* fix(http sink): format example config
* chore: add custom attributes for numeric type in StatusCode metadata
* docs(sinks http): update docs for http sinks supporting retry_strategy

---------

Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
…v#25339)

The github.rest.issues.removeLabel call (DELETE /repos/{owner}/{repo}/issues/{issue_number}/labels/{name}) requires the issues:write permission per the GitHub REST API. The workflow only declared pull-requests:write, so the call returned 403 the first time all preconditions actually held simultaneously (MEMBER reviewer + label present + approval).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
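The fix described above amounts to declaring both scopes on the workflow (or job). A minimal sketch of the permissions block implied by the commit message — workflow structure is illustrative, the scope names are standard GitHub Actions syntax:

```yaml
# Both scopes are needed: pull-requests for PR review events,
# issues for label mutation (removeLabel hits the Issues API).
permissions:
  pull-requests: write
  issues: write
```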
…event lost wakeup (vectordotdev#25195)

* fix(windows_event_log): fix pre-drain ResetEvent race and add lost-wakeup regression tests

  The Windows Event Log service signals the pull-mode wait handle via SetEvent each time a new matching event is recorded. Because the handle is manual-reset, any SetEvent that fires between the last EvtNext call and the post-drain ResetEvent is silently lost — the subscription then hangs until the next OS event arrives (vectordotdev#25194).

  Fix: reset the handle *before* entering the drain loop. Signals raised during the drain are preserved because SetEvent on an already-signaled handle is a no-op. Re-arm (SetEvent) on early exits so the next pull_events revisits the channel without waiting for a fresh OS notification:
  - budget exhaustion
  - bookmark failure mid-batch
  - transient EvtNext error

  Regression tests:
  - test_pull_events_preserves_setevent_during_drain: installs DRAIN_STEP_HOOK to fire SetEvent mid-drain and asserts wait_for_events_blocking returns EventsAvailable, not Timeout.
  - test_speculative_pull_recovers_without_signal: manually clears the channel signal via ResetEvent, confirms the wait times out, then asserts pull_events still returns events — proving the speculative timeout pull in mod.rs self-heals independently of signal state.

  Also: comment re-subscription break paths (ERROR_EVT_QUERY_RESULT_STALE and INVALID_POSITION) noting the speculative pull as a safety net if the re-subscribed channel does not immediately re-signal; add a serialization note to DRAIN_STEP_HOOK.

* fix(windows_event_log): add speculative timeout pull, deduplicate processing, fix error handling

  Four related improvements to mod.rs:
  1. Speculative pull on WaitResult::Timeout: call pull_events on every timeout cycle as a belt-and-suspenders self-heal. EvtNext returns ERROR_NO_MORE_ITEMS immediately on an empty channel (near-zero cost). If events are recovered, a warning is emitted. Guarantees recovery within one timeout period regardless of the root cause of the lost wakeup.
  2. Extract with_subscription_blocking helper: wraps the spawn_blocking ownership-transfer pattern (move the subscription in, run the blocking fn, return subscription + result). All three blocking calls (wait, normal pull, speculative pull) now use this helper instead of inlining spawn_blocking.
  3. Extract process_event_batch helper: the parse/emit/send_batch/finalize sequence was duplicated verbatim between the EventsAvailable arm and the speculative-timeout arm. Extracted into a shared free async function. Rate limiting is applied consistently in both paths via the helper.
  4. Fix error-handling asymmetry: the speculative pull Err branch previously only logged warn! and continued, so a non-recoverable error (access denied, channel not found) would spam warnings indefinitely. Now mirrors the EventsAvailable path: emit WindowsEventLogQueryError, break on non-recoverable errors, apply exponential backoff on recoverable ones.

* test(windows_event_log source): harden lost-wakeup regression tests against flakiness

  Address two independent flakiness sources in the vectordotdev#25194 regression tests so the suite is stable on real Windows CI runners.

  test_pull_events_preserves_setevent_during_drain:
  - Replaced a 1000ms blocking wait with an immediate 0ms poll after pull_events returns, so the check measures only the reset/preserve behavior of pull_events and is not contaminated by unrelated Windows system events signaling the handle during a nonzero wait window.
  - Keyed the DRAIN_STEP_HOOK fire-once to the subscription's own signal handle so concurrent pull_events calls from other tests can't flip the hook first and SetEvent the wrong handle.

  test_speculative_pull_recovers_without_signal:
  - Same 500ms→0ms poll change, in the opposite direction: real events arriving during the wait would have re-signaled the manually cleared handle and flipped the expected Timeout into a real signal result.
  - Seed a deterministic record via 'eventcreate' before subscription creation so the non-empty-events assertion is independent of whatever backlog the runner happens to have. Freshly provisioned images can have an empty Application log, which would otherwise cause pull_events(100) to legitimately return empty and false-fail the test.

* fix(windows_event_log): implement Sync for subscription types to satisfy the Send bound on the source future

  EventLogSubscription and ChannelSubscription had unsafe impl Send but no Sync impl. Since &T: Send requires T: Sync, process_event_batch holding &EventLogSubscription across an .await made the entire source future !Send, causing a compile error (ICE + future-not-Send) in release builds. All mutation on these types requires &mut self; &self methods are read-only or delegate to already-Sync types (RateLimiter). The underlying Windows kernel handles are safe for concurrent access.

* chore(changelog): simplify windows_event_log fix fragment to a user-focused one-liner

* fix(windows_event_log): prioritize shutdown signal

* fix(windows_event_log): lighten speculative timeout pulls

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
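The reset-before-drain ordering at the heart of this fix can be illustrated with a plain-Rust analog that swaps the Windows manual-reset event for an AtomicBool (set() ≈ SetEvent, reset() ≈ ResetEvent). This is a sketch of the pattern only, not the source's Windows code; all names are illustrative:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Manual-reset event analog: set() ≈ SetEvent, reset() ≈ ResetEvent.
struct Event(AtomicBool);

impl Event {
    fn new(signaled: bool) -> Event { Event(AtomicBool::new(signaled)) }
    fn set(&self) { self.0.store(true, Ordering::SeqCst); }
    fn reset(&self) { self.0.store(false, Ordering::SeqCst); }
    fn is_signaled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}

// Drain with the fixed ordering: reset *before* the loop. A set() that fires
// mid-drain (injected here once via `mid_drain_event`) leaves the handle
// signaled after the drain, so the next wait wakes up instead of hanging.
// With the buggy post-drain reset, that signal would be silently discarded.
fn drain(ev: &Event, queue: &mut Vec<u32>, mid_drain_event: &mut Option<u32>) -> Vec<u32> {
    ev.reset(); // the fix: reset BEFORE draining, not after
    let mut out = Vec::new();
    while let Some(item) = queue.pop() {
        out.push(item);
        if let Some(new_item) = mid_drain_event.take() {
            queue.push(new_item); // a new event is recorded during the drain...
            ev.set();             // ...and SetEvent fires (no-op if already signaled)
        }
    }
    out
}
```

Because set() on an already-signaled flag is idempotent, nothing is lost by resetting early; the only state a pre-drain reset can clear is the signal the drain itself is about to consume.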
Summary
Adds sasl.oauthbearer.method=default support to the Kafka sink and source. Today the only supported external token path is method=oidc, which requires librdkafka's built-in OIDC handler to validate the access token as a JWT with a standard exp claim. That handler fails with:
```
Failed to acquire SASL OAUTHBEARER token: Expected JSON JWT response with "exp" field
```
This makes method=oidc unusable with:
RFC 6749 §5.1 designates expires_in in the HTTP response as the authoritative token lifetime — the JWT exp claim is not guaranteed by the OAuth2 spec.
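Concretely, the §5.1 rule means the token's absolute expiry is derived from the HTTP-level expires_in field rather than any JWT claim. A minimal sketch of that arithmetic — the helper name and the millisecond convention are illustrative, not Vector's actual code:

```rust
// Derive an absolute expiry (ms since the Unix epoch) from the RFC 6749 §5.1
// `expires_in` field (seconds), given the current time. No JWT parsing needed,
// which is what makes opaque (non-JWT) access tokens work.
fn token_lifetime_ms(now_ms: i64, expires_in_secs: i64) -> i64 {
    now_ms + expires_in_secs * 1000
}
```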
How it works: Vector activates its own RFC 6749 token fetch callback (generate_oauth_token on ClientContext) when sasl.oauthbearer.token.endpoint.url is present in librdkafka_options and sasl.oauthbearer.method is absent or "default". When method=oidc, librdkafka's built-in OIDC handler runs unchanged. No new Vector sasl.* config keys are introduced — everything goes through existing librdkafka_options passthrough.
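The activation rule above can be sketched as a small predicate. This is an illustrative reconstruction from the description, not Vector's actual internals; the function name is hypothetical:

```rust
// Vector's RFC 6749 callback is active only when a token endpoint URL is
// present in librdkafka_options AND sasl.oauthbearer.method is absent or
// "default". Any other method (e.g. "oidc") defers to librdkafka's handler.
fn vector_callback_active(method: Option<&str>, token_endpoint_url: Option<&str>) -> bool {
    let method_is_default = match method {
        None => true,
        Some(m) => m.eq_ignore_ascii_case("default"),
    };
    token_endpoint_url.is_some() && method_is_default
}
```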
Extra name=value pairs in sasl.oauthbearer.config are merged into the POST body, allowing grant type overrides and additional parameters. extension_* pairs are silently dropped (SASL broker extensions per RFC 7628, not HTTP params).
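The merge-and-filter step can be sketched like this, assuming the usual space-separated name=value syntax of sasl.oauthbearer.config; the helper name and return shape are hypothetical:

```rust
// Parse extra `sasl.oauthbearer.config` pairs destined for the RFC 6749 POST
// body, silently dropping `extension_*` pairs (SASL OAUTHBEARER broker
// extensions per RFC 7628, not HTTP form parameters).
fn merge_config_pairs(config: &str) -> Vec<(String, String)> {
    config
        .split_whitespace()
        .filter_map(|pair| pair.split_once('='))
        .filter(|(name, _)| !name.starts_with("extension_"))
        .map(|(n, v)| (n.to_string(), v.to_string()))
        .collect()
}
```

A pair like `grant_type=authorization_code` would thus override the default grant in the POST body, while `extension_traceparent=...` would never reach the token endpoint.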
Vector configuration
Standard client credentials (opaque token provider):
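The original example is collapsed in this view; a sketch of what a client-credentials setup would look like, using standard librdkafka option names passed through Vector's existing `librdkafka_options`. Treat the exact values and sink shape as illustrative:

```yaml
sinks:
  kafka_out:
    type: kafka
    inputs: ["my_source"]
    bootstrap_servers: "broker:9093"
    encoding:
      codec: json
    librdkafka_options:
      security.protocol: "SASL_SSL"
      sasl.mechanism: "OAUTHBEARER"
      # method absent (or "default") activates Vector's RFC 6749 callback
      sasl.oauthbearer.token.endpoint.url: "https://idp.example.com/oauth2/token"
      sasl.oauthbearer.client.id: "my-client"
      sasl.oauthbearer.client.secret: "${CLIENT_SECRET}"
```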
Custom grant type with Permanent Authorization Code:
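This example is likewise collapsed; per the description, the grant override rides in extra name=value pairs inside sasl.oauthbearer.config. A sketch under that assumption, with the code value deliberately left as a placeholder:

```yaml
librdkafka_options:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "OAUTHBEARER"
  sasl.oauthbearer.token.endpoint.url: "https://idp.example.com/oauth2/token"
  # extra pairs are merged into the POST body; grant_type overrides the default
  sasl.oauthbearer.config: "grant_type=authorization_code code=<permanent-authorization-code>"
```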
Existing method=oidc behavior is completely unchanged.
How did you test this PR?
`cargo test --no-default-features --features "sources-kafka,sinks-kafka" -p vector kafka` (kafka unit tests)

Change Type
Is this a breaking change?
Does this PR include user facing changes?
References