diff --git a/.github/workflows/remove_wip_label.yml b/.github/workflows/remove_wip_label.yml index 7c30ee892a949..b4750b0f2b0d6 100644 --- a/.github/workflows/remove_wip_label.yml +++ b/.github/workflows/remove_wip_label.yml @@ -1,6 +1,7 @@ name: Remove Work In Progress Label permissions: + issues: write pull-requests: write on: diff --git a/benches/http.rs b/benches/http.rs index 30b35b8b257f7..5c8b5eaac7efe 100644 --- a/benches/http.rs +++ b/benches/http.rs @@ -63,6 +63,7 @@ fn benchmark_http(c: &mut Criterion) { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), }, ); diff --git a/changelog.d/10870_http_retry_strategy.feature.md b/changelog.d/10870_http_retry_strategy.feature.md new file mode 100644 index 0000000000000..09afba2eddd40 --- /dev/null +++ b/changelog.d/10870_http_retry_strategy.feature.md @@ -0,0 +1,7 @@ +HTTP-based sinks that use the shared retry helpers now support a `retry_strategy` configuration +option to control which HTTP response codes are retried. The `http` sink also includes a new +example showing how to retry only specific transient status codes. + +Issue: https://github.com/vectordotdev/vector/issues/10870 + +authors: ndrsg diff --git a/changelog.d/10870_http_retry_transport_errors.enhancement.md b/changelog.d/10870_http_retry_transport_errors.enhancement.md new file mode 100644 index 0000000000000..f06f63b2f0e07 --- /dev/null +++ b/changelog.d/10870_http_retry_transport_errors.enhancement.md @@ -0,0 +1,9 @@ +HTTP-based sinks using the shared retry logic now classify transport-layer failures with +`HttpError::is_retriable`: connection and TLS connector issues may be retried, while failures +such as invalid HTTP request construction or an invalid proxy URI are not. Setting +`retry_strategy` to `none` disables retries for these transport errors and for request +timeouts, in addition to status-code-based retries. 
+ +Issue: https://github.com/vectordotdev/vector/issues/10870 + +authors: ndrsg diff --git a/changelog.d/25194_windows_event_log_lost_wakeup.fix.md b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md new file mode 100644 index 0000000000000..cb6f67987d67c --- /dev/null +++ b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md @@ -0,0 +1,3 @@ +The `windows_event_log` source no longer freezes after periods of inactivity. + +authors: tot19 diff --git a/changelog.d/25328_kafka_oauthbearer_default.feature.md b/changelog.d/25328_kafka_oauthbearer_default.feature.md new file mode 100644 index 0000000000000..ad84b0754b1a9 --- /dev/null +++ b/changelog.d/25328_kafka_oauthbearer_default.feature.md @@ -0,0 +1,9 @@ +Added support for `sasl.oauthbearer.method=default` in Kafka sink and source. When +`sasl.oauthbearer.token.endpoint.url` is set in `librdkafka_options` and the method is +not `oidc`, Vector now implements the OAUTHBEARER token refresh callback, POSTing to the +configured endpoint per RFC 6749 §4.4 and reading `access_token` and `expires_in` from +the response. This enables OAuth2 providers that return opaque tokens, non-standard JWT +expiry fields, or custom grant types (e.g. `authorization_code`) that are incompatible +with `method=oidc`. + +authors: dvilaverde diff --git a/config/examples/http_sink_custom_retry.yaml b/config/examples/http_sink_custom_retry.yaml new file mode 100644 index 0000000000000..fd973c890e991 --- /dev/null +++ b/config/examples/http_sink_custom_retry.yaml @@ -0,0 +1,42 @@ +# HTTP sink example with a custom retry strategy +# ---------------------------------------------------- +# Sends demo logs to an HTTP endpoint and only retries the response codes +# that the upstream API documents as transient. 
+ +data_dir: "/var/lib/vector" + +sources: + demo_logs: + type: "demo_logs" + format: "json" + interval: 1 + +sinks: + http_out: + type: "http" + inputs: ["demo_logs"] + uri: "https://example.com/ingest" + method: "post" + + # Skip the startup probe so the example can be adapted locally. + healthcheck: + enabled: false + + # Send newline-delimited JSON in the request body. + framing: + method: "newline_delimited" + encoding: + codec: "json" + + # Control how many retries are made and how quickly backoff grows. + request: + timeout_secs: 60 + retry_attempts: 8 + retry_initial_backoff_secs: 2 + retry_max_duration_secs: 30 + + # Retry only on the exact HTTP status codes that this destination + # treats as temporary failures. + retry_strategy: + type: "custom" + status_codes: [408, 425, 429, 503] diff --git a/lib/vector-config/src/http.rs b/lib/vector-config/src/http.rs index 94029610f0441..da51ea1cc156e 100644 --- a/lib/vector-config/src/http.rs +++ b/lib/vector-config/src/http.rs @@ -2,9 +2,11 @@ use std::cell::RefCell; use http::StatusCode; use serde_json::Value; +use vector_config_common::{attributes::CustomAttribute, constants}; use crate::{ Configurable, GenerateError, Metadata, ToValue, + num::NumberClass, schema::{SchemaGenerator, SchemaObject, generate_number_schema}, }; @@ -27,6 +29,10 @@ impl Configurable for StatusCode { let mut metadata = Metadata::default(); metadata.set_description("HTTP response status code"); metadata.set_default_value(StatusCode::OK); + metadata.add_custom_attribute(CustomAttribute::kv( + constants::DOCS_META_NUMERIC_TYPE, + NumberClass::Unsigned, + )); metadata } diff --git a/src/kafka.rs b/src/kafka.rs index 0c9315288e313..8465a0640a46c 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -1,9 +1,12 @@ #![allow(missing_docs)] +use std::collections::HashMap; use std::path::{Path, PathBuf}; -use rdkafka::{ClientConfig, ClientContext, Statistics, consumer::ConsumerContext}; +use rdkafka::{ + ClientConfig, ClientContext, Statistics, 
client::OAuthToken, consumer::ConsumerContext, +}; use snafu::Snafu; -use tracing::Span; +use tracing::{Span, warn}; use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; use crate::{ @@ -17,6 +20,77 @@ enum KafkaError { InvalidPath { path: PathBuf }, } +#[derive(Clone, Debug)] +pub(crate) struct KafkaOAuthBearerConfig { + pub(crate) token_url: String, + pub(crate) client_id: Option<String>, + pub(crate) client_secret: Option<String>, + pub(crate) scope: Option<String>, + pub(crate) principal_name: String, + pub(crate) extra_params: Vec<(String, String)>, +} + +/// Parses space-separated `name=value` pairs from `sasl.oauthbearer.config`. +/// Returns (principal_name, extra_http_params). +/// - `principal=` → principal_name +/// - `extension_=` → silently ignored (SASL broker extensions not applicable to HTTP) +/// - anything else → extra HTTP POST body params (can override grant_type, add code=, etc.) +fn parse_oauthbearer_config(config: Option<&str>) -> (String, Vec<(String, String)>) { + let mut principal_name = String::new(); + let mut extra_params: Vec<(String, String)> = Vec::new(); + + if let Some(s) = config { + for pair in s.split_whitespace() { + if let Some((key, value)) = pair.split_once('=') { + if key == "principal" { + principal_name = value.to_owned(); + } else if key.starts_with("extension_") { + // SASL broker extensions — not sent to HTTP token endpoint; skip. + } else { + extra_params.push((key.to_owned(), value.to_owned())); + } + } + } + } + + (principal_name, extra_params) +} + +/// Reads OAUTHBEARER-relevant keys from `librdkafka_options` and returns a +/// `KafkaOAuthBearerConfig` if Vector should activate its token-fetch callback. +/// +/// Returns `None` when `sasl.oauthbearer.token.endpoint.url` is absent, +/// or when `sasl.oauthbearer.method` is `"oidc"` (librdkafka handles it natively). 
+pub(crate) fn extract_oauthbearer_config( + options: &HashMap<String, String>, +) -> Option<KafkaOAuthBearerConfig> { + if options.get("sasl.oauthbearer.method").map(String::as_str) == Some("oidc") { + return None; + } + + let token_url = options.get("sasl.oauthbearer.token.endpoint.url")?.clone(); + + let client_id = options.get("sasl.oauthbearer.client.id").cloned(); + let client_secret = options.get("sasl.oauthbearer.client.secret").cloned(); + let scope = options.get("sasl.oauthbearer.scope").cloned(); + + let (mut principal_name, extra_params) = + parse_oauthbearer_config(options.get("sasl.oauthbearer.config").map(String::as_str)); + + if principal_name.is_empty() { + principal_name = client_id.as_deref().unwrap_or_default().to_owned(); + } + + Some(KafkaOAuthBearerConfig { + token_url, + client_id, + client_secret, + scope, + principal_name, + extra_params, + }) +} + /// Supported compression types for Kafka. #[configurable_component] #[derive(Clone, Copy, Debug, Default)] @@ -169,9 +243,12 @@ fn pathbuf_to_string(path: &Path) -> crate::Result<&str> { pub(crate) struct KafkaStatisticsContext { pub(crate) expose_lag_metrics: bool, pub span: Span, + pub(crate) oauthbearer: Option<KafkaOAuthBearerConfig>, } impl ClientContext for KafkaStatisticsContext { + const ENABLE_REFRESH_OAUTH_TOKEN: bool = true; + fn stats(&self, statistics: Statistics) { // This callback get executed on a separate thread within the rdkafka library, so we need // to propagate the span here to attach the component tags to the emitted events. @@ -181,6 +258,370 @@ impl ClientContext for KafkaStatisticsContext { expose_lag_metrics: self.expose_lag_metrics, }); } + + fn generate_oauth_token( + &self, + _oauthbearer_config: Option<&str>, + ) -> Result<OAuthToken, Box<dyn std::error::Error>> { + let config = self.oauthbearer.as_ref().ok_or( + "sasl.oauthbearer.token.endpoint.url not configured; \ + set it in librdkafka_options to use method=default token fetch", + )?; + + // Build form params: start with default grant_type, then merge extra_params which can override. 
+ let mut params: Vec<(String, String)> = + vec![("grant_type".to_owned(), "client_credentials".to_owned())]; + + if let Some(id) = &config.client_id { + params.push(("client_id".to_owned(), id.clone())); + } + if let Some(secret) = &config.client_secret { + params.push(("client_secret".to_owned(), secret.clone())); + } + if let Some(scope) = &config.scope { + params.push(("scope".to_owned(), scope.clone())); + } + + // extra_params can override defaults (e.g. grant_type=authorization_code). + for (key, value) in &config.extra_params { + params.retain(|(k, _)| k != key); + params.push((key.clone(), value.clone())); + } + + let url = config.token_url.clone(); + + let resp: serde_json::Value = match tokio::runtime::Handle::try_current() { + Ok(handle) => tokio::task::block_in_place(|| { + handle.block_on(async move { + reqwest::Client::new() + .post(&url) + .form(&params) + .send() + .await? + .json::<serde_json::Value>() + .await + }) + })?, + Err(_) => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + .block_on(async move { + reqwest::Client::new() + .post(&url) + .form(&params) + .send() + .await? + .json::<serde_json::Value>() + .await + })?, + }; + + let token = resp["access_token"] + .as_str() + .ok_or("missing access_token in token endpoint response")? + .to_owned(); + + let expires_in = match resp["expires_in"].as_u64() { + Some(v) => v, + None => { + warn!(message = "Expires_in missing from OAUTHBEARER token response, defaulting to 3600s."); + 3600 + } + }; + + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? 
+ .as_millis() as i64; + + Ok(OAuthToken { + token, + principal_name: config.principal_name.clone(), + lifetime_ms: now_ms + (expires_in as i64 * 1000), + }) + } } impl ConsumerContext for KafkaStatisticsContext {} + +#[cfg(test)] +mod tests { + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{body_string_contains, method, path}, + }; + + use super::*; + + fn opts(pairs: &[(&str, &str)]) -> HashMap<String, String> { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn extract_returns_none_when_url_absent() { + let o = opts(&[("sasl.oauthbearer.method", "default")]); + assert!(extract_oauthbearer_config(&o).is_none()); + } + + #[test] + fn extract_returns_none_when_method_oidc() { + let o = opts(&[ + ("sasl.oauthbearer.method", "oidc"), + ( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + ), + ]); + assert!(extract_oauthbearer_config(&o).is_none()); + } + + #[test] + fn extract_minimal_config() { + let o = opts(&[( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + )]); + let c = extract_oauthbearer_config(&o).unwrap(); + assert_eq!(c.token_url, "https://example.com/token"); + assert!(c.client_id.is_none()); + assert!(c.client_secret.is_none()); + assert!(c.scope.is_none()); + assert_eq!(c.principal_name, ""); + assert!(c.extra_params.is_empty()); + } + + #[test] + fn extract_full_config_with_credentials() { + let o = opts(&[ + ( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + ), + ("sasl.oauthbearer.client.id", "my-client"), + ("sasl.oauthbearer.client.secret", "my-secret"), + ("sasl.oauthbearer.scope", "kafka:write"), + ]); + let c = extract_oauthbearer_config(&o).unwrap(); + assert_eq!(c.client_id.as_deref(), Some("my-client")); + assert_eq!(c.client_secret.as_deref(), Some("my-secret")); + assert_eq!(c.scope.as_deref(), Some("kafka:write")); + assert_eq!(c.principal_name, "my-client"); // falls back to client_id + } + + #[test] + fn 
extract_absent_method_treated_as_default() { + let o = opts(&[( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + )]); + assert!(extract_oauthbearer_config(&o).is_some()); + } + + #[test] + fn extract_explicit_default_method() { + let o = opts(&[ + ("sasl.oauthbearer.method", "default"), + ( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + ), + ]); + assert!(extract_oauthbearer_config(&o).is_some()); + } + + #[test] + fn parse_empty_config() { + let (principal, params) = parse_oauthbearer_config(None); + assert_eq!(principal, ""); + assert!(params.is_empty()); + } + + #[test] + fn parse_principal_only() { + let (principal, params) = parse_oauthbearer_config(Some("principal=my-service")); + assert_eq!(principal, "my-service"); + assert!(params.is_empty()); + } + + #[test] + fn parse_extra_http_params() { + let (principal, params) = + parse_oauthbearer_config(Some("grant_type=authorization_code code=my-pac")); + assert_eq!(principal, ""); + assert_eq!( + params, + vec![ + ("grant_type".to_string(), "authorization_code".to_string()), + ("code".to_string(), "my-pac".to_string()), + ] + ); + } + + #[test] + fn parse_extension_pairs_ignored() { + let (principal, params) = + parse_oauthbearer_config(Some("principal=svc extension_traceId=abc extra=x")); + assert_eq!(principal, "svc"); + assert_eq!(params, vec![("extra".to_string(), "x".to_string())]); + } + + #[test] + fn extract_extra_params_from_config_string() { + let o = opts(&[ + ( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + ), + ("sasl.oauthbearer.client.id", "c"), + ( + "sasl.oauthbearer.config", + "grant_type=authorization_code code=PAC123", + ), + ]); + let c = extract_oauthbearer_config(&o).unwrap(); + assert_eq!( + c.extra_params, + vec![ + ("grant_type".to_string(), "authorization_code".to_string()), + ("code".to_string(), "PAC123".to_string()), + ] + ); + assert_eq!(c.principal_name, "c"); + } + + #[test] + fn 
extract_principal_from_config_string_overrides_client_id() { + let o = opts(&[ + ( + "sasl.oauthbearer.token.endpoint.url", + "https://example.com/token", + ), + ("sasl.oauthbearer.client.id", "client-id"), + ("sasl.oauthbearer.config", "principal=my-service"), + ]); + let c = extract_oauthbearer_config(&o).unwrap(); + assert_eq!(c.principal_name, "my-service"); + } + + #[test] + fn parse_malformed_pair_without_equals_ignored() { + let (principal, params) = parse_oauthbearer_config(Some("noequalssign valid=ok")); + assert_eq!(principal, ""); + assert_eq!(params, vec![("valid".to_string(), "ok".to_string())]); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn generate_token_client_credentials() { + let mock = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/token")) + .and(body_string_contains("grant_type=client_credentials")) + .and(body_string_contains("client_id=my-client")) + .and(body_string_contains("client_secret=my-secret")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "tok123", + "token_type": "bearer", + "expires_in": 3600u64, + }))) + .mount(&mock) + .await; + + let ctx = KafkaStatisticsContext { + expose_lag_metrics: false, + span: tracing::Span::current(), + oauthbearer: Some(KafkaOAuthBearerConfig { + token_url: format!("{}/token", mock.uri()), + client_id: Some("my-client".into()), + client_secret: Some("my-secret".into()), + scope: None, + principal_name: "my-client".into(), + extra_params: vec![], + }), + }; + + let token = ctx.generate_oauth_token(None).unwrap(); + assert_eq!(token.token, "tok123"); + assert_eq!(token.principal_name, "my-client"); + assert!(token.lifetime_ms > 0); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn generate_token_with_extra_params_override_grant_type() { + let mock = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/token")) + 
.and(body_string_contains("grant_type=authorization_code")) + .and(body_string_contains("code=PAC123")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "ims-tok", + "expires_in": 86399u64, + }))) + .mount(&mock) + .await; + + let ctx = KafkaStatisticsContext { + expose_lag_metrics: false, + span: tracing::Span::current(), + oauthbearer: Some(KafkaOAuthBearerConfig { + token_url: format!("{}/token", mock.uri()), + client_id: None, + client_secret: None, + scope: None, + principal_name: "".into(), + extra_params: vec![ + ("grant_type".into(), "authorization_code".into()), + ("code".into(), "PAC123".into()), + ], + }), + }; + + let token = ctx.generate_oauth_token(None).unwrap(); + assert_eq!(token.token, "ims-tok"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn generate_token_missing_expires_in_defaults_to_3600() { + let mock = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "no-expiry-tok", + }))) + .mount(&mock) + .await; + + let ctx = KafkaStatisticsContext { + expose_lag_metrics: false, + span: tracing::Span::current(), + oauthbearer: Some(KafkaOAuthBearerConfig { + token_url: format!("{}/token", mock.uri()), + client_id: None, + client_secret: None, + scope: None, + principal_name: "".into(), + extra_params: vec![], + }), + }; + + let token = ctx.generate_oauth_token(None).unwrap(); + assert_eq!(token.token, "no-expiry-tok"); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + assert!(token.lifetime_ms >= now_ms + 3_595_000); + assert!(token.lifetime_ms <= now_ms + 3_605_000); + } + + #[test] + fn generate_token_returns_err_when_oauthbearer_not_configured() { + let ctx = KafkaStatisticsContext { + expose_lag_metrics: false, + span: tracing::Span::current(), + oauthbearer: None, + }; + 
assert!(ctx.generate_oauth_token(None).is_err()); + } +} diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs index 9e8c000f995a3..fea94af05b132 100644 --- a/src/sinks/appsignal/config.rs +++ b/src/sinks/appsignal/config.rs @@ -21,7 +21,7 @@ use crate::{ prelude::{SinkConfig, SinkContext}, util::{ BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, - http::HttpStatusRetryLogic, + http::{HttpStatusRetryLogic, RetryStrategy}, }, }, }; @@ -67,6 +67,10 @@ pub(super) struct AppsignalConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + retry_strategy: RetryStrategy, } pub(super) fn default_endpoint() -> String { @@ -99,7 +103,10 @@ impl AppsignalConfig { let request_opts = self.request; let request_settings = request_opts.into_settings(); - let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |req: &AppsignalResponse| req.http_status, + self.retry_strategy.clone(), + ); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/axiom/config.rs b/src/sinks/axiom/config.rs index 61421973d5475..0930a997abc12 100644 --- a/src/sinks/axiom/config.rs +++ b/src/sinks/axiom/config.rs @@ -15,7 +15,8 @@ use crate::{ Healthcheck, VectorSink, http::config::{HttpMethod, HttpSinkConfig}, util::{ - BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig, + BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, + http::{RequestConfig, RetryStrategy}, }, }, tls::TlsConfig, @@ -124,6 +125,10 @@ pub struct AxiomConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl GenerateConfig for AxiomConfig { @@ -180,6 +185,7 @@ impl SinkConfig 
for AxiomConfig { ), payload_prefix: "".into(), // Always newline delimited JSON payload_suffix: "".into(), // Always newline delimited JSON + retry_strategy: self.retry_strategy.clone(), }; http_sink_config.build(cx).await diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 3b105d549d0c1..89da496045d9a 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -10,7 +10,10 @@ use crate::{ sinks::{ azure_common::config::AzureAuthentication, prelude::*, - util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, + util::{ + RealtimeSizeBasedDefaultBatchSettings, UriSerde, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, }; @@ -102,6 +105,10 @@ pub struct AzureLogsIngestionConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl Default for AzureLogsIngestionConfig { @@ -118,6 +125,7 @@ impl Default for AzureLogsIngestionConfig { request: Default::default(), tls: None, acknowledgements: Default::default(), + retry_strategy: Default::default(), } } } @@ -156,8 +164,10 @@ impl AzureLogsIngestionConfig { )?; let healthcheck = service.healthcheck(); - let retry_logic = - HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |res: &AzureLogsIngestionResponse| res.http_status, + self.retry_strategy.clone(), + ); let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index dc60d179a2ff5..6d68aed7efd75 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -16,7 +16,10 @@ use crate::{ http::{HttpClient, 
get_http_scheme_from_uri}, sinks::{ prelude::*, - util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, + util::{ + RealtimeSizeBasedDefaultBatchSettings, UriSerde, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, }; @@ -113,6 +116,10 @@ pub struct AzureMonitorLogsConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl Default for AzureMonitorLogsConfig { @@ -129,6 +136,7 @@ impl Default for AzureMonitorLogsConfig { time_generated_key: None, tls: None, acknowledgements: Default::default(), + retry_strategy: Default::default(), } } } @@ -181,8 +189,10 @@ impl AzureMonitorLogsConfig { )?; let healthcheck = service.healthcheck(); - let retry_logic = - HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |res: &AzureMonitorLogsResponse| res.http_status, + self.retry_strategy.clone(), + ); let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/datadog/events/config.rs b/src/sinks/datadog/events/config.rs index 2bfdc66315c3e..f9b0f9361bba8 100644 --- a/src/sinks/datadog/events/config.rs +++ b/src/sinks/datadog/events/config.rs @@ -14,7 +14,10 @@ use crate::{ sinks::{ Healthcheck, VectorSink, datadog::{DatadogCommonConfig, LocalDatadogCommonConfig}, - util::{ServiceBuilderExt, TowerRequestConfig, http::HttpStatusRetryLogic}, + util::{ + ServiceBuilderExt, TowerRequestConfig, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, tls::MaybeTlsSettings, }; @@ -33,6 +36,10 @@ pub struct DatadogEventsConfig { #[configurable(derived)] #[serde(default)] pub request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl GenerateConfig for DatadogEventsConfig { @@ -64,7 +71,10 
@@ impl DatadogEventsConfig { let request_opts = self.request; let request_settings = request_opts.into_settings(); - let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |req: &DatadogEventsResponse| req.http_status, + self.retry_strategy.clone(), + ); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs index 9b66eea4ac214..addb45163df76 100644 --- a/src/sinks/gcp/stackdriver/logs/config.rs +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -21,7 +21,7 @@ use crate::{ prelude::*, util::{ BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, service::TowerRequestConfigDefaults, }, }, @@ -106,6 +106,10 @@ pub(super) struct StackdriverConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } pub(super) fn default_endpoint() -> String { @@ -269,7 +273,10 @@ impl SinkConfig for StackdriverConfig { let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = StackdriverLogsSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs index 2398a18e9beb9..ea2cc9d6b5293 100644 --- a/src/sinks/gcp/stackdriver/metrics/config.rs +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -15,7 +15,8 @@ use crate::{ prelude::*, util::{ http::{ - HttpRequest, HttpService, HttpServiceRequestBuilder, 
http_response_retry_logic, + HttpRequest, HttpService, HttpServiceRequestBuilder, RetryStrategy, + http_response_retry_logic, }, service::TowerRequestConfigDefaults, }, @@ -77,6 +78,10 @@ pub struct StackdriverConfig { skip_serializing_if = "crate::serde::is_default" )] pub(super) acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn default_metric_namespace_value() -> String { @@ -126,7 +131,10 @@ impl SinkConfig for StackdriverConfig { let service = HttpService::new(client, stackdriver_metrics_service_request_builder); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs index 05a56cd9df368..f179248e1697b 100644 --- a/src/sinks/honeycomb/config.rs +++ b/src/sinks/honeycomb/config.rs @@ -16,7 +16,7 @@ use crate::{ prelude::*, util::{ BatchConfig, BoxedRawValue, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, }, }, }; @@ -71,6 +71,10 @@ pub struct HoneycombConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn default_endpoint() -> String { @@ -124,7 +128,10 @@ impl SinkConfig for HoneycombConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = HoneycombSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/http/config.rs 
b/src/sinks/http/config.rs index a3a105fa5c608..2b6abebe0d402 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -30,7 +30,10 @@ use crate::{ prelude::*, util::{ RealtimeSizeBasedDefaultBatchSettings, UriSerde, - http::{HttpService, OrderedHeaderName, RequestConfig, http_response_retry_logic}, + http::{ + HttpService, OrderedHeaderName, RequestConfig, RetryStrategy, + http_response_retry_logic, + }, }, }, }; @@ -100,6 +103,10 @@ pub struct HttpSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } /// HTTP method. @@ -330,7 +337,10 @@ impl SinkConfig for HttpSinkConfig { let request_limits = self.request.tower.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = HttpSink::new( @@ -405,6 +415,7 @@ mod tests { acknowledgements: AcknowledgementsConfig::default(), payload_prefix: String::new(), payload_suffix: String::new(), + retry_strategy: RetryStrategy::default(), }; let external_resource = ExternalResource::new( diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 12ff994239f5d..2b4b6942c5c7b 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -67,6 +67,7 @@ fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), } } @@ -445,6 +446,105 @@ async fn retries_on_temporary_error() { .await; } +#[tokio::test] +async fn custom_retry_retries_only_configured_status_code() { + components::assert_sink_compliance(&HTTP_SINK_TAGS, async { + const NUM_LINES: usize = 1; + const NUM_FAILURES: usize = 2; + const CUSTOM_RETRY_CONFIG: &str = r#" + 
request.retry_attempts = 2 + request.retry_initial_backoff_secs = 1 + request.retry_max_duration_secs = 1 + retry_strategy.type = "custom" + retry_strategy.status_codes = [408, 425, 429, 503] + "#; + + let (in_addr, sink) = build_sink(CUSTOM_RETRY_CONFIG).await; + + let counter = Arc::new(atomic::AtomicUsize::new(0)); + let in_counter = Arc::clone(&counter); + let (rx, trigger, server) = build_test_server_generic(in_addr, move || { + let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed); + if count < NUM_FAILURES { + Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body(Body::empty()) + .unwrap_or_else(|_| unreachable!()) + } else { + Response::new(Body::empty()) + } + }); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch)); + let pump = sink.run(events); + + tokio::spawn(server); + + pump.await.unwrap(); + drop(trigger); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received_gzip(rx, |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + }) + .await; + + let tries = counter.load(atomic::Ordering::Relaxed); + assert_eq!(tries, NUM_FAILURES + 1); + assert_eq!(NUM_LINES, output_lines.len()); + assert_eq!(input_lines, output_lines); + }) + .await; +} + +#[tokio::test] +async fn custom_retry_does_not_retry_unconfigured_status_code() { + components::assert_sink_error(&COMPONENT_ERROR_TAGS, async { + const NUM_LINES: usize = 1; + const CUSTOM_RETRY_CONFIG: &str = r#" + request.retry_attempts = 2 + request.retry_initial_backoff_secs = 1 + request.retry_max_duration_secs = 1 + retry_strategy.type = "custom" + retry_strategy.status_codes = [408, 425, 429, 503] + "#; + + let (in_addr, sink) = build_sink(CUSTOM_RETRY_CONFIG).await; + + let counter = Arc::new(atomic::AtomicUsize::new(0)); + let in_counter = Arc::clone(&counter); + let (rx, trigger, server) = 
build_test_server_generic(in_addr, move || { + in_counter.fetch_add(1, atomic::Ordering::Relaxed); + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .unwrap_or_else(|_| unreachable!()) + }); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch)); + let pump = sink.run(events); + + tokio::spawn(server); + + pump.await.unwrap(); + drop(trigger); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); + assert_eq!(counter.load(atomic::Ordering::Relaxed), 1); + + let output_lines = get_received_gzip(rx, |_| { + unreachable!("There should be no successful requests") + }) + .await; + assert!(output_lines.is_empty()); + }) + .await; +} + #[tokio::test] async fn fails_on_permanent_error() { components::assert_sink_error(&COMPONENT_ERROR_TAGS, async { diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 016f569bb726d..3701c5cb61567 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -3,7 +3,7 @@ use std::time::Duration; use rdkafka::{ ClientConfig, error::KafkaError, - producer::{BaseProducer, FutureProducer, Producer}, + producer::{FutureProducer, Producer}, }; use snafu::{ResultExt, Snafu}; use tower::limit::RateLimit; @@ -38,11 +38,13 @@ pub struct KafkaSink { pub(crate) fn create_producer( client_config: ClientConfig, + oauthbearer: Option, ) -> crate::Result> { let producer = client_config .create_with_context(KafkaStatisticsContext { expose_lag_metrics: false, span: Span::current(), + oauthbearer, }) .context(KafkaCreateFailedSnafu)?; Ok(producer) @@ -51,7 +53,8 @@ pub(crate) fn create_producer( impl KafkaSink { pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result { let producer_config = config.to_rdkafka()?; - let producer = create_producer(producer_config)?; + let oauthbearer = crate::kafka::extract_oauthbearer_config(&config.librdkafka_options); + let producer = 
create_producer(producer_config, oauthbearer)?; let transformer = config.encoding.transformer(); let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); @@ -118,6 +121,7 @@ pub(crate) async fn healthcheck( ) -> crate::Result<()> { trace!("Healthcheck started."); let client_config = config.to_rdkafka().unwrap(); + let oauthbearer = crate::kafka::extract_oauthbearer_config(&config.librdkafka_options); let topic: Option = match config.healthcheck_topic { Some(topic) => Some(topic), _ => match config.topic.render_string(&LogEvent::from_str_legacy("")) { @@ -132,14 +136,23 @@ pub(crate) async fn healthcheck( }, }; - tokio::task::spawn_blocking(move || { - let producer: BaseProducer = client_config.create().unwrap(); + tokio::task::spawn_blocking(move || -> crate::Result<()> { + // FutureProducer wraps a ThreadedProducer whose background thread processes + // OAUTHBEARER token refresh events. BaseProducer has no such thread and hangs. + let producer: FutureProducer = client_config + .create_with_context(KafkaStatisticsContext { + span: Span::current(), + expose_lag_metrics: false, + oauthbearer, + }) + .context(KafkaCreateFailedSnafu)?; let topic = topic.as_ref().map(|topic| &topic[..]); producer .client() .fetch_metadata(topic, healthcheck_options.timeout) .map(|_| ()) + .map_err(Into::into) }) .await??; trace!("Healthcheck completed."); diff --git a/src/sinks/keep/config.rs b/src/sinks/keep/config.rs index 7b81281253672..d3810976aa3c1 100644 --- a/src/sinks/keep/config.rs +++ b/src/sinks/keep/config.rs @@ -16,7 +16,7 @@ use crate::{ prelude::*, util::{ BatchConfig, BoxedRawValue, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, }, }, }; @@ -59,6 +59,10 @@ pub struct KeepConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn 
default_endpoint() -> String { @@ -111,7 +115,10 @@ impl SinkConfig for KeepConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = KeepSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs index 7d73e25c4030d..dcc13624372d5 100644 --- a/src/sinks/opentelemetry/mod.rs +++ b/src/sinks/opentelemetry/mod.rs @@ -56,6 +56,7 @@ impl Default for Protocol { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), }) } } diff --git a/src/sinks/prometheus/remote_write/config.rs b/src/sinks/prometheus/remote_write/config.rs index c1b420d6b216d..c9f214953003c 100644 --- a/src/sinks/prometheus/remote_write/config.rs +++ b/src/sinks/prometheus/remote_write/config.rs @@ -17,7 +17,7 @@ use crate::{ prometheus::PrometheusRemoteWriteAuth, util::{ auth::Auth, - http::{OrderedHeaderName, http_response_retry_logic}, + http::{OrderedHeaderName, RetryStrategy, http_response_retry_logic}, service::TowerRequestConfig, }, }, @@ -129,6 +129,10 @@ pub struct RemoteWriteConfig { #[serde(default = "default_compression")] #[derivative(Default(value = "default_compression()"))] pub compression: Compression, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } const fn default_compression() -> Compression { @@ -251,7 +255,10 @@ impl SinkConfig for RemoteWriteConfig { headers: validated_headers, }; let service = ServiceBuilder::new() - .settings(request_settings, http_response_retry_logic()) + .settings( + request_settings, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = RemoteWriteSink { diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 
e19025e3c6ee3..89225f4077741 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -7,6 +7,7 @@ use futures::{Sink, future::BoxFuture}; use headers::HeaderName; use http::{HeaderValue, Request, Response, StatusCode, header}; use http_body::Body as _; +use tracing::debug; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OrderedHeaderName(HeaderName); @@ -550,14 +551,137 @@ impl sink::Response for http::Response { } } +/// Serializes and deserializes a [`Vec`] +mod status_code_vec { + use http::StatusCode; + use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error}; + + /// Deserializes a [`Vec`] + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + Vec::::deserialize(deserializer)? + .into_iter() + .map(StatusCode::from_u16) + .collect::, _>>() + .map_err(Error::custom) + } + + /// Serializes a [`Vec`] + pub fn serialize(status_codes: &[StatusCode], serializer: S) -> Result + where + S: Serializer, + { + status_codes + .iter() + .map(StatusCode::as_u16) + .collect::>() + .serialize(serializer) + } +} + +/// Configurable retry strategy for `http` based sinks. +/// +/// For more information about error responses, see [Client Error Responses][error_responses]. +/// +/// [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses +#[configurable_component] +#[derive(Debug, Clone, Default, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))] +pub enum RetryStrategy { + /// Don't retry any errors, including request timeouts. + None, + + /// Default strategy. See [`RetryStrategy::retry_action`] for more details. 
+ #[default] + Default, + + /// Retry on *all* HTTP status codes except for success codes (2xx) + All, + + /// Custom retry strategy + Custom { + /// Retry on these specific HTTP status codes + #[serde(with = "status_code_vec")] + status_codes: Vec, + }, +} + +impl RetryStrategy { + /// Returns the name of the retry strategy. + #[must_use] + const fn name(&self) -> &str { + match self { + Self::None => "Never retry strategy", + Self::Default => "Default retry strategy", + Self::All => "Retry all strategy", + Self::Custom { .. } => "Custom retry strategy", + } + } + + /// Determines if the given status code should be retried. + /// + /// For the `Default` strategy, the following status codes will be retried: + /// - 429 (Too Many Requests) + /// - 408 (Request Timeout) + /// - 5xx (Server Error) + /// + /// For the `Custom` strategy, the status codes specified in the `status_codes` field will be retried. + /// + /// For the `All` strategy, all non-success status codes will be retried. + #[must_use] + pub fn retry_action(&self, status: http::StatusCode) -> RetryAction { + if status.is_success() { + return RetryAction::Successful; + } + + let reason = format!( + "{}: {}", + self.name(), + status.canonical_reason().unwrap_or_else(|| status.as_str()) + ) + .into(); + + match self { + Self::None => RetryAction::DontRetry(reason), + Self::Default => match status { + StatusCode::TOO_MANY_REQUESTS | StatusCode::REQUEST_TIMEOUT => { + RetryAction::Retry(reason) + } + StatusCode::NOT_IMPLEMENTED => RetryAction::DontRetry(reason), + _ => { + if status.is_server_error() { + RetryAction::Retry(reason) + } else { + RetryAction::DontRetry(reason) + } + } + }, + Self::All => RetryAction::Retry(reason), + Self::Custom { status_codes } => { + if status_codes.contains(&status) { + RetryAction::Retry(reason) + } else { + RetryAction::DontRetry(reason) + } + } + } + } +} + #[derive(Debug, Clone)] pub struct HttpRetryLogic { request: PhantomData, + retry_strategy: RetryStrategy, } + impl 
Default for HttpRetryLogic { fn default() -> Self { Self { request: PhantomData, + retry_strategy: RetryStrategy::Default, } } } @@ -567,25 +691,28 @@ impl RetryLogic for HttpRetryLogic { type Request = Req; type Response = hyper::Response; - fn is_retriable_error(&self, _error: &Self::Error) -> bool { - true + fn is_retriable_error(&self, error: &Self::Error) -> bool { + if self.retry_strategy == RetryStrategy::None { + false + } else { + error.is_retriable() + } + } + + fn is_retriable_timeout(&self) -> bool { + self.retry_strategy != RetryStrategy::None } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { let status = response.status(); - - match status { - StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()), - StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()), - StatusCode::NOT_IMPLEMENTED => { - RetryAction::DontRetry("endpoint not implemented".into()) - } - _ if status.is_server_error() => RetryAction::Retry( - format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(), - ), - _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("response status: {status}").into()), + if !status.is_success() { + debug!( + message = "HTTP response.", + %status, + body = %String::from_utf8_lossy(response.body()), + ); } + self.retry_strategy.retry_action(status) } } @@ -596,6 +723,7 @@ pub struct HttpStatusRetryLogic { func: F, request: PhantomData, response: PhantomData, + retry_strategy: RetryStrategy, } impl HttpStatusRetryLogic @@ -604,11 +732,12 @@ where Req: Send + Sync + 'static, Res: Send + Sync + 'static, { - pub const fn new(func: F) -> HttpStatusRetryLogic { + pub const fn new(func: F, retry_strategy: RetryStrategy) -> HttpStatusRetryLogic { HttpStatusRetryLogic { func, request: PhantomData, response: PhantomData, + retry_strategy, } } } @@ -623,25 +752,21 @@ where type Request = Req; type Response = Res; - fn is_retriable_error(&self, 
_error: &Self::Error) -> bool { - true + fn is_retriable_error(&self, error: &Self::Error) -> bool { + if self.retry_strategy == RetryStrategy::None { + false + } else { + error.is_retriable() + } + } + + fn is_retriable_timeout(&self) -> bool { + self.retry_strategy != RetryStrategy::None } fn should_retry_response(&self, response: &Res) -> RetryAction { let status = (self.func)(response); - - match status { - StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()), - StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()), - StatusCode::NOT_IMPLEMENTED => { - RetryAction::DontRetry("endpoint not implemented".into()) - } - _ if status.is_server_error() => { - RetryAction::Retry(format!("Http Status: {status}").into()) - } - _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("Http status: {status}").into()), - } + self.retry_strategy.retry_action(status) } } @@ -654,6 +779,7 @@ where func: self.func.clone(), request: PhantomData, response: PhantomData, + retry_strategy: self.retry_strategy.clone(), } } } @@ -820,12 +946,17 @@ impl DriverResponse for HttpResponse { } /// Creates a `RetryLogic` for use with `HttpResponse`. -pub fn http_response_retry_logic() -> HttpStatusRetryLogic< +pub fn http_response_retry_logic( + retry_strategy: RetryStrategy, +) -> HttpStatusRetryLogic< impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, Request, HttpResponse, > { - HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) + HttpStatusRetryLogic::new( + |req: &HttpResponse| req.http_response.status(), + retry_strategy, + ) } /// Uses the estimated json encoded size to determine batch sizing. 
@@ -969,6 +1100,93 @@ mod test { ); } + #[test] + fn retry_strategy_none_preserves_success_and_rejects_failures() { + let strategy = RetryStrategy::None; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_not_retryable() + ); + } + + #[test] + fn retry_strategy_none_disables_timeout_retries() { + let logic = HttpRetryLogic::<()> { + request: PhantomData, + retry_strategy: RetryStrategy::None, + }; + let status_logic = + HttpStatusRetryLogic::<_, (), ()>::new(|_: &()| StatusCode::OK, RetryStrategy::None); + + assert!(!logic.is_retriable_timeout()); + assert!(!status_logic.is_retriable_timeout()); + } + + #[test] + fn retry_strategy_all_preserves_success_and_retries_failures() { + let strategy = RetryStrategy::All; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::BAD_REQUEST) + .is_retryable() + ); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_retryable() + ); + } + + #[test] + fn retry_strategy_custom_only_retries_configured_statuses() { + let strategy = RetryStrategy::Custom { + status_codes: vec![StatusCode::BAD_REQUEST], + }; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::BAD_REQUEST) + .is_retryable() + ); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_not_retryable() + ); + } + + #[test] + fn retry_strategy_custom_serde_roundtrips_status_codes() { + let json = r#"{"type":"custom","status_codes":[400,503]}"#; + let strategy: RetryStrategy = serde_json::from_str(json).unwrap(); + assert_eq!( + strategy, + RetryStrategy::Custom { + status_codes: vec![StatusCode::BAD_REQUEST, StatusCode::SERVICE_UNAVAILABLE], + } + ); + let encoded = serde_json::to_string(&strategy).unwrap(); + let roundtrip: RetryStrategy = 
serde_json::from_str(&encoded).unwrap(); + assert_eq!(roundtrip, strategy); + } + + #[test] + fn retry_strategy_custom_serde_rejects_invalid_status_codes() { + // `http::StatusCode::from_u16` only accepts 100–999; 1000 is out of range. + let json = r#"{"type":"custom","status_codes":[1000]}"#; + let result = serde_json::from_str::(json); + assert!( + result.is_err(), + "expected invalid status code to fail deserialization" + ); + } + #[tokio::test] async fn util_http_it_makes_http_requests() { let (_guard, addr) = next_addr(); diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs index be05d559bb18c..2a38a2b4ee900 100644 --- a/src/sinks/util/retries.rs +++ b/src/sinks/util/retries.rs @@ -34,6 +34,12 @@ pub trait RetryLogic: Clone + Send + Sync + 'static { /// implementors to specify what kinds of errors can be retried. fn is_retriable_error(&self, error: &Self::Error) -> bool; + /// When the Service call times out, this function allows implementors to + /// specify if the timeout should be retried. + fn is_retriable_timeout(&self) -> bool { + true + } + /// When the Service call returns an `Ok` response, this function allows /// implementors to specify additional logic to determine if the success response /// is actually an error. This is particularly useful when the downstream service @@ -197,10 +203,18 @@ where None } } else if error.downcast_ref::().is_some() { - warn!( - "Request timed out. If this happens often while the events are actually reaching their destination, try decreasing `batch.max_bytes` and/or using `compression` if applicable. Alternatively `request.timeout_secs` can be increased." - ); - Some(self.build_retry()) + if self.logic.is_retriable_timeout() { + warn!( + "Request timed out. If this happens often while the events are actually reaching their destination, try decreasing `batch.max_bytes` and/or using `compression` if applicable. Alternatively `request.timeout_secs` can be increased." 
+ ); + Some(self.build_retry()) + } else { + error!( + message = + "Request timed out and is not retriable; dropping the request." + ); + None + } } else { error!( message = "Unexpected error type; dropping the request.", @@ -338,6 +352,27 @@ mod tests { assert_eq!(fut.await.unwrap(), "world"); } + #[tokio::test] + async fn timeout_error_no_retry() { + trace_init(); + + let policy = FibonacciRetryPolicy::new( + 5, + Duration::from_secs(1), + Duration::from_secs(10), + NoTimeoutRetryLogic, + JitterMode::None, + ); + + let (mut svc, mut handle) = mock::spawn_layer(RetryLayer::new(policy)); + + assert_ready_ok!(svc.poll_ready()); + + let mut fut = task::spawn(svc.call("hello")); + assert_request_eq!(handle, "hello").send_error(Elapsed::new()); + assert_ready_err!(fut.poll()); + } + #[test] fn backoff_grows_to_max() { let mut policy = FibonacciRetryPolicy::new( @@ -425,6 +460,23 @@ mod tests { } } + #[derive(Debug, Clone)] + struct NoTimeoutRetryLogic; + + impl RetryLogic for NoTimeoutRetryLogic { + type Error = Error; + type Request = &'static str; + type Response = &'static str; + + fn is_retriable_error(&self, error: &Self::Error) -> bool { + error.0 + } + + fn is_retriable_timeout(&self) -> bool { + false + } + } + #[derive(Debug)] struct Error(bool); diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index cae76d8e02059..3b9a851e9f598 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1227,6 +1227,11 @@ fn create_consumer( } } + let oauthbearer = config + .librdkafka_options + .as_ref() + .and_then(kafka::extract_oauthbearer_config); + let (callbacks, callback_rx) = mpsc::unbounded_channel(); let consumer = client_config .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new( @@ -1234,6 +1239,7 @@ fn create_consumer( acknowledgements, callbacks, Span::current(), + oauthbearer, )) .context(CreateSnafu)?; @@ -1274,11 +1280,13 @@ impl KafkaSourceContext { acknowledgements: bool, callbacks: UnboundedSender, span: Span, + oauthbearer: 
Option, ) -> Self { Self { stats: kafka::KafkaStatisticsContext { expose_lag_metrics, span, + oauthbearer, }, acknowledgements, consumer: OnceLock::default(), @@ -1360,9 +1368,18 @@ impl KafkaSourceContext { } impl ClientContext for KafkaSourceContext { + const ENABLE_REFRESH_OAUTH_TOKEN: bool = true; + fn stats(&self, statistics: Statistics) { self.stats.stats(statistics) } + + fn generate_oauth_token( + &self, + oauthbearer_config: Option<&str>, + ) -> Result> { + self.stats.generate_oauth_token(oauthbearer_config) + } } impl ConsumerContext for KafkaSourceContext { diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 17d900cc07622..28dfc1db4cb79 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -29,7 +29,7 @@ cfg_if::cfg_if! { use vector_lib::EstimatedJsonEncodedSizeOf; use vector_lib::finalizer::OrderedFinalizer; use vector_lib::internal_event::{ - ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, + ByteSize, BytesReceived, CountByteSize, InternalEventHandle, Protocol, }; use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE}; use windows::Win32::System::Threading::GetCurrentProcess; @@ -48,6 +48,7 @@ cfg_if::cfg_if! { error::WindowsEventLogError, parser::EventLogParser, subscription::{EventLogSubscription, WaitResult}, + xml_parser::WindowsEvent, }; } } @@ -157,6 +158,107 @@ impl Finalizer { } } +/// Parse, emit metrics for, send, and finalize a non-empty batch of pulled Windows events. +/// +/// Both the `EventsAvailable` path and the speculative-timeout path share this +/// logic. Returns `true` if the downstream pipeline closed and the caller +/// should break out of the main event loop. 
+async fn process_event_batch( + events: Vec, + parser: &EventLogParser, + acknowledgements: bool, + subscription: &EventLogSubscription, + out: &mut SourceSender, + finalizer: &Finalizer, + events_received: &impl InternalEventHandle, + bytes_received: &impl InternalEventHandle, +) -> bool { + // Rate limiting between batches (async-compatible). + if let Some(limiter) = subscription.rate_limiter() { + limiter.until_ready().await; + } + + let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements); + let mut log_events = Vec::new(); + let mut total_byte_size = 0usize; + let mut channels_in_batch = std::collections::HashSet::new(); + + for event in events { + let channel = event.channel.clone(); + channels_in_batch.insert(channel.clone()); + let event_id = event.event_id; + match parser.parse_event(event) { + Ok(mut log_event) => { + let byte_size = log_event.estimated_json_encoded_size_of(); + total_byte_size += byte_size.get(); + if let Some(ref batch) = batch { + log_event = log_event.with_batch_notifier(batch); + } + log_events.push(log_event); + } + Err(e) => { + emit!(WindowsEventLogParseError { + error: e.to_string(), + channel, + event_id: Some(event_id), + }); + } + } + } + + if !log_events.is_empty() { + let count = log_events.len(); + events_received.emit(CountByteSize(count, total_byte_size.into())); + bytes_received.emit(ByteSize(total_byte_size)); + + // BACK PRESSURE: block until the pipeline accepts the batch. + // We don't call EvtNext again until this completes. + if let Err(_error) = out.send_batch(log_events).await { + emit!(StreamClosedError { count }); + return true; // signal: break the main loop + } + + // Register checkpoint entry with the finalizer. 
+ let bookmarks: Vec<(String, String)> = channels_in_batch + .into_iter() + .filter_map(|channel| { + subscription + .get_bookmark_xml(&channel) + .map(|xml| (channel, xml)) + }) + .collect(); + + if !bookmarks.is_empty() { + let entry = FinalizerEntry { bookmarks }; + finalizer.finalize(entry, receiver).await; + } + } + + false // pipeline still open +} + +/// Transfer ownership of `subscription` into a `spawn_blocking` task, run `f` +/// on it, then return both the subscription and the result. +/// +/// All blocking Windows APIs (`WaitForMultipleObjects`, `EvtNext`, `EvtRender`) +/// must run in `spawn_blocking` to avoid stalling the async runtime. The +/// ownership-transfer pattern ensures only one thread holds the subscription +/// at a time, preventing data races without requiring locks. +async fn with_subscription_blocking( + subscription: EventLogSubscription, + f: F, +) -> Result<(EventLogSubscription, R), WindowsEventLogError> +where + F: FnOnce(EventLogSubscription) -> (EventLogSubscription, R) + Send + 'static, + R: Send + 'static, +{ + tokio::task::spawn_blocking(move || f(subscription)) + .await + .map_err(|e| WindowsEventLogError::ConfigError { + message: format!("Blocking subscription task panicked: {e}"), + }) +} + /// Windows Event Log source implementation pub struct WindowsEventLogSource { config: WindowsEventLogConfig, @@ -281,42 +383,25 @@ impl WindowsEventLogSource { // Ownership transfer ensures no data races between the blocking thread // and async code. The shutdown watcher uses a raw HANDLE value (just an // integer) to signal shutdown without needing access to the subscription. 
- let (returned_sub, wait_result) = tokio::task::spawn_blocking({ - let sub = subscription; - move || { + let (returned_sub, wait_result) = + with_subscription_blocking(subscription, move |sub| { let result = sub.wait_for_events_blocking(timeout_ms); (sub, result) - } - }) - .await - .map_err(|e| WindowsEventLogError::ConfigError { - message: format!("Wait task panicked: {e}"), - })?; - + }) + .await?; subscription = returned_sub; match wait_result { WaitResult::EventsAvailable => { // Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs) - let (returned_sub, events_result) = tokio::task::spawn_blocking({ - let mut sub = subscription; - move || { + let (returned_sub, events_result) = + with_subscription_blocking(subscription, move |mut sub| { let result = sub.pull_events(batch_size); (sub, result) - } - }) - .await - .map_err(|e| WindowsEventLogError::ConfigError { - message: format!("Pull task panicked: {e}"), - })?; - + }) + .await?; subscription = returned_sub; - // Rate limiting between batches (async-compatible) - if let Some(limiter) = subscription.rate_limiter() { - limiter.until_ready().await; - } - match events_result { Ok(events) if events.is_empty() => { error_backoff = std::time::Duration::from_millis(100); @@ -328,65 +413,19 @@ impl WindowsEventLogSource { message = "Pulled Windows Event Log events.", event_count = events.len() ); - - let (batch, receiver) = - BatchNotifier::maybe_new_with_receiver(acknowledgements); - - let mut log_events = Vec::new(); - let mut total_byte_size = 0; - let mut channels_in_batch = std::collections::HashSet::new(); - - for event in events { - let channel = event.channel.clone(); - channels_in_batch.insert(channel.clone()); - let event_id = event.event_id; - match parser.parse_event(event) { - Ok(mut log_event) => { - let byte_size = log_event.estimated_json_encoded_size_of(); - total_byte_size += byte_size.get(); - - if let Some(ref batch) = batch { - log_event = log_event.with_batch_notifier(batch); - } - 
- log_events.push(log_event); - } - Err(e) => { - emit!(WindowsEventLogParseError { - error: e.to_string(), - channel, - event_id: Some(event_id), - }); - } - } - } - - if !log_events.is_empty() { - let count = log_events.len(); - events_received.emit(CountByteSize(count, total_byte_size.into())); - bytes_received.emit(ByteSize(total_byte_size)); - - // BACK PRESSURE: block here until the pipeline accepts - // the batch. We don't call EvtNext again until this completes. - if let Err(_error) = out.send_batch(log_events).await { - emit!(StreamClosedError { count }); - break; - } - - // Register checkpoint entry with finalizer - let bookmarks: Vec<(String, String)> = channels_in_batch - .into_iter() - .filter_map(|channel| { - subscription - .get_bookmark_xml(&channel) - .map(|xml| (channel, xml)) - }) - .collect(); - - if !bookmarks.is_empty() { - let entry = FinalizerEntry { bookmarks }; - finalizer.finalize(entry, receiver).await; - } + if process_event_batch( + events, + &parser, + acknowledgements, + &subscription, + &mut out, + &finalizer, + &events_received, + &bytes_received, + ) + .await + { + break; } } Err(e) => { @@ -415,10 +454,6 @@ impl WindowsEventLogSource { } WaitResult::Timeout => { - // A full wait cycle without errors means the system is healthy; - // reset backoff so the next transient error starts fresh. - error_backoff = std::time::Duration::from_millis(100); - // Periodic checkpoint flush (sync mode only) if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval { if let Err(e) = subscription.flush_bookmarks().await { @@ -448,6 +483,74 @@ impl WindowsEventLogSource { ); } } + + // Speculative pull: self-heal against any lost-wakeup scenario, + // regardless of root cause. If the OS signal was lost through any + // mechanism (not just the pre-drain race fixed in #25194), this + // ensures the source recovers within one timeout period. 
+ // Use the speculative pull variant so idle timeout cycles don't + // refresh per-channel record-count gauges via EvtOpenLog / + // EvtGetLogInfo on every configured channel. + let (returned_sub, speculative_result) = + with_subscription_blocking(subscription, move |mut sub| { + let result = sub.pull_events_speculative(batch_size); + (sub, result) + }) + .await?; + subscription = returned_sub; + + match speculative_result { + Ok(events) if events.is_empty() => { + // Healthy cycle: reset backoff so the next transient + // error starts fresh. + error_backoff = std::time::Duration::from_millis(100); + } + Ok(events) => { + // Healthy cycle: reset backoff so the next transient + // error starts fresh. + error_backoff = std::time::Duration::from_millis(100); + warn!( + message = "Speculative timeout pull recovered events; possible lost wakeup detected.", + event_count = events.len(), + ); + if process_event_batch( + events, + &parser, + acknowledgements, + &subscription, + &mut out, + &finalizer, + &events_received, + &bytes_received, + ) + .await + { + break; + } + } + Err(e) => { + emit!(WindowsEventLogQueryError { + channel: "all".to_string(), + query: None, + error: e.to_string(), + }); + if !e.is_recoverable() { + error!( + message = "Non-recoverable speculative pull error, shutting down.", + error = %e + ); + break; + } + // Exponential backoff mirrors the EventsAvailable error path. 
+ warn!( + message = "Recoverable speculative pull error, backing off.", + backoff_ms = error_backoff.as_millis() as u64, + error = %e + ); + tokio::time::sleep(error_backoff).await; + error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF); + } + } } WaitResult::Shutdown => { diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index dc92713f691d9..4571561a2d349 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -18,9 +18,9 @@ use windows::Win32::System::EventLog::{ EvtSubscribeStartAfterBookmark, EvtSubscribeStartAtOldestRecord, EvtSubscribeStrict, EvtSubscribeToFutureEvents, }; -#[cfg(test)] -use windows::Win32::System::Threading::SetEvent; -use windows::Win32::System::Threading::{CreateEventW, ResetEvent, WaitForMultipleObjects}; +use windows::Win32::System::Threading::{ + CreateEventW, ResetEvent, SetEvent, WaitForMultipleObjects, +}; use windows::core::HSTRING; use super::{ @@ -30,6 +30,19 @@ use super::{ use crate::internal_events::WindowsEventLogBookmarkError; +/// Test-only hook called inside the `pull_events` drain loop after each +/// `EvtNext` invocation. Used by the lost-wakeup regression test +/// (see `test_pull_events_preserves_setevent_during_drain`) to race a +/// `SetEvent` against the drain without relying on thread-timing. +/// No-op and zero-cost in non-test builds. +/// +/// Only one test should install a hook at a time; tests that install a hook +/// must use `#[serial_test::serial]` or equivalent serialization to prevent +/// concurrent tests from triggering each other's hook. +#[cfg(test)] +static DRAIN_STEP_HOOK: std::sync::Mutex>> = + std::sync::Mutex::new(None); + /// Maximum number of entries in the EvtFormatMessage result cache. pub const FORMAT_CACHE_CAPACITY: usize = 10_000; /// Maximum number of cached publisher metadata handles. 
@@ -80,6 +93,7 @@ struct ChannelSubscription { // SAFETY: Same rationale as EventLogSubscription - Windows kernel handles are thread-safe. unsafe impl Send for ChannelSubscription {} +unsafe impl Sync for ChannelSubscription {} /// Result of waiting for events across all channels. pub enum WaitResult { @@ -130,8 +144,10 @@ pub struct EventLogSubscription { // SAFETY: Windows HANDLE and EVT_HANDLE are kernel objects safe to use across // threads. In windows 0.58, HANDLE wraps *mut c_void which is !Send/!Sync, -// but the underlying kernel handles are thread-safe. +// but the underlying kernel handles are thread-safe. All mutation requires +// &mut self; &self methods are read-only or delegate to Sync types (RateLimiter). unsafe impl Send for EventLogSubscription {} +unsafe impl Sync for EventLogSubscription {} impl EventLogSubscription { /// Create a new pull-model subscription for all configured channels. @@ -415,21 +431,20 @@ impl EventLogSubscription { /// Wait for events to become available on any channel, or for shutdown. /// /// Uses `WaitForMultipleObjects` via `spawn_blocking` to avoid blocking the - /// Tokio runtime. The wait array includes all channel signal events plus the - /// shutdown event. + /// Tokio runtime. The wait array puts shutdown first so a stop request wins + /// over any channel that is already signaled. pub fn wait_for_events_blocking(&self, timeout_ms: u32) -> WaitResult { - // Build wait handle array: [channel0_signal, channel1_signal, ..., shutdown_event] - let mut handles: Vec = self.channels.iter().map(|c| c.signal_event).collect(); + // Build wait handle array: [shutdown_event, channel0_signal, channel1_signal, ...] 
+ let mut handles = Vec::with_capacity(self.channels.len() + 1); handles.push(self.shutdown_event); + handles.extend(self.channels.iter().map(|c| c.signal_event)); let result = unsafe { WaitForMultipleObjects(&handles, false, timeout_ms) }; - let shutdown_index = (self.channels.len()) as u32; - match result { r if r == WAIT_TIMEOUT => WaitResult::Timeout, - r if r.0 < WAIT_OBJECT_0.0 + shutdown_index => WaitResult::EventsAvailable, - r if r.0 == WAIT_OBJECT_0.0 + shutdown_index => WaitResult::Shutdown, + r if r == WAIT_OBJECT_0 => WaitResult::Shutdown, + r if r.0 <= WAIT_OBJECT_0.0 + self.channels.len() as u32 => WaitResult::EventsAvailable, _ => { // WAIT_FAILED or unexpected - treat as timeout to avoid tight loop warn!( @@ -459,6 +474,28 @@ impl EventLogSubscription { pub fn pull_events( &mut self, max_events: usize, + ) -> Result, WindowsEventLogError> { + self.pull_events_inner(max_events, true) + } + + /// Pull events for timeout-based speculative recovery. + /// + /// This keeps the same event-drain behavior as `pull_events`, but avoids + /// refreshing per-channel record-count gauges for channels that were empty. + /// Timeout pulls can run repeatedly while the host is idle, so skipping + /// those metadata queries prevents steady `EvtOpenLog`/`EvtGetLogInfo` + /// churn without changing event recovery behavior. + pub fn pull_events_speculative( + &mut self, + max_events: usize, + ) -> Result, WindowsEventLogError> { + self.pull_events_inner(max_events, false) + } + + fn pull_events_inner( + &mut self, + max_events: usize, + update_records_for_empty_channels: bool, ) -> Result, WindowsEventLogError> { let mut all_events = Vec::with_capacity(max_events.min(1000)); let num_channels = self.channels.len().max(1); @@ -479,9 +516,25 @@ impl EventLogSubscription { let mut bookmark_failed = false; let mut channel_count = 0usize; - // Drain loop: keep calling EvtNext until ERROR_NO_MORE_ITEMS or channel budget. 
- // Only reset the signal once the channel is fully drained; if we hit the - // budget limit the signal stays set so WaitForMultipleObjects returns immediately. + // Reset the signal BEFORE draining to avoid a lost-wakeup race + // (see vectordotdev/vector#25194). The Windows Event Log service + // signals this manual-reset event via SetEvent each time a new + // matching event is recorded; SetEvent on an already-signaled + // event is a no-op, so if we reset AFTER draining, any signal + // that arrives between our last EvtNext and ResetEvent is lost + // — the subscription then hangs until the next event arrives. + // Resetting first means any signal raised during the drain is + // preserved, causing the next WaitForMultipleObjects to return + // immediately. + // + // If we exit the drain loop early (channel budget exhausted or + // bookmark update failed mid-batch), we re-SetEvent at the end + // of this iteration so the next pull_events call revisits this + // channel without waiting for a fresh OS signal. + unsafe { + let _ = ResetEvent(channel_sub.signal_event); + } + 'drain: loop { if channel_count >= channel_limit { break; @@ -501,6 +554,17 @@ impl EventLogSubscription { ) }; + // Test-only hook: lets the lost-wakeup regression test race + // a SetEvent against the drain without thread-timing. No-op + // and zero-cost in non-test builds. + #[cfg(test)] + { + let hook = DRAIN_STEP_HOOK.lock().unwrap().clone(); + if let Some(h) = hook { + h(channel_sub.signal_event); + } + } + if let Err(err) = result { let code = (err.code().0 as u32) & 0xFFFF; if code == ERROR_NO_MORE_ITEMS { @@ -513,6 +577,8 @@ impl EventLogSubscription { channel = %channel_sub.channel ); channel_drained = true; + // Speculative pull on timeout in mod.rs is a safety net if the + // re-subscribed channel does not immediately re-signal. 
break; } if code == ERROR_EVT_QUERY_RESULT_INVALID_POSITION { @@ -526,7 +592,9 @@ impl EventLogSubscription { message = "Re-subscription succeeded after stale query.", channel = %channel_sub.channel ); - // Retry from fresh subscription — the signal will fire again + // Retry from fresh subscription — the signal will fire again. + // Speculative pull on timeout in mod.rs is a safety net if + // the new subscription does not immediately re-signal. channel_drained = true; break; } @@ -538,10 +606,23 @@ impl EventLogSubscription { ); channel_sub.subscription_active_gauge.set(0.0); channel_drained = true; + // Speculative pull on timeout in mod.rs is a safety net if + // the failed channel does not re-signal after recovery. break; } } } + // Re-arm the signal before returning. We reset it pre-drain + // but are bailing out without confirming the drain completed, + // so if events were left un-drained the next pull_events must + // still revisit this channel without waiting for a fresh OS + // signal. This mirrors the `else` branch below that handles + // budget-exhaustion and bookmark-failure early breaks, and + // avoids the same lost-wakeup symptom (vectordotdev/vector#25194) + // on transient EvtNext failures. + unsafe { + let _ = SetEvent(channel_sub.signal_event); + } return Err(WindowsEventLogError::PullEventsError { channel: channel_sub.channel.clone(), source: err, @@ -697,15 +778,22 @@ impl EventLogSubscription { } if channel_drained && !bookmark_failed { + // Update channel record count gauge for lag detection. + if update_records_for_empty_channels || channel_count > 0 { + super::render::update_channel_records( + &channel_sub.channel, + &channel_sub.channel_records_gauge, + ); + } + } else { + // Drain exited early (budget exhausted or bookmark_failed + // mid-batch). Re-arm the signal so the next pull_events + // revisits this channel immediately without waiting for a + // fresh OS notification. Pairs with the pre-drain ResetEvent + // above. 
unsafe { - let _ = ResetEvent(channel_sub.signal_event); + let _ = SetEvent(channel_sub.signal_event); } - - // Update channel record count gauge for lag detection. - super::render::update_channel_records( - &channel_sub.channel, - &channel_sub.channel_records_gauge, - ); } } @@ -816,6 +904,15 @@ impl EventLogSubscription { self.shutdown_event.0 } + /// Test-only accessor for the first channel's signal event handle. Used + /// by the lost-wakeup regression test to scope its drain-loop hook to + /// exactly this subscription, so it does not fire on concurrent + /// `pull_events` calls from other tests in the same process. + #[cfg(test)] + pub(super) fn first_channel_signal_raw(&self) -> isize { + self.channels[0].signal_event.0 as isize + } + /// Returns a reference to the rate limiter, if configured. pub const fn rate_limiter( &self, @@ -1005,6 +1102,7 @@ impl Drop for EventLogSubscription { #[cfg(test)] mod tests { use super::*; + use serial_test::serial; async fn create_test_checkpointer() -> (Arc, tempfile::TempDir) { let temp_dir = tempfile::TempDir::new().unwrap(); @@ -1136,6 +1234,31 @@ mod tests { drop(subscription); } + /// Test that shutdown wins when both shutdown and channel handles are signaled. 
+ #[tokio::test] + async fn test_shutdown_signal_takes_priority_over_channel_signal() { + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.event_timeout_ms = 500; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + unsafe { + let handle = HANDLE(subscription.shutdown_event_raw()); + let _ = SetEvent(handle); + } + + let result = subscription.wait_for_events_blocking(0); + assert!( + matches!(result, WaitResult::Shutdown), + "shutdown should take priority over already-signaled channels" + ); + } + /// Test pull_events with read_existing_events=true #[tokio::test] async fn test_pull_events_returns_events() { @@ -1272,4 +1395,209 @@ mod tests { // that the subscription is functional. let _events = subscription.pull_events(100).unwrap_or_default(); } + + /// Proves that `pull_events` works independently of signal state — the + /// invariant the speculative timeout pull in mod.rs relies on. + /// + /// Steps: + /// 1. Subscribe to the Application log with `read_existing_events = true`. + /// 2. Manually clear the channel signal via `ResetEvent`, simulating a lost wakeup. + /// 3. Assert `wait_for_events_blocking` times out (signal cleared, no OS wake-up). + /// 4. Assert `pull_events` still returns events — `EvtNext` fetches from the queue + /// regardless of signal state, so the speculative pull in mod.rs self-heals. + #[tokio::test] + #[serial] + async fn test_pull_events_works_with_cleared_signal() { + // Seed the Application log with a record so the "events remain + // available despite cleared signal" assertion below does not depend + // on whatever backlog the runner happens to have. 
Freshly provisioned + // CI images can have an empty Application log, which would otherwise + // make `pull_events` legitimately return empty and produce a spurious + // failure unrelated to the invariant under test. + let seed_output = std::process::Command::new("eventcreate") + .args([ + "/T", + "INFORMATION", + "/ID", + "100", + "/L", + "APPLICATION", + "/SO", + "VectorTestSpeculativePullSeed", + "/D", + "seed event for #25194 speculative-pull regression test", + ]) + .output() + .expect("failed to spawn eventcreate — required for deterministic seeding"); + assert!( + seed_output.status.success(), + "eventcreate failed to seed Application log (exit={:?}): stdout={:?} stderr={:?}. \ + This test requires a seeded event to be deterministic; a locked-down runner \ + without the privilege to write to Application cannot run this test reliably.", + seed_output.status.code(), + String::from_utf8_lossy(&seed_output.stdout), + String::from_utf8_lossy(&seed_output.stderr), + ); + // Give the service a moment to persist the record before we subscribe. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.read_existing_events = true; + config.event_timeout_ms = 500; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let mut subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + // Manually clear the signal to simulate a lost wakeup. The seeded + // event above guarantees at least one record is queued in EvtNext + // regardless of the runner's pre-existing log state. + let signal_raw = subscription.first_channel_signal_raw(); + unsafe { + let _ = ResetEvent(HANDLE(signal_raw as *mut std::ffi::c_void)); + } + + // Signal is cleared: an immediate (0ms) poll must report Timeout. 
+ // A 0ms wait reads only the current signal state with no grace + // window, so unrelated Windows system events arriving between the + // `ResetEvent` above and the poll cannot re-signal the handle and + // cause a spurious failure. + let wait_result = subscription.wait_for_events_blocking(0); + + assert!( + matches!(wait_result, WaitResult::Timeout), + "expected Timeout after manual ResetEvent; signal was not cleared" + ); + + // Despite the cleared signal, pull_events must still return events. + // This is the invariant the speculative timeout pull in mod.rs depends on. + let events = subscription.pull_events(100).unwrap_or_default(); + assert!( + !events.is_empty(), + "pull_events must return events independently of signal state; \ + this is the invariant the speculative timeout pull in mod.rs depends on" + ); + } + + /// Regression test for vectordotdev/vector#25194. + /// + /// The Windows Event Log service signals the pull-mode wait handle via + /// `SetEvent` each time a new matching event is recorded. Because the + /// handle is manual-reset, `SetEvent` on an already-signaled handle is + /// a no-op. If `pull_events` resets the signal *after* draining events + /// via `EvtNext`, any signal that fires between the last `EvtNext` and + /// the `ResetEvent` call is silently lost — the subscription then + /// permanently hangs until a subsequent event arrives. + /// + /// The fix is to reset the signal *before* the drain loop, so signals + /// raised during the drain are preserved and the next wait returns + /// immediately. + /// + /// This test pins that invariant by driving the real `pull_events` + /// against a real `EvtSubscribe` handle. It installs a + /// `DRAIN_STEP_HOOK` that runs inside the drain loop after each + /// `EvtNext` and fires `SetEvent` on the subscription's signal + /// handle — simulating the OS signaling a new event arrival during + /// the drain window. 
After `pull_events` returns, the signal must + /// still be set — observed via a 0ms `wait_for_events_blocking` + /// so the check measures only the reset/preserve behavior of + /// `pull_events` and is not contaminated by unrelated Windows + /// system events arriving during a nonzero wait. Under the old + /// post-drain `ResetEvent` order, the hook's `SetEvent` would be + /// clobbered by the reset and the immediate poll would return + /// `Timeout` — which is exactly what #25194 reports. + #[tokio::test] + #[serial] + async fn test_pull_events_preserves_setevent_during_drain() { + use std::sync::Arc as StdArc; + + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.read_existing_events = true; + config.event_timeout_ms = 1000; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let mut subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + // Capture THIS subscription's signal handle so the hook can scope + // itself to this test. DRAIN_STEP_HOOK is a process-global, and + // cargo runs tests in parallel by default; without handle-keying, + // a concurrent test's pull_events could trigger our one-shot + // hook first, flip `fired`, and SetEvent on the wrong handle. + let target_signal_raw = subscription.first_channel_signal_raw(); + + // Install the drain-loop hook: every EvtNext call inside + // pull_events fires SetEvent on the subscription's signal + // handle. This simulates the OS signaling a fresh event + // mid-drain, which is exactly the race window #25194 exposes. + // The hook only needs to fire once to prove the invariant; we + // use an AtomicBool to keep it deterministic. The hook is keyed + // to `target_signal_raw` so concurrent pull_events calls from + // other tests no-op here. 
+ let fired = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + { + let fired = StdArc::clone(&fired); + let hook: StdArc = StdArc::new(move |signal: HANDLE| { + if signal.0 as isize != target_signal_raw { + return; + } + if !fired.swap(true, std::sync::atomic::Ordering::SeqCst) { + unsafe { + let _ = SetEvent(signal); + } + } + }); + *DRAIN_STEP_HOOK.lock().unwrap() = Some(hook); + } + + // Drop-guard: clear the hook even if the test panics, so it + // doesn't contaminate other tests in the same process. + struct HookGuard; + impl Drop for HookGuard { + fn drop(&mut self) { + *DRAIN_STEP_HOOK.lock().unwrap() = None; + } + } + let _guard = HookGuard; + + // Drive pull_events with a very large budget so the drain + // exits via ERROR_NO_MORE_ITEMS (channel_drained = true), + // which is the path that ran the post-drain ResetEvent in the + // old buggy code. Exiting via budget exhaustion would skip + // that reset and cause this test to false-pass against the + // pre-fix code. + let _ = subscription.pull_events(usize::MAX).unwrap_or_default(); + + assert!( + fired.load(std::sync::atomic::Ordering::SeqCst), + "drain-loop hook never ran — pull_events must call EvtNext \ + at least once even on an empty channel" + ); + + // Observe the signal state IMMEDIATELY with a 0ms wait. We want + // to know whether pull_events's reset clobbered the hook's + // SetEvent — NOT whether new real events arrive during some + // wait window. A nonzero timeout against the live Application + // channel lets arbitrary Windows system events re-signal us + // and false-pass against the pre-fix code. 0ms = WaitForMultiple- + // Objects returns the current state with no grace period, so + // only the reset/preserve behavior of pull_events is measured. 
+ let result = subscription.wait_for_events_blocking(0); + + match result { + WaitResult::EventsAvailable => {} + WaitResult::Timeout => panic!( + "signal set during the drain window was lost — this is the \ + lost-wakeup race from vectordotdev/vector#25194. \ + pull_events must call ResetEvent BEFORE draining, not after." + ), + WaitResult::Shutdown => panic!("unexpected shutdown"), + } + } } diff --git a/website/cue/reference/components/sinks/generated/appsignal.cue b/website/cue/reference/components/sinks/generated/appsignal.cue index eb3710974335f..47943be3c1f93 100644 --- a/website/cue/reference/components/sinks/generated/appsignal.cue +++ b/website/cue/reference/components/sinks/generated/appsignal.cue @@ -323,6 +323,37 @@ generated: components: sinks: appsignal: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tls: { description: "Configures the TLS options for incoming/outgoing connections." 
required: false diff --git a/website/cue/reference/components/sinks/generated/axiom.cue b/website/cue/reference/components/sinks/generated/axiom.cue index d4dc62e6294fc..a1f42404ac0c1 100644 --- a/website/cue/reference/components/sinks/generated/axiom.cue +++ b/website/cue/reference/components/sinks/generated/axiom.cue @@ -305,6 +305,37 @@ generated: components: sinks: axiom: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tls: { description: """ The TLS settings for the connection. diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue index 2aaf9b628e3c6..68944f5a6e7d5 100644 --- a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -408,6 +408,37 @@ generated: components: sinks: azure_logs_ingestion: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. 
+ + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } stream_name: { description: """ The [Stream name][stream_name] for the Data collection rule. diff --git a/website/cue/reference/components/sinks/generated/azure_monitor_logs.cue b/website/cue/reference/components/sinks/generated/azure_monitor_logs.cue index 98b14f8b4faad..c7039b70097e1 100644 --- a/website/cue/reference/components/sinks/generated/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/generated/azure_monitor_logs.cue @@ -314,6 +314,37 @@ generated: components: sinks: azure_monitor_logs: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." 
+ required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } shared_key: { description: """ The [primary or the secondary key][shared_key] for the Log Analytics workspace. diff --git a/website/cue/reference/components/sinks/generated/datadog_events.cue b/website/cue/reference/components/sinks/generated/datadog_events.cue index 7316613acef4e..23b9fb43d6413 100644 --- a/website/cue/reference/components/sinks/generated/datadog_events.cue +++ b/website/cue/reference/components/sinks/generated/datadog_events.cue @@ -242,6 +242,37 @@ generated: components: sinks: datadog_events: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } site: { description: """ The Datadog [site][dd_site] to send observability data to. 
diff --git a/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue b/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue index 2909192f14d3f..9d5e76262bfd2 100644 --- a/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue +++ b/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue @@ -420,6 +420,37 @@ generated: components: sinks: gcp_stackdriver_logs: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } severity_key: { description: """ The field of the log event from which to take the outgoing log’s `severity` field. 
diff --git a/website/cue/reference/components/sinks/generated/gcp_stackdriver_metrics.cue b/website/cue/reference/components/sinks/generated/gcp_stackdriver_metrics.cue index 23ef271177f3a..52c344e278250 100644 --- a/website/cue/reference/components/sinks/generated/gcp_stackdriver_metrics.cue +++ b/website/cue/reference/components/sinks/generated/gcp_stackdriver_metrics.cue @@ -334,6 +334,37 @@ generated: components: sinks: gcp_stackdriver_metrics: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tls: { description: "TLS configuration." required: false diff --git a/website/cue/reference/components/sinks/generated/honeycomb.cue b/website/cue/reference/components/sinks/generated/honeycomb.cue index 66ba9ed795a27..c65064a135a00 100644 --- a/website/cue/reference/components/sinks/generated/honeycomb.cue +++ b/website/cue/reference/components/sinks/generated/honeycomb.cue @@ -321,4 +321,35 @@ generated: components: sinks: honeycomb: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. 
+ + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } } diff --git a/website/cue/reference/components/sinks/generated/http.cue b/website/cue/reference/components/sinks/generated/http.cue index 0fcc53fab8092..8523a27dd0d9b 100644 --- a/website/cue/reference/components/sinks/generated/http.cue +++ b/website/cue/reference/components/sinks/generated/http.cue @@ -1024,6 +1024,37 @@ generated: components: sinks: http: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." 
+ required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tls: { description: "TLS configuration." required: false diff --git a/website/cue/reference/components/sinks/generated/keep.cue b/website/cue/reference/components/sinks/generated/keep.cue index 29b1d7536611b..d2ed08114a2a9 100644 --- a/website/cue/reference/components/sinks/generated/keep.cue +++ b/website/cue/reference/components/sinks/generated/keep.cue @@ -286,4 +286,35 @@ generated: components: sinks: keep: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." 
+ } + } + } + } + } } diff --git a/website/cue/reference/components/sinks/generated/opentelemetry.cue b/website/cue/reference/components/sinks/generated/opentelemetry.cue index 4505d8a9bf512..4e8e9d9ff7210 100644 --- a/website/cue/reference/components/sinks/generated/opentelemetry.cue +++ b/website/cue/reference/components/sinks/generated/opentelemetry.cue @@ -1025,6 +1025,37 @@ generated: components: sinks: opentelemetry: configuration: protocol: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tls: { description: "TLS configuration." 
required: false diff --git a/website/cue/reference/components/sinks/generated/prometheus_remote_write.cue b/website/cue/reference/components/sinks/generated/prometheus_remote_write.cue index c13fa424999ec..5d74d52359bdd 100644 --- a/website/cue/reference/components/sinks/generated/prometheus_remote_write.cue +++ b/website/cue/reference/components/sinks/generated/prometheus_remote_write.cue @@ -539,6 +539,37 @@ generated: components: sinks: prometheus_remote_write: configuration: { } } } + retry_strategy: { + description: """ + Configurable retry strategy for `http` based sinks. + + For more information about error responses, see [Client Error Responses][error_responses]. + + [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses + """ + required: false + type: object: options: { + status_codes: { + description: "Retry on these specific HTTP status codes" + relevant_when: "type = \"custom\"" + required: true + type: array: items: type: uint: default: 200 + } + type: { + description: "The retry strategy enum." + required: false + type: string: { + default: "default" + enum: { + all: "Retry on *all* HTTP status codes except for success codes (2xx)" + custom: "Custom retry strategy" + default: "Default strategy. See [`RetryStrategy::retry_action`] for more details." + none: "Don't retry any errors, including request timeouts." + } + } + } + } + } tenant_id: { description: """ The tenant ID to send.