diff --git a/Cargo.lock b/Cargo.lock index af1c980d45da..359d12a4eb8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,7 +256,7 @@ dependencies = [ "serde", "serde_json", "sha2", - "snap", + "snipsnap", "strum_macros 0.28.0", "tempfile", "tokio", @@ -2354,7 +2354,7 @@ dependencies = [ "serde", "serde_json", "simdutf8", - "snap", + "snipsnap", "sysinfo", "tempfile", "tokio", @@ -3212,10 +3212,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] -name = "snap" -version = "1.1.1" +name = "snipsnap" +version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +checksum = "777ffec062805c2b852a96078278359580ee45e7aeff9a89dce3ccf2857fb32e" [[package]] name = "socket2" diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index f46ef7e7b999..367924b5dfa9 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -39,6 +39,7 @@ all-features = true default = ["deflate", "snappy", "zstd", "bzip2", "xz"] deflate = ["flate2"] snappy = ["snap", "crc"] +snap = ["dep:snipsnap"] canonical_extension_types = ["arrow-schema/canonical_extension_types"] md5 = ["dep:md5"] sha256 = ["dep:sha2"] @@ -65,7 +66,7 @@ flate2 = { version = "1.0", default-features = false, features = [ "rust_backend", ], optional = true } futures = { version = "0.3", default-features = false, features = ["std"], optional = true } -snap = { version = "1.0", default-features = false, optional = true } +snipsnap = { version = "0.0.2", default-features = false, optional = true } zstd = { version = "0.13", default-features = false, optional = true } bzip2 = { version = "0.6.0", optional = true } xz = { package = "liblzma", version = "0.4", default-features = false, optional = true } diff --git a/arrow-avro/README.md b/arrow-avro/README.md index dbc1e1760ea3..037b356eac49 100644 --- a/arrow-avro/README.md +++ b/arrow-avro/README.md @@ -146,7 +146,7 @@ async fn main() -> anyhow::Result<()> { | Feature | Default | What it enables | When to use | |-----------|--------:|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------| | `deflate` | ✅ | DEFLATE compression via `flate2` (pure‑Rust backend) | Most compatible; widely supported; good compression, slower than Snappy. | -| `snappy` | ✅ | Snappy block compression via `snap` with CRC‑32 as required by Avro | Fastest decode/encode; common in streaming/data‑lake pipelines. (Avro requires a 4‑byte big‑endian CRC of the **uncompressed** block.) | +| `snappy` | ✅ | Snappy block compression via `snipsnap` with CRC‑32 as required by Avro | Fastest decode/encode; common in streaming/data‑lake pipelines. (Avro requires a 4‑byte big‑endian CRC of the **uncompressed** block.) | | `zstd` | ✅ | Zstandard block compression via `zstd` | Great compression/speed trade‑off on modern systems. May pull in a native library. | | `bzip2` | ✅ | BZip2 block compression | For compatibility with older datasets that used BZip2. Slower; larger deps. | | `xz` | ✅ | XZ/LZMA block compression | Highest compression for archival data; slowest; larger deps. | @@ -179,7 +179,7 @@ async fn main() -> anyhow::Result<()> { **Lower‑level/internal toggles (rarely used directly)** -* `flate2`, `snap`, `crc`, `zstd`, `bzip2`, `xz` are optional **dependencies** wired to the user‑facing features above. You normally enable `deflate`/`snappy`/`zstd`/`bzip2`/`xz`, not these directly. +* `flate2`, `snipsnap`, `crc`, `zstd`, `bzip2`, `xz` are optional **dependencies** wired to the user‑facing features above. You normally enable `deflate`/`snappy`/`zstd`/`bzip2`/`xz`, not these directly. ### Feature snippets diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs index 7c6a62564afc..ffa6100a2d57 100644 --- a/arrow-avro/src/compression.rs +++ b/arrow-avro/src/compression.rs @@ -68,7 +68,7 @@ impl CompressionCodec { let crc = &block[block.len() - 4..]; let block = &block[..block.len() - 4]; - let mut decoder = snap::raw::Decoder::new(); + let mut decoder = snipsnap::raw::Decoder::new(); let decoded = decoder .decompress_vec(block) .map_err(|e| AvroError::External(Box::new(e)))?; @@ -144,7 +144,7 @@ impl CompressionCodec { #[cfg(feature = "snappy")] CompressionCodec::Snappy => { - let mut encoder = snap::raw::Encoder::new(); + let mut encoder = snipsnap::raw::Encoder::new(); // Allocate and compress in one step for efficiency let mut compressed = encoder .compress_vec(data) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index dd2c872ede50..94735e87b381 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -52,7 +52,7 @@ parquet-variant-compute = { workspace = true, optional = true } object_store = { workspace = true, optional = true, features = ["tokio"] } bytes = { version = "1.1", default-features = false, features = ["std"] } -snap = { version = "1.0", default-features = false, optional = true } +snipsnap = { version = "0.0.2", default-features = false, optional = true } brotli = { version = "8.0", default-features = false, features = ["std"], optional = true } # To use `flate2` you must enable either the `flate2-zlib-rs` or `flate2-rust_backened` backends flate2 = { version = "1.1", default-features = false, optional = true } @@ -80,7 +80,7 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } criterion = { workspace = true, default-features = false, features = ["async_futures"] } -snap = { version = "1.0", default-features = false } +snipsnap = { version = "0.0.2", default-features = false } tempfile = { version = "3.0", default-features = false } insta = { workspace = true, default-features = true } brotli = { version = "8.0", default-features = false, features = ["std"] } @@ -100,6 +100,8 @@ all-features = true [features] default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64", "simdutf8"] +# Enable Snappy +snap = ["dep:snipsnap"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 4a59385a4479..0c6b9bd019ee 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -371,25 +371,26 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> { })?; // open input file decoder - let input_file_decoder = match args.csv_compression { - Compression::UNCOMPRESSED => Box::new(input_file) as Box, - Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box, - Compression::GZIP(_) => { - Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box - } - Compression::BROTLI(_) => { - Box::new(brotli::Decompressor::new(input_file, 0)) as Box - } - Compression::LZ4 => { - Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box - } - Compression::ZSTD(_) => { - Box::new(zstd::Decoder::new(input_file).map_err(|e| { + let input_file_decoder = + match args.csv_compression { + Compression::UNCOMPRESSED => Box::new(input_file) as Box, + Compression::SNAPPY => { + Box::new(snipsnap::read::FrameDecoder::new(input_file)) as Box + } + Compression::GZIP(_) => { + Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box + } + Compression::BROTLI(_) => { + Box::new(brotli::Decompressor::new(input_file, 0)) as Box + } + Compression::LZ4 => { + Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box + } + Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| { ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder") - })?) as Box - } - d => unimplemented!("compression type {d}"), - }; + })?) as Box, + d => unimplemented!("compression type {d}"), + }; // create input csv reader let builder = configure_reader_builder(args, arrow_schema); @@ -427,7 +428,7 @@ mod tests { use flate2::write::GzEncoder; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use parquet::file::reader::{FileReader, SerializedFileReader}; - use snap::write::FrameEncoder; + use snipsnap::write::FrameEncoder; use tempfile::NamedTempFile; #[test] diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index aa9cef16c5ad..2363d1a48011 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -2751,8 +2751,8 @@ mod tests { } } - // Default threshold keeps the compressed buffer for constant data. - assert!(write_v2_page(1.0)); + // A permissive threshold keeps the compressed buffer. + assert!(write_v2_page(2.0)); // A strict threshold (require >1000x reduction) discards it. assert!(!write_v2_page(0.001)); } diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index fe2fb59c5b8c..5601be8cfb9a 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -198,14 +198,14 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result Self { Self { - decoder: Decoder::new(), encoder: Encoder::new(), } } @@ -231,10 +230,17 @@ mod snappy_codec { None => decompress_len(input_buf)?, }; let offset = output_buf.len(); - output_buf.resize(offset + len, 0); - self.decoder - .decompress(input_buf, &mut output_buf[offset..]) - .map_err(|e| e.into()) + output_buf + .try_reserve(len) + .map_err(|e| ParquetError::External(Box::new(e)))?; + let decoded_len = { + let spare = &mut output_buf.spare_capacity_mut()[..len]; + decompress_into_uninit(input_buf, spare)?.len() + }; + unsafe { + output_buf.set_len(offset + decoded_len); + } + Ok(decoded_len) } fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index 73b4d76de578..fc996b424c9a 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -102,8 +102,8 @@ impl From for ParquetError { } #[cfg(any(feature = "snap", test))] -impl From for ParquetError { - fn from(e: snap::Error) -> ParquetError { +impl From for ParquetError { + fn from(e: snipsnap::Error) -> ParquetError { ParquetError::External(Box::new(e)) } }