Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `aws_s3` and `clickhouse` sinks now correctly advertise only the `batch_encoding.codec` values they actually support: `parquet` for `aws_s3` and `arrow_stream` for `clickhouse`. Previously the documentation and configuration schema listed both codecs for both sinks, even though picking the wrong one produced a startup error.

authors: flaviofcruz
139 changes: 60 additions & 79 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tower::ServiceBuilder;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::BatchEncoder;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::encoding::BatchSerializerConfig;
use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
use vector_lib::{
TimeZone,
codecs::{
Expand Down Expand Up @@ -37,6 +37,21 @@ use crate::{
tls::TlsConfig,
};

/// Batch encoding configuration for the `aws_s3` sink.
///
/// Unlike the codec-agnostic `BatchSerializerConfig`, this enum exposes only
/// the codec the S3 sink actually supports (`parquet`), so an unsupported
/// `batch_encoding.codec` value is rejected when the configuration is
/// deserialized rather than surfacing later as a startup error.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Clone, Debug)]
// Internally tagged: the `codec` field selects the variant, e.g. `codec = "parquet"`.
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum S3BatchEncoding {
/// Encodes events in [Apache Parquet][apache_parquet] columnar format.
///
/// [apache_parquet]: https://parquet.apache.org/
Parquet(ParquetSerializerConfig),
}

/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
"aws_s3",
Expand Down Expand Up @@ -111,15 +126,13 @@ pub struct S3SinkConfig {

/// Batch encoding configuration for columnar formats.
///
/// When set, events are encoded together as a batch in a columnar format (for example, Parquet)
/// When set, events are encoded together as a batch in a columnar format (Parquet)
/// instead of the standard per-event framing-based encoding. The columnar format handles
/// its own internal compression, so the top-level `compression` setting is bypassed.
///
/// Only the `parquet` codec is supported by the AWS S3 sink.
#[cfg(feature = "codecs-parquet")]
#[configurable(derived)]
#[serde(default)]
pub batch_encoding: Option<BatchSerializerConfig>,
pub batch_encoding: Option<S3BatchEncoding>,
Comment thread
pront marked this conversation as resolved.

/// Compression configuration.
///
Expand Down Expand Up @@ -220,8 +233,10 @@ impl SinkConfig for S3SinkConfig {

fn input(&self) -> Input {
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
return Input::new(batch_config.input_type());
if let Some(batch_encoding) = &self.batch_encoding {
let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
return Input::new(resolved.input_type());
}
Input::new(self.encoding.config().1.input_type())
}
Expand Down Expand Up @@ -272,15 +287,11 @@ impl S3SinkConfig {
// When batch_encoding is configured (e.g., Parquet), use batch mode
// with internal compression and appropriate file extension.
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) {
return Err(
"batch_encoding only supports encoding with parquet format for amazon s3 sink"
.into(),
);
}
if let Some(batch_encoding) = &self.batch_encoding {
let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());

let batch_serializer = batch_config.build_batch_serializer()?;
let batch_serializer = resolved_batch_config.build_batch_serializer()?;
let batch_encoder = BatchEncoder::new(batch_serializer);

// Auto-detect Content-Type from batch format. Users can still
Expand All @@ -292,15 +303,14 @@ impl S3SinkConfig {

let encoder = EncoderKind::Batch(batch_encoder);

// Auto-detect file extension from batch format
let filename_extension =
self.filename_extension
.clone()
.or_else(|| match batch_config {
BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()),
#[allow(unreachable_patterns)]
_ => None,
});
let filename_extension = self.filename_extension.clone().or_else(|| {
Some(
match batch_encoding {
S3BatchEncoding::Parquet(_) => "parquet",
}
.to_string(),
)
});

if self.compression != Compression::None {
warn!("Top level compression setting ignored when batch_encoding set to parquet.")
Expand Down Expand Up @@ -392,15 +402,10 @@ mod tests {
let batch_enc = config
.batch_encoding
.expect("batch_encoding should be Some");
match batch_enc {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(ref p) => {
use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
assert_eq!(p.compression, ParquetCompression::Snappy);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
assert_eq!(p.compression, ParquetCompression::Snappy);
}

/// Content-Type must be auto-detected as `application/vnd.apache.parquet`
Expand Down Expand Up @@ -432,7 +437,7 @@ mod tests {
options: S3Options::default(),
region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
compression: Compression::None,
batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
request: Default::default(),
Expand All @@ -444,7 +449,8 @@ mod tests {
retry_strategy: Default::default(),
};

let batch_config = config.batch_encoding.as_ref().unwrap();
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
let batch_config = BatchSerializerConfig::Parquet(p.clone());
let batch_serializer = batch_config.build_batch_serializer().unwrap();
let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

Expand Down Expand Up @@ -484,7 +490,8 @@ mod tests {
)
.unwrap();

let batch_config = config.batch_encoding.as_ref().unwrap();
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
let batch_config = vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p.clone());
let batch_serializer = batch_config.build_batch_serializer().unwrap();
let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

Expand All @@ -500,43 +507,27 @@ mod tests {
);
}

/// Parquet filename extension defaults to `.parquet` when not explicitly set.
/// Codecs other than `parquet` must be rejected at parse time, since
/// `S3BatchEncoding` only exposes the `parquet` variant.
#[cfg(feature = "codecs-parquet")]
#[test]
fn parquet_filename_extension_defaults_to_parquet() {
let config: S3SinkConfig = toml::from_str(
fn parquet_batch_encoding_rejects_unsupported_codec() {
let err = serde_yaml::from_str::<S3SinkConfig>(
r#"
bucket = "test-bucket"
compression = "none"

[encoding]
codec = "text"

[batch_encoding]
codec = "parquet"
schema_mode = "auto_infer"
bucket: test-bucket
compression: none
encoding:
codec: text
batch_encoding:
codec: arrow_stream
"#,
)
.unwrap();
.unwrap_err();

assert!(
config.filename_extension.is_none(),
"fixture must not set filename_extension"
err.to_string().contains("arrow_stream"),
"expected error to mention the offending codec, got: {err}"
);

let batch_config = config.batch_encoding.as_ref().unwrap();
let extension = config
.filename_extension
.clone()
.or_else(|| match batch_config {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(_) => {
Some("parquet".to_string())
}
#[allow(unreachable_patterns)]
_ => None,
});

assert_eq!(extension.as_deref(), Some("parquet"));
}

/// Explicit filename_extension overrides the `.parquet` default.
Expand Down Expand Up @@ -582,13 +573,8 @@ mod tests {
)
.unwrap();

match config.batch_encoding.unwrap() {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
}

/// Explicit `schema_mode = "strict"` is correctly parsed.
Expand All @@ -613,12 +599,7 @@ mod tests {
)
.unwrap();

match config.batch_encoding.unwrap() {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
}
}
6 changes: 3 additions & 3 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{
time::Duration,
};

#[cfg(feature = "codecs-parquet")]
use super::config::S3BatchEncoding;
use aws_sdk_s3::{
Client as S3Client,
operation::{create_bucket::CreateBucketError, get_object::GetObjectOutput},
Expand All @@ -22,8 +24,6 @@ use futures::{Stream, stream};
use similar_asserts::assert_eq;
use tempfile::TempDir;
use tokio_stream::StreamExt;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::encoding::BatchSerializerConfig;
use vector_lib::{
buffers::{BufferConfig, BufferType, WhenFull},
codecs::{TextSerializerConfig, encoding::FramingConfig},
Expand Down Expand Up @@ -502,7 +502,7 @@ async fn s3_parquet_insert_message() {
};

let config = S3SinkConfig {
batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
batch_encoding: Some(S3BatchEncoding::Parquet(parquet_config)),
..config(&bucket, 100, 5.0)
};

Expand Down
61 changes: 45 additions & 16 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::fmt;

use http::{Request, StatusCode, Uri};
use hyper::Body;
use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;
use vector_lib::codecs::encoding::format::SchemaProvider;
use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};

use super::{
request_builder::ClickhouseRequestBuilder,
Expand Down Expand Up @@ -45,6 +45,23 @@ pub enum Format {
ArrowStream,
}

/// Batch encoding configuration for the `clickhouse` sink.
///
/// Only the `arrow_stream` codec is exposed here, so any other
/// `batch_encoding.codec` value fails to deserialize instead of erroring
/// when the sink is built.
#[configurable_component]
#[derive(Clone, Debug)]
// Internally tagged: the `codec` field selects the variant, e.g. `codec = "arrow_stream"`.
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum ClickhouseBatchEncoding {
/// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
///
/// This is the streaming variant of the Arrow IPC format, which writes
/// a continuous stream of record batches.
///
/// [apache_arrow]: https://arrow.apache.org/
ArrowStream(ArrowStreamSerializerConfig),
}

impl fmt::Display for Format {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -104,11 +121,9 @@ pub struct ClickhouseConfig {
///
/// When specified, events are encoded together as a single batch.
/// This is mutually exclusive with per-event encoding based on the `format` field.
///
/// Only the `arrow_stream` codec is supported by the ClickHouse sink.
#[configurable(derived)]
#[serde(default)]
pub batch_encoding: Option<BatchSerializerConfig>,
pub batch_encoding: Option<ClickhouseBatchEncoding>,

#[configurable(derived)]
#[serde(default)]
Expand Down Expand Up @@ -282,6 +297,7 @@ impl ClickhouseConfig {

if let Some(batch_encoding) = &self.batch_encoding {
use vector_lib::codecs::BatchEncoder;
use vector_lib::codecs::encoding::BatchSerializerConfig;

// Validate that batch_encoding is only compatible with ArrowStream format
if self.format != Format::ArrowStream {
Expand All @@ -292,16 +308,8 @@ impl ClickhouseConfig {
.into());
}

let mut arrow_config = match batch_encoding {
BatchSerializerConfig::ArrowStream(config) => config.clone(),
#[cfg(feature = "codecs-parquet")]
BatchSerializerConfig::Parquet(_) => {
return Err(
"ClickHouse sink does not support Parquet batch encoding. Use 'arrow_stream' instead."
.into(),
);
}
};
let ClickhouseBatchEncoding::ArrowStream(arrow_config) = batch_encoding;
let mut arrow_config = arrow_config.clone();

self.resolve_arrow_schema(
client,
Expand Down Expand Up @@ -433,10 +441,31 @@ mod tests {
);
}

/// Codecs other than `arrow_stream` must be rejected at parse time, since
/// `ClickhouseBatchEncoding` only exposes the `arrow_stream` variant.
///
/// NOTE: this rejection does not depend on the `codecs-parquet` feature —
/// `parquet` is an unknown tag for `ClickhouseBatchEncoding` either way —
/// so the test is deliberately not gated behind that feature and runs in
/// every build configuration.
#[test]
fn batch_encoding_rejects_unsupported_codec() {
    // `codec: parquet` is not a variant of `ClickhouseBatchEncoding`, so
    // deserialization must fail with an unknown-variant style error.
    let err = serde_yaml::from_str::<ClickhouseConfig>(
        r#"
            endpoint: http://localhost:8123
            table: test_table
            batch_encoding:
              codec: parquet
        "#,
    )
    .unwrap_err();

    assert!(
        err.to_string().contains("parquet"),
        "expected error to mention the offending codec, got: {err}"
    );
}

/// Helper to create a minimal ClickhouseConfig for testing
fn create_test_config(
format: Format,
batch_encoding: Option<BatchSerializerConfig>,
batch_encoding: Option<ClickhouseBatchEncoding>,
) -> ClickhouseConfig {
ClickhouseConfig {
endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
Expand Down Expand Up @@ -469,7 +498,7 @@ mod tests {
for (format, format_name) in incompatible_formats {
let config = create_test_config(
format,
Some(BatchSerializerConfig::ArrowStream(
Some(ClickhouseBatchEncoding::ArrowStream(
ArrowStreamSerializerConfig::default(),
)),
);
Expand Down
Loading
Loading