diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index c309a3fa6473..7a649e16b1ec 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -123,10 +123,12 @@ jobs:
         uses: ./.github/actions/setup-builder
         with:
           target: wasm32-unknown-unknown,wasm32-wasi
+      - name: Install clang # Needed for zlib compilation
+        run: apt-get update && apt-get install -y clang gcc-multilib
       - name: Build wasm32-unknown-unknown
-        run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-unknown-unknown
+        run: cargo build -p parquet --target wasm32-unknown-unknown
       - name: Build wasm32-wasi
-        run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi
+        run: cargo build -p parquet --target wasm32-wasi

   pyspark-integration-test:
     name: PySpark Integration Test
diff --git a/arrow-integration-test/src/lib.rs b/arrow-integration-test/src/lib.rs
index 04bbcf3f6f23..07b69bffd07d 100644
--- a/arrow-integration-test/src/lib.rs
+++ b/arrow-integration-test/src/lib.rs
@@ -183,7 +183,8 @@ impl ArrowJson {
                         return Ok(false);
                     }
                 }
-                _ => return Ok(false),
+                Some(Err(e)) => return Err(e),
+                None => return Ok(false),
             }
         }

diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml
index a03f53d6641c..b5f66294a7c7 100644
--- a/arrow-ipc/Cargo.toml
+++ b/arrow-ipc/Cargo.toml
@@ -40,8 +40,12 @@ arrow-cast = { workspace = true }
 arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 flatbuffers = { version = "23.1.21", default-features = false }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
 zstd = { version = "0.12.0", default-features = false, optional = true }

+[features]
+default = []
+lz4 = ["lz4_flex"]
+
 [dev-dependencies]
 tempfile = "3.3"
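Note: both arrow-ipc (above) and parquet (below) keep exposing a cargo feature named `lz4`, but it now maps to the pure-Rust `lz4_flex` crate. For orientation, here is a minimal round-trip sketch of the lz4_flex frame API the patch relies on. It is not part of the diff; it assumes lz4_flex 0.11 with the `std` and `frame` features pinned above, and `lz4_frame_roundtrip` is a hypothetical helper:

```rust
use std::io::{Read, Write};

// Illustrative only: round-trip some bytes through the lz4_flex frame format.
fn lz4_frame_roundtrip(input: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // FrameEncoder wraps any `Write` sink; `finish` flushes and returns the sink.
    let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
    encoder.write_all(input)?;
    let compressed = encoder.finish()?;

    // FrameDecoder wraps any `Read` source.
    let mut decompressed = Vec::new();
    lz4_flex::frame::FrameDecoder::new(compressed.as_slice()).read_to_end(&mut decompressed)?;
    Ok(decompressed)
}
```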
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index db05e9a6a6c6..fafc2c5c9b6d 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -103,13 +103,15 @@ impl CompressionCodec {
         } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
             // no compression
             input.slice(LENGTH_OF_PREFIX_DATA as usize)
-        } else {
+        } else if let Ok(decompressed_length) = usize::try_from(decompressed_length) {
             // decompress data using the codec
-            let mut uncompressed_buffer =
-                Vec::with_capacity(decompressed_length as usize);
             let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
-            self.decompress(input_data, &mut uncompressed_buffer)?;
-            Buffer::from(uncompressed_buffer)
+            self.decompress(input_data, decompressed_length as _)?
+                .into()
+        } else {
+            return Err(ArrowError::IpcError(format!(
+                "Invalid uncompressed length: {decompressed_length}"
+            )));
         };
         Ok(buffer)
     }
@@ -128,21 +130,30 @@ impl CompressionCodec {
     fn decompress(
         &self,
         input: &[u8],
-        output: &mut Vec<u8>,
-    ) -> Result<usize, ArrowError> {
-        match self {
-            CompressionCodec::Lz4Frame => decompress_lz4(input, output),
-            CompressionCodec::Zstd => decompress_zstd(input, output),
+        decompressed_size: usize,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let ret = match self {
+            CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
+            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size)?,
+        };
+        if ret.len() != decompressed_size {
+            return Err(ArrowError::IpcError(format!(
+                "Expected decompressed length of {decompressed_size} got {}",
+                ret.len()
+            )));
         }
+        Ok(ret)
     }
 }

 #[cfg(feature = "lz4")]
 fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
     use std::io::Write;
-    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+    let mut encoder = lz4_flex::frame::FrameEncoder::new(output);
     encoder.write_all(input)?;
-    encoder.finish().1?;
+    encoder
+        .finish()
+        .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
     Ok(())
 }

@@ -155,14 +166,19 @@ fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError>
 }

 #[cfg(feature = "lz4")]
-fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+fn decompress_lz4(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
     use std::io::Read;
-    Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
+    let mut output = Vec::with_capacity(decompressed_size);
+    lz4_flex::frame::FrameDecoder::new(input).read_to_end(&mut output)?;
+    Ok(output)
 }

 #[cfg(not(feature = "lz4"))]
 #[allow(clippy::ptr_arg)]
-fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+fn decompress_lz4(
+    _input: &[u8],
+    _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "lz4 IPC decompression requires the lz4 feature".to_string(),
     ))
@@ -186,14 +202,22 @@ fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError>
 }

 #[cfg(feature = "zstd")]
-fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+fn decompress_zstd(
+    input: &[u8],
+    decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     use std::io::Read;
-    Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
+    let mut output = Vec::with_capacity(decompressed_size);
+    zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
+    Ok(output)
 }

 #[cfg(not(feature = "zstd"))]
 #[allow(clippy::ptr_arg)]
-fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+fn decompress_zstd(
+    _input: &[u8],
+    _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "zstd IPC decompression requires the zstd feature".to_string(),
     ))
@@ -216,28 +240,26 @@ mod tests {
     #[test]
     #[cfg(feature = "lz4")]
     fn test_lz4_compression() {
-        let input_bytes = "hello lz4".as_bytes();
+        let input_bytes = b"hello lz4";
         let codec = super::CompressionCodec::Lz4Frame;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
-        let mut result_output_bytes: Vec<u8> = Vec::new();
-        codec
-            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+        let result = codec
+            .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
-        assert_eq!(input_bytes, result_output_bytes.as_slice());
+        assert_eq!(input_bytes, result.as_slice());
     }

     #[test]
     #[cfg(feature = "zstd")]
     fn test_zstd_compression() {
-        let input_bytes = "hello zstd".as_bytes();
+        let input_bytes = b"hello zstd";
         let codec = super::CompressionCodec::Zstd;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
-        let mut result_output_bytes: Vec<u8> = Vec::new();
-        codec
-            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+        let result = codec
+            .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
-        assert_eq!(input_bytes, result_output_bytes.as_slice());
+        assert_eq!(input_bytes, result.as_slice());
     }
 }
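For context on the arrow-ipc change above: per the Arrow IPC format, each compressed buffer carries an 8-byte little-endian signed length prefix, where -1 (`LENGTH_NO_COMPRESSED_DATA`) marks a body that was left uncompressed. The rewrite pre-allocates exactly the advertised number of bytes, rejects any other negative prefix, and now also errors if the codec yields a different length than advertised. A sketch of the layout (illustrative only; `split_ipc_buffer` is a hypothetical helper, not code from this patch):

```rust
// Splits an IPC compressed buffer into its length prefix and compressed body.
fn split_ipc_buffer(buf: &[u8]) -> (i64, &[u8]) {
    const LENGTH_OF_PREFIX_DATA: usize = 8;
    let prefix: [u8; 8] = buf[..LENGTH_OF_PREFIX_DATA].try_into().unwrap();
    // A prefix of -1 means the body is uncompressed; otherwise it is the
    // expected decompressed length in bytes.
    (i64::from_le_bytes(prefix), &buf[LENGTH_OF_PREFIX_DATA..])
}
```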
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7c346248acbb..c710c83213b9 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -51,7 +51,7 @@ thrift = { version = "0.17", default-features = false }
 snap = { version = "1.0", default-features = false, optional = true }
 brotli = { version = "3.3", default-features = false, features = ["std"], optional = true }
 flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
 zstd = { version = "0.12.0", optional = true, default-features = false }
 chrono = { workspace = true }
 num = { version = "0.4", default-features = false }
@@ -74,7 +74,7 @@ snap = { version = "1.0", default-features = false }
 tempfile = { version = "3.0", default-features = false }
 brotli = { version = "3.3", default-features = false, features = ["std"] }
 flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] }
-lz4 = { version = "1.23", default-features = false }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] }
 zstd = { version = "0.12", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
@@ -86,6 +86,8 @@ all-features = true

 [features]
 default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+# Enable lz4
+lz4 = ["lz4_flex"]
 # Enable arrow reader/writer APIs
 arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable CLI tools
@@ -166,5 +168,10 @@ name = "arrow_reader"
 required-features = ["arrow", "test_common", "experimental"]
 harness = false

+[[bench]]
+name = "compression"
+required-features = ["experimental", "default"]
+harness = false
+
 [lib]
 bench = false
["arrow", "test_common", "experimental"] harness = false +[[bench]] +name = "compression" +required-features = ["experimental", "default"] +harness = false + [lib] bench = false diff --git a/parquet/benches/compression.rs b/parquet/benches/compression.rs new file mode 100644 index 000000000000..ce4f9aead751 --- /dev/null +++ b/parquet/benches/compression.rs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::*; +use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; +use parquet::compression::create_codec; +use rand::distributions::Alphanumeric; +use rand::prelude::*; + +fn do_bench(c: &mut Criterion, name: &str, uncompressed: &[u8]) { + let codecs = [ + Compression::BROTLI(BrotliLevel::default()), + Compression::GZIP(GzipLevel::default()), + Compression::LZ4, + Compression::LZ4_RAW, + Compression::SNAPPY, + Compression::GZIP(GzipLevel::default()), + Compression::ZSTD(ZstdLevel::default()), + ]; + + for compression in codecs { + let mut codec = create_codec(compression, &Default::default()) + .unwrap() + .unwrap(); + + c.bench_function(&format!("compress {compression} - {name}"), |b| { + b.iter(|| { + let mut out = Vec::new(); + codec.compress(uncompressed, &mut out).unwrap(); + out + }); + }); + + let mut compressed = Vec::new(); + codec.compress(uncompressed, &mut compressed).unwrap(); + println!( + "{compression} compressed {} bytes of {name} to {} bytes", + uncompressed.len(), + compressed.len() + ); + + c.bench_function(&format!("decompress {compression} - {name}"), |b| { + b.iter(|| { + let mut out = Vec::new(); + codec + .decompress( + black_box(&compressed), + &mut out, + Some(uncompressed.len()), + ) + .unwrap(); + out + }); + }); + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = StdRng::seed_from_u64(42); + let rng = &mut rng; + const DATA_SIZE: usize = 1024 * 1024; + + let uncompressed: Vec<_> = rng.sample_iter(&Alphanumeric).take(DATA_SIZE).collect(); + do_bench(c, "alphanumeric", &uncompressed); + + // Create a collection of 64 words + let words: Vec> = (0..64) + .map(|_| { + let len = rng.gen_range(1..12); + rng.sample_iter(&Alphanumeric).take(len).collect() + }) + .collect(); + + // Build data by concatenating these words randomly together + let mut uncompressed = Vec::with_capacity(DATA_SIZE); + while uncompressed.len() < DATA_SIZE { + let word = &words[rng.gen_range(0..words.len())]; + uncompressed + .extend_from_slice(&word[..word.len().min(DATA_SIZE - uncompressed.len())]) + } + assert_eq!(uncompressed.len(), DATA_SIZE); + + do_bench(c, "words", &uncompressed); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 1ff6fecf5a81..548bbdbfb8f1 
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 1ff6fecf5a81..548bbdbfb8f1 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -386,9 +386,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
         Compression::BROTLI(_) => {
             Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
         }
-        Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
-            ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
-        })?) as Box<dyn Read>,
+        Compression::LZ4 => {
+            Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
+        }
         Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
             ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
         })?) as Box<dyn Read>,
@@ -692,19 +692,9 @@ mod tests {
                 encoder.into_inner()
             }
             Compression::LZ4 => {
-                let mut encoder = lz4::EncoderBuilder::new()
-                    .build(input_file)
-                    .map_err(|e| {
-                        ParquetFromCsvError::with_context(
-                            e,
-                            "Failed to create lz4::Encoder",
-                        )
-                    })
-                    .unwrap();
+                let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
                 write_tmp_file(&mut encoder);
-                let (inner, err) = encoder.finish();
-                err.unwrap();
-                inner
+                encoder.finish().unwrap()
             }
             Compression::ZSTD(level) => {
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f1831ed48444..9e0eee0e3e04 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -388,7 +388,7 @@ mod lz4_codec {
     use std::io::{Read, Write};

     use crate::compression::Codec;
-    use crate::errors::Result;
+    use crate::errors::{ParquetError, Result};

     const LZ4_BUFFER_SIZE: usize = 4096;

@@ -409,7 +409,7 @@ mod lz4_codec {
             output_buf: &mut Vec<u8>,
             _uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            let mut decoder = lz4::Decoder::new(input_buf)?;
+            let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf);
             let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
             let mut total_len = 0;
             loop {
@@ -424,7 +424,7 @@ mod lz4_codec {
         }

         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
-            let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+            let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf);
             let mut from = 0;
             loop {
                 let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
@@ -434,7 +434,10 @@ mod lz4_codec {
                     break;
                 }
             }
-            encoder.finish().1.map_err(|e| e.into())
+            match encoder.finish() {
+                Ok(_) => Ok(()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
+            }
         }
     }
 }
@@ -551,11 +554,7 @@ mod lz4_raw_codec {
                 }
             };
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::decompress_to_buffer(
-                input_buf,
-                Some(required_len.try_into().unwrap()),
-                &mut output_buf[offset..],
-            ) {
+            match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
                 Ok(n) => {
                     if n != required_len {
                         return Err(ParquetError::General(
@@ -564,25 +563,20 @@ mod lz4_raw_codec {
                     }
                     Ok(n)
                 }
-                Err(e) => Err(e.into()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
             }
         }

         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
             let offset = output_buf.len();
-            let required_len = lz4::block::compress_bound(input_buf.len())?;
+            let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::compress_to_buffer(
-                input_buf,
-                None,
-                false,
-                &mut output_buf[offset..],
-            ) {
+            match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
                 Ok(n) => {
                     output_buf.truncate(offset + n);
                     Ok(())
                 }
-                Err(e) => Err(e.into()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
             }
         }
     }
@@ -666,11 +660,11 @@ mod lz4_hadoop_codec {
                     "Not enough bytes to hold advertised output",
                 ));
             }
-            let decompressed_size = lz4::block::decompress_to_buffer(
+            let decompressed_size = lz4_flex::decompress_into(
                 &input[..expected_compressed_size as usize],
-                Some(output_len as i32),
                 output,
-            )?;
+            )
+            .map_err(|e| ParquetError::External(Box::new(e)))?;
             if decompressed_size != expected_decompressed_size as usize {
                 return Err(io::Error::new(
                     io::ErrorKind::Other,
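Unlike the IPC and LZ4-frame paths, the LZ4_RAW and LZ4-Hadoop codecs above use the lz4_flex block API: there is no self-describing container, so the caller must size both buffers itself, via `get_maximum_output_size` when compressing and a known uncompressed length (the page header's `uncompress_size`) when decompressing. A sketch under the same lz4_flex 0.11 assumption; `lz4_block_roundtrip` is a hypothetical helper, not code from this patch:

```rust
// Block-format round trip with caller-sized buffers on both sides.
fn lz4_block_roundtrip(input: &[u8]) -> Result<Vec<u8>, lz4_flex::block::DecompressError> {
    let mut compressed = vec![0u8; lz4_flex::block::get_maximum_output_size(input.len())];
    let n = lz4_flex::block::compress_into(input, &mut compressed)
        .expect("output sized via get_maximum_output_size");
    compressed.truncate(n);

    let mut decompressed = vec![0u8; input.len()]; // known uncompressed length
    let m = lz4_flex::block::decompress_into(&compressed, &mut decompressed)?;
    debug_assert_eq!(m, input.len());
    Ok(decompressed)
}
```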