Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `clickhouse` and `aws_s3` sinks now use dedicated `batch_encoding` types that only expose the codecs each sink actually supports (`arrow_stream` for `clickhouse`, `parquet` for `aws_s3`). Previously, the shared `BatchSerializerConfig` schema advertised codecs that were accepted by the schema but rejected when the configuration was built, so invalid choices now surface as schema errors instead.

authors: flaviofcruz
107 changes: 49 additions & 58 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tower::ServiceBuilder;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::BatchEncoder;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::encoding::BatchSerializerConfig;
use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
use vector_lib::{
TimeZone,
codecs::{
Expand Down Expand Up @@ -37,6 +37,21 @@ use crate::{
tls::TlsConfig,
};

/// Batch encoding configuration for the `aws_s3` sink.
///
/// Dedicated wrapper type so the generated configuration schema only
/// advertises the batch codecs this sink actually supports (`parquet`),
/// rather than the full shared `BatchSerializerConfig`, whose extra
/// variants were previously rejected only at config-build time.
///
/// Serialized with an internal `codec` tag (e.g. `codec = "parquet"`),
/// matching the tagging of the shared batch serializer config.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum S3BatchEncoding {
/// Encodes events in [Apache Parquet][apache_parquet] columnar format.
///
/// [apache_parquet]: https://parquet.apache.org/
Parquet(ParquetSerializerConfig),
}

/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
"aws_s3",
Expand Down Expand Up @@ -111,15 +126,13 @@ pub struct S3SinkConfig {

/// Batch encoding configuration for columnar formats.
///
/// When set, events are encoded together as a batch in a columnar format (for example, Parquet)
/// When set, events are encoded together as a batch in a columnar format (Parquet)
/// instead of the standard per-event framing-based encoding. The columnar format handles
/// its own internal compression, so the top-level `compression` setting is bypassed.
///
/// Only the `parquet` codec is supported by the AWS S3 sink.
#[cfg(feature = "codecs-parquet")]
#[configurable(derived)]
#[serde(default)]
pub batch_encoding: Option<BatchSerializerConfig>,
pub batch_encoding: Option<S3BatchEncoding>,
Comment thread
pront marked this conversation as resolved.

/// Compression configuration.
///
Expand Down Expand Up @@ -220,8 +233,10 @@ impl SinkConfig for S3SinkConfig {

fn input(&self) -> Input {
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
return Input::new(batch_config.input_type());
if let Some(batch_encoding) = &self.batch_encoding {
let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
return Input::new(resolved.input_type());
}
Input::new(self.encoding.config().1.input_type())
}
Expand Down Expand Up @@ -272,15 +287,11 @@ impl S3SinkConfig {
// When batch_encoding is configured (e.g., Parquet), use batch mode
// with internal compression and appropriate file extension.
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) {
return Err(
"batch_encoding only supports encoding with parquet format for amazon s3 sink"
.into(),
);
}
if let Some(batch_encoding) = &self.batch_encoding {
let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());

let batch_serializer = batch_config.build_batch_serializer()?;
let batch_serializer = resolved_batch_config.build_batch_serializer()?;
let batch_encoder = BatchEncoder::new(batch_serializer);

// Auto-detect Content-Type from batch format. Users can still
Expand All @@ -292,15 +303,14 @@ impl S3SinkConfig {

let encoder = EncoderKind::Batch(batch_encoder);

// Auto-detect file extension from batch format
let filename_extension =
self.filename_extension
.clone()
.or_else(|| match batch_config {
BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()),
#[allow(unreachable_patterns)]
_ => None,
});
let filename_extension = self.filename_extension.clone().or_else(|| {
Some(
match batch_encoding {
S3BatchEncoding::Parquet(_) => "parquet",
}
.to_string(),
)
});

if self.compression != Compression::None {
warn!("Top level compression setting ignored when batch_encoding set to parquet.")
Expand Down Expand Up @@ -392,15 +402,10 @@ mod tests {
let batch_enc = config
.batch_encoding
.expect("batch_encoding should be Some");
match batch_enc {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(ref p) => {
use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
assert_eq!(p.compression, ParquetCompression::Snappy);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
assert_eq!(p.compression, ParquetCompression::Snappy);
}

/// Content-Type must be auto-detected as `application/vnd.apache.parquet`
Expand Down Expand Up @@ -432,7 +437,7 @@ mod tests {
options: S3Options::default(),
region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
compression: Compression::None,
batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
request: Default::default(),
Expand All @@ -444,7 +449,8 @@ mod tests {
retry_strategy: Default::default(),
};

let batch_config = config.batch_encoding.as_ref().unwrap();
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
let batch_config = BatchSerializerConfig::Parquet(p.clone());
let batch_serializer = batch_config.build_batch_serializer().unwrap();
let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

Expand Down Expand Up @@ -484,7 +490,8 @@ mod tests {
)
.unwrap();

let batch_config = config.batch_encoding.as_ref().unwrap();
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
let batch_config = vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p.clone());
let batch_serializer = batch_config.build_batch_serializer().unwrap();
let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

Expand Down Expand Up @@ -524,17 +531,11 @@ mod tests {
"fixture must not set filename_extension"
);

let batch_config = config.batch_encoding.as_ref().unwrap();
assert!(config.batch_encoding.is_some());
let extension = config
.filename_extension
.clone()
.or_else(|| match batch_config {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(_) => {
Some("parquet".to_string())
}
#[allow(unreachable_patterns)]
_ => None,
});
.or_else(|| Some("parquet".to_string()));

assert_eq!(extension.as_deref(), Some("parquet"));
}
Expand Down Expand Up @@ -582,13 +583,8 @@ mod tests {
)
.unwrap();

match config.batch_encoding.unwrap() {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
}

/// Explicit `schema_mode = "strict"` is correctly parsed.
Expand All @@ -613,12 +609,7 @@ mod tests {
)
.unwrap();

match config.batch_encoding.unwrap() {
vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
}
#[allow(unreachable_patterns)]
_ => panic!("expected Parquet variant"),
}
let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
}
}
6 changes: 3 additions & 3 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{
time::Duration,
};

#[cfg(feature = "codecs-parquet")]
use super::config::S3BatchEncoding;
use aws_sdk_s3::{
Client as S3Client,
operation::{create_bucket::CreateBucketError, get_object::GetObjectOutput},
Expand All @@ -22,8 +24,6 @@ use futures::{Stream, stream};
use similar_asserts::assert_eq;
use tempfile::TempDir;
use tokio_stream::StreamExt;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::encoding::BatchSerializerConfig;
use vector_lib::{
buffers::{BufferConfig, BufferType, WhenFull},
codecs::{TextSerializerConfig, encoding::FramingConfig},
Expand Down Expand Up @@ -502,7 +502,7 @@ async fn s3_parquet_insert_message() {
};

let config = S3SinkConfig {
batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
batch_encoding: Some(S3BatchEncoding::Parquet(parquet_config)),
..config(&bucket, 100, 5.0)
};

Expand Down
40 changes: 24 additions & 16 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::fmt;

use http::{Request, StatusCode, Uri};
use hyper::Body;
use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;
use vector_lib::codecs::encoding::format::SchemaProvider;
use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};

use super::{
request_builder::ClickhouseRequestBuilder,
Expand Down Expand Up @@ -45,6 +45,23 @@ pub enum Format {
ArrowStream,
}

/// Batch encoding configuration for the `clickhouse` sink.
///
/// Dedicated wrapper type so the generated configuration schema only
/// advertises the batch codecs this sink actually supports
/// (`arrow_stream`), rather than the full shared `BatchSerializerConfig`,
/// whose extra variants were previously rejected only at config-build
/// time.
///
/// Serialized with an internal `codec` tag (e.g. `codec = "arrow_stream"`),
/// matching the tagging of the shared batch serializer config.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum ClickhouseBatchEncoding {
/// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
///
/// This is the streaming variant of the Arrow IPC format, which writes
/// a continuous stream of record batches.
///
/// [apache_arrow]: https://arrow.apache.org/
ArrowStream(ArrowStreamSerializerConfig),
}

impl fmt::Display for Format {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -104,11 +121,9 @@ pub struct ClickhouseConfig {
///
/// When specified, events are encoded together as a single batch.
/// This is mutually exclusive with per-event encoding based on the `format` field.
///
/// Only the `arrow_stream` codec is supported by the ClickHouse sink.
#[configurable(derived)]
#[serde(default)]
pub batch_encoding: Option<BatchSerializerConfig>,
pub batch_encoding: Option<ClickhouseBatchEncoding>,

#[configurable(derived)]
#[serde(default)]
Expand Down Expand Up @@ -282,6 +297,7 @@ impl ClickhouseConfig {

if let Some(batch_encoding) = &self.batch_encoding {
use vector_lib::codecs::BatchEncoder;
use vector_lib::codecs::encoding::BatchSerializerConfig;

// Validate that batch_encoding is only compatible with ArrowStream format
if self.format != Format::ArrowStream {
Expand All @@ -292,16 +308,8 @@ impl ClickhouseConfig {
.into());
}

let mut arrow_config = match batch_encoding {
BatchSerializerConfig::ArrowStream(config) => config.clone(),
#[cfg(feature = "codecs-parquet")]
BatchSerializerConfig::Parquet(_) => {
return Err(
"ClickHouse sink does not support Parquet batch encoding. Use 'arrow_stream' instead."
.into(),
);
}
};
let ClickhouseBatchEncoding::ArrowStream(arrow_config) = batch_encoding;
let mut arrow_config = arrow_config.clone();

self.resolve_arrow_schema(
client,
Expand Down Expand Up @@ -436,7 +444,7 @@ mod tests {
/// Helper to create a minimal ClickhouseConfig for testing
fn create_test_config(
format: Format,
batch_encoding: Option<BatchSerializerConfig>,
batch_encoding: Option<ClickhouseBatchEncoding>,
) -> ClickhouseConfig {
ClickhouseConfig {
endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
Expand Down Expand Up @@ -469,7 +477,7 @@ mod tests {
for (format, format_name) in incompatible_formats {
let config = create_test_config(
format,
Some(BatchSerializerConfig::ArrowStream(
Some(ClickhouseBatchEncoding::ArrowStream(
ArrowStreamSerializerConfig::default(),
)),
);
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use serde::Deserialize;
use serde_json::Value;
use tokio::time::{Duration, timeout};
use vector_lib::{
codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig},
codecs::encoding::ArrowStreamSerializerConfig,
event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent},
lookup::PathPrefix,
};
Expand All @@ -28,7 +28,7 @@ use crate::{
codecs::{TimestampFormat, Transformer},
config::{SinkConfig, SinkContext, log_schema},
sinks::{
clickhouse::config::ClickhouseConfig,
clickhouse::config::{ClickhouseBatchEncoding, ClickhouseConfig},
util::{BatchConfig, Compression, TowerRequestConfig},
},
test_util::{
Expand Down Expand Up @@ -502,7 +502,7 @@ async fn insert_events_arrow_format() {
table: table.clone().try_into().unwrap(),
compression: Compression::None,
format: crate::sinks::clickhouse::config::Format::ArrowStream,
batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())),
batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())),
batch,
request: TowerRequestConfig {
retry_attempts: 1,
Expand Down Expand Up @@ -574,7 +574,7 @@ async fn insert_events_arrow_with_schema_fetching() {
table: table.clone().try_into().unwrap(),
compression: Compression::None,
format: crate::sinks::clickhouse::config::Format::ArrowStream,
batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())),
batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())),
batch,
request: TowerRequestConfig {
retry_attempts: 1,
Expand Down Expand Up @@ -657,7 +657,7 @@ async fn test_complex_types() {
table: table.clone().try_into().unwrap(),
compression: Compression::None,
format: crate::sinks::clickhouse::config::Format::ArrowStream,
batch_encoding: Some(BatchSerializerConfig::ArrowStream(arrow_config)),
batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(arrow_config)),
batch,
request: TowerRequestConfig {
retry_attempts: 1,
Expand Down Expand Up @@ -1231,7 +1231,7 @@ async fn test_missing_required_field_emits_null_constraint_error() {
table: table.clone().try_into().unwrap(),
compression: Compression::None,
format: crate::sinks::clickhouse::config::Format::ArrowStream,
batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())),
batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())),
batch,
request: TowerRequestConfig {
retry_attempts: 1,
Expand Down Expand Up @@ -1323,7 +1323,7 @@ async fn arrow_schema_excludes_non_insertable_columns() {
table: table.clone().try_into().unwrap(),
compression: Compression::None,
format: crate::sinks::clickhouse::config::Format::ArrowStream,
batch_encoding: Some(BatchSerializerConfig::ArrowStream(
batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(
ArrowStreamSerializerConfig::default(),
)),
batch,
Expand Down
Loading
Loading