feat(kafka sink): add SASL OAUTHBEARER method=default RFC 6749 token fetch… #25344

Open
dvilaverde wants to merge 4 commits into vectordotdev:master from dvilaverde:master

@dvilaverde

Summary

Adds sasl.oauthbearer.method=default support to the Kafka sink and source. Today the only supported external token path is method=oidc, which requires librdkafka's built-in OIDC handler to validate the access token as a JWT with a standard exp claim. That handler fails with:

Failed to acquire SASL OAUTHBEARER token: Expected JSON JWT response with "exp" field

This makes method=oidc unusable with:

  • Providers returning opaque (non-JWT) access tokens
  • Providers returning JWTs with non-standard expiry fields
  • Custom grant types such as authorization_code with a Permanent Authorization Code (PAC)

RFC 6749 §5.1 defines expires_in in the token response as the lifetime of the access token. The OAuth 2.0 spec does not require access tokens to be JWTs, so a JWT exp claim is not guaranteed.
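
As a rough illustration of treating expires_in as the token lifetime, the sketch below turns an optional expires_in value into an absolute expiry timestamp. The function name token_expiry_ms and the constant are hypothetical and are not taken from this PR's code; the 3600 s fallback mirrors the "missing expires_in" test case listed under "How did you test this PR?".

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Fallback lifetime (seconds) used when the token response omits `expires_in`.
/// The value follows the default described in this PR's test list.
const DEFAULT_EXPIRES_IN_SECS: u64 = 3600;

/// Hypothetical helper: absolute expiry in milliseconds since the Unix epoch.
/// Returns `None` if the timestamp overflows or `now` is before the epoch.
fn token_expiry_ms(now: SystemTime, expires_in: Option<u64>) -> Option<u128> {
    let lifetime = Duration::from_secs(expires_in.unwrap_or(DEFAULT_EXPIRES_IN_SECS));
    let expiry = now.checked_add(lifetime)?;
    expiry.duration_since(UNIX_EPOCH).ok().map(|d| d.as_millis())
}
```

Passing the current time as a parameter keeps the helper deterministic, which makes the fallback easy to unit test.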

How it works: Vector activates its own RFC 6749 token fetch callback (generate_oauth_token on ClientContext) when sasl.oauthbearer.token.endpoint.url is present in librdkafka_options and sasl.oauthbearer.method is absent or "default". When method=oidc, librdkafka's built-in OIDC handler runs unchanged. No new Vector sasl.* config keys are introduced — everything goes through existing librdkafka_options passthrough.
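
The activation rule described above can be sketched as a small predicate over the librdkafka_options map. This is only an illustration: the helper name uses_vector_token_fetch is hypothetical, and the actual extract_oauthbearer_config in this PR may be structured differently.

```rust
use std::collections::HashMap;

/// Hypothetical helper: returns true when Vector's own RFC 6749 token fetch
/// would be used, i.e. a token endpoint URL is configured and
/// `sasl.oauthbearer.method` is either absent or set to "default".
fn uses_vector_token_fetch(options: &HashMap<String, String>) -> bool {
    let has_endpoint = options.contains_key("sasl.oauthbearer.token.endpoint.url");
    let method = options.get("sasl.oauthbearer.method").map(String::as_str);
    // Any other method value (for example "oidc") leaves librdkafka in charge.
    has_endpoint && matches!(method, None | Some("default"))
}
```

With only the endpoint URL set, the predicate is true; adding sasl.oauthbearer.method=oidc makes it false.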

Extra name=value pairs in sasl.oauthbearer.config are merged into the POST body, allowing grant type overrides and additional parameters. extension_* pairs are silently dropped (SASL broker extensions per RFC 7628, not HTTP params).
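
A minimal sketch of that merge rule, assuming the config value is a whitespace-separated list of name=value pairs as in the example configuration in this description. The helper name extra_post_params is hypothetical. Tokens without an "=" are skipped here as an assumption, and the sketch does not special-case principal because its handling is not described above.

```rust
/// Hypothetical helper: turns a `sasl.oauthbearer.config` value into extra
/// form parameters for the token request body.
fn extra_post_params(config: &str) -> Vec<(String, String)> {
    config
        .split_whitespace()
        // Assumption: tokens without '=' are ignored rather than rejected.
        .filter_map(|pair| pair.split_once('='))
        // extension_* pairs are SASL broker extensions, not HTTP parameters.
        .filter(|(name, _)| !name.starts_with("extension_"))
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
}
```

For the input "grant_type=authorization_code code=abc extension_traceId=1", this keeps grant_type and code and drops the extension pair.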

Vector configuration

Standard client credentials (opaque token provider):

  librdkafka_options:
    "security.protocol": "SASL_SSL"
    "sasl.mechanism": "OAUTHBEARER"
    "sasl.oauthbearer.token.endpoint.url": "https://auth.example.com/token"
    "sasl.oauthbearer.client.id": "${CLIENT_ID}"
    "sasl.oauthbearer.client.secret": "${CLIENT_SECRET}"

Custom grant type with Permanent Authorization Code:

  librdkafka_options:
    "security.protocol": "SASL_SSL"
    "sasl.mechanism": "OAUTHBEARER"
    "sasl.oauthbearer.token.endpoint.url": "${TOKEN_URL}"
    "sasl.oauthbearer.client.id": "${CLIENT_ID}"
    "sasl.oauthbearer.client.secret": "${CLIENT_SECRET}"
    "sasl.oauthbearer.config": "grant_type=authorization_code code=${PAC}"

Existing method=oidc behavior is completely unchanged.

How did you test this PR?

  • 13 unit tests covering extract_oauthbearer_config option combinations, sasl.oauthbearer.config parsing (principal, extension_*, extra params), and missing/malformed inputs
  • 4 wiremock integration tests against a live HTTP mock server: standard client credentials flow, authorization_code grant type override, missing expires_in defaulting to 3600s, and unconfigured oauthbearer returning Err
  • cargo test --no-default-features --features "sources-kafka,sinks-kafka" -p vector kafka (Kafka unit tests)
  • make fmt && make check-fmt — clean
  • make check-clippy — clean

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

@dvilaverde requested a review from a team as a code owner on May 1, 2026 16:17
@github-actions (bot) added the domain: sources and domain: sinks labels on May 1, 2026
@dvilaverde changed the title from "feat(kafka): add SASL OAUTHBEARER method=default RFC 6749 token fetch…" to "feat(kafka sink): add SASL OAUTHBEARER method=default RFC 6749 token fetch…" on May 1, 2026
@github-actions (bot) added the domain: ci label on May 1, 2026

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d565f2dc58

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/kafka.rs
Comment on lines +67 to +68
    if options.get("sasl.oauthbearer.method").map(String::as_str) == Some("oidc") {
        return None;

P1: Preserve built-in OIDC refresh path when method=oidc

extract_oauthbearer_config returns None for sasl.oauthbearer.method=oidc, but this commit also enables ENABLE_REFRESH_OAUTH_TOKEN on the client contexts, which causes rust-rdkafka to use the app refresh callback for OAUTHBEARER. In that OIDC configuration, generate_oauth_token then runs with no extracted config and returns an error instead of letting librdkafka's internal OIDC handler fetch tokens, so existing method=oidc source/sink setups can fail authentication after this change.

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8223b1a6ed

Comment thread src/kafka.rs
}

impl ClientContext for KafkaStatisticsContext {
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

P1: Keep default OAUTHBEARER flow when endpoint URL is absent

Enabling ClientContext::ENABLE_REFRESH_OAUTH_TOKEN unconditionally forces all OAUTHBEARER clients through generate_oauth_token, but this callback immediately errors if self.oauthbearer is None. That breaks configurations that intentionally do not set sasl.oauthbearer.token.endpoint.url (for example, librdkafka's built-in unsecured JWT path using enable.sasl.oauthbearer.unsecure.jwt=true and sasl.oauthbearer.config), so previously valid pass-through OAUTHBEARER setups now fail authentication.

Comment thread src/kafka.rs Outdated
let resp: serde_json::Value = match tokio::runtime::Handle::try_current() {
    Ok(handle) => tokio::task::block_in_place(|| {
        handle.block_on(async move {
            reqwest::Client::new()

P2: Honor librdkafka HTTPS CA settings in token fetch requests

The new token refresh code creates a plain reqwest::Client::new() and does not apply any HTTPS trust settings from librdkafka_options (such as https.ca.location / https.ca.pem). In deployments where the OAuth endpoint is signed by a private CA and those librdkafka settings are required, method=default refresh will fail TLS validation and prevent Kafka authentication.

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1b1200a8b3

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 395d3ac334

Comment thread src/kafka.rs
Comment on lines +336 to +337
let cert = reqwest::Certificate::from_pem(pem.as_bytes())?;
builder = builder.add_root_certificate(cert);

P2: Parse PEM bundles instead of a single certificate

generate_oauth_token builds the HTTPS client with reqwest::Certificate::from_pem, which loads one certificate object, but https.ca.pem/https.ca.location are often configured as CA bundles containing multiple certs. In that common deployment, only one cert is trusted (or parsing can fail depending on TLS backend), so token endpoint TLS validation can fail even though the configured bundle is valid. This should load the full bundle (all certs) before calling add_root_certificate.
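
One possible way to load the full bundle, sketched with the standard library only: split the PEM text into one block per certificate, then hand each block to the existing reqwest::Certificate::from_pem and add_root_certificate calls. The helper name split_pem_bundle is hypothetical and not part of this PR, and the sketch assumes every entry uses the standard "CERTIFICATE" PEM label.

```rust
/// Hypothetical helper: splits a PEM bundle into one PEM string per certificate.
/// Text outside BEGIN/END markers (comments, blank lines) is ignored.
fn split_pem_bundle(bundle: &str) -> Vec<String> {
    const BEGIN: &str = "-----BEGIN CERTIFICATE-----";
    const END: &str = "-----END CERTIFICATE-----";

    let mut certs = Vec::new();
    let mut rest = bundle;
    while let Some(start) = rest.find(BEGIN) {
        let from_begin = &rest[start..];
        // Stop at an unterminated block instead of guessing where it ends.
        let end = match from_begin.find(END) {
            Some(end) => end + END.len(),
            None => break,
        };
        certs.push(format!("{}\n", &from_begin[..end]));
        rest = &from_begin[end..];
    }
    certs
}
```

Each returned string could then be passed through from_pem and add_root_certificate in a loop, so that every CA in the bundle is trusted rather than only one.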

Labels

domain: ci, domain: sinks, domain: sources

Development

Successfully merging this pull request may close these issues.

feat(kafka sink/source): support SASL OAUTHBEARER with method=default and external token endpoint URL