Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ all-features = true
default = ["deflate", "snappy", "zstd", "bzip2", "xz"]
deflate = ["flate2"]
snappy = ["snap", "crc"]
snap = ["dep:snipsnap"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
md5 = ["dep:md5"]
sha256 = ["dep:sha2"]
Expand All @@ -65,7 +66,7 @@ flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
snap = { version = "1.0", default-features = false, optional = true }
snipsnap = { version = "0.0.2", default-features = false, optional = true }
zstd = { version = "0.13", default-features = false, optional = true }
bzip2 = { version = "0.6.0", optional = true }
xz = { package = "liblzma", version = "0.4", default-features = false, optional = true }
Expand Down
4 changes: 2 additions & 2 deletions arrow-avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn main() -> anyhow::Result<()> {
| Feature | Default | What it enables | When to use |
|-----------|--------:|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
| `deflate` | ✅ | DEFLATE compression via `flate2` (pure‑Rust backend) | Most compatible; widely supported; good compression, slower than Snappy. |
| `snappy` | ✅ | Snappy block compression via `snap` with CRC‑32 as required by Avro | Fastest decode/encode; common in streaming/data‑lake pipelines. (Avro requires a 4‑byte big‑endian CRC of the **uncompressed** block.) |
| `snappy` | ✅ | Snappy block compression via `snipsnap` with CRC‑32 as required by Avro | Fastest decode/encode; common in streaming/data‑lake pipelines. (Avro requires a 4‑byte big‑endian CRC of the **uncompressed** block.) |
| `zstd` | ✅ | Zstandard block compression via `zstd` | Great compression/speed trade‑off on modern systems. May pull in a native library. |
| `bzip2` | ✅ | BZip2 block compression | For compatibility with older datasets that used BZip2. Slower; larger deps. |
| `xz` | ✅ | XZ/LZMA block compression | Highest compression for archival data; slowest; larger deps. |
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn main() -> anyhow::Result<()> {

**Lower‑level/internal toggles (rarely used directly)**

* `flate2`, `snap`, `crc`, `zstd`, `bzip2`, `xz` are optional **dependencies** wired to the user‑facing features above. You normally enable `deflate`/`snappy`/`zstd`/`bzip2`/`xz`, not these directly.
* `flate2`, `snipsnap`, `crc`, `zstd`, `bzip2`, `xz` are optional **dependencies** wired to the user‑facing features above. You normally enable `deflate`/`snappy`/`zstd`/`bzip2`/`xz`, not these directly.

### Feature snippets

Expand Down
4 changes: 2 additions & 2 deletions arrow-avro/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl CompressionCodec {
let crc = &block[block.len() - 4..];
let block = &block[..block.len() - 4];

let mut decoder = snap::raw::Decoder::new();
let mut decoder = snipsnap::raw::Decoder::new();
let decoded = decoder
.decompress_vec(block)
.map_err(|e| AvroError::External(Box::new(e)))?;
Expand Down Expand Up @@ -144,7 +144,7 @@ impl CompressionCodec {

#[cfg(feature = "snappy")]
CompressionCodec::Snappy => {
let mut encoder = snap::raw::Encoder::new();
let mut encoder = snipsnap::raw::Encoder::new();
// Allocate and compress in one step for efficiency
let mut compressed = encoder
.compress_vec(data)
Expand Down
6 changes: 4 additions & 2 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ parquet-variant-compute = { workspace = true, optional = true }
object_store = { workspace = true, optional = true, features = ["tokio"] }

bytes = { version = "1.1", default-features = false, features = ["std"] }
snap = { version = "1.0", default-features = false, optional = true }
snipsnap = { version = "0.0.2", default-features = false, optional = true }
brotli = { version = "8.0", default-features = false, features = ["std"], optional = true }
# To use `flate2` you must enable either the `flate2-zlib-rs` or `flate2-rust_backened` backends
flate2 = { version = "1.1", default-features = false, optional = true }
Expand Down Expand Up @@ -80,7 +80,7 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona
[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
criterion = { workspace = true, default-features = false, features = ["async_futures"] }
snap = { version = "1.0", default-features = false }
snipsnap = { version = "0.0.2", default-features = false }
tempfile = { version = "3.0", default-features = false }
insta = { workspace = true, default-features = true }
brotli = { version = "8.0", default-features = false, features = ["std"] }
Expand All @@ -100,6 +100,8 @@ all-features = true

[features]
default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64", "simdutf8"]
# Enable Snappy
snap = ["dep:snipsnap"]
# Enable lz4
lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
Expand Down
39 changes: 20 additions & 19 deletions parquet/src/bin/parquet-fromcsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,25 +371,26 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
})?;

// open input file decoder
let input_file_decoder = match args.csv_compression {
Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
Compression::GZIP(_) => {
Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
}
Compression::BROTLI(_) => {
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
}
Compression::LZ4 => {
Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
}
Compression::ZSTD(_) => {
Box::new(zstd::Decoder::new(input_file).map_err(|e| {
let input_file_decoder =
match args.csv_compression {
Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
Compression::SNAPPY => {
Box::new(snipsnap::read::FrameDecoder::new(input_file)) as Box<dyn Read>
}
Compression::GZIP(_) => {
Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
}
Compression::BROTLI(_) => {
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
}
Compression::LZ4 => {
Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
}
Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
})?) as Box<dyn Read>
}
d => unimplemented!("compression type {d}"),
};
})?) as Box<dyn Read>,
d => unimplemented!("compression type {d}"),
};

// create input csv reader
let builder = configure_reader_builder(args, arrow_schema);
Expand Down Expand Up @@ -427,7 +428,7 @@ mod tests {
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use parquet::file::reader::{FileReader, SerializedFileReader};
use snap::write::FrameEncoder;
use snipsnap::write::FrameEncoder;
use tempfile::NamedTempFile;

#[test]
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2751,8 +2751,8 @@ mod tests {
}
}

// Default threshold keeps the compressed buffer for constant data.
assert!(write_v2_page(1.0));
// A permissive threshold keeps the compressed buffer.
assert!(write_v2_page(2.0));
// A strict threshold (require >1000x reduction) discards it.
assert!(!write_v2_page(0.001));
}
Expand Down
22 changes: 14 additions & 8 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,22 +198,21 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<

#[cfg(any(feature = "snap", test))]
mod snappy_codec {
use snap::raw::{Decoder, Encoder, decompress_len, max_compress_len};
use snipsnap::raw::{Encoder, max_compress_len};
use snipsnap::{decompress_into_uninit, decompress_len};

use crate::compression::Codec;
use crate::errors::Result;
use crate::errors::{ParquetError, Result};

/// Codec for Snappy compression format.
pub struct SnappyCodec {
decoder: Decoder,
encoder: Encoder,
}

impl SnappyCodec {
/// Creates new Snappy compression codec.
pub(crate) fn new() -> Self {
Self {
decoder: Decoder::new(),
encoder: Encoder::new(),
}
}
Expand All @@ -231,10 +230,17 @@ mod snappy_codec {
None => decompress_len(input_buf)?,
};
let offset = output_buf.len();
output_buf.resize(offset + len, 0);
self.decoder
.decompress(input_buf, &mut output_buf[offset..])
.map_err(|e| e.into())
output_buf
.try_reserve(len)
.map_err(|e| ParquetError::External(Box::new(e)))?;
let decoded_len = {
let spare = &mut output_buf.spare_capacity_mut()[..len];
decompress_into_uninit(input_buf, spare)?.len()
};
unsafe {
output_buf.set_len(offset + decoded_len);
}
Ok(decoded_len)
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ impl From<io::Error> for ParquetError {
}

#[cfg(any(feature = "snap", test))]
impl From<snap::Error> for ParquetError {
fn from(e: snap::Error) -> ParquetError {
impl From<snipsnap::Error> for ParquetError {
fn from(e: snipsnap::Error) -> ParquetError {
ParquetError::External(Box::new(e))
}
}
Expand Down
Loading