Skip to content

Commit 202e9a1

Browse files
tustvold and crepererum
authored
Avro block decompression (#5306)
* Avro block decompression * Clippy * Update arrow-avro/src/reader/mod.rs Co-authored-by: Marco Neumann <[email protected]> --------- Co-authored-by: Marco Neumann <[email protected]>
1 parent ba6dbb8 commit 202e9a1

File tree

5 files changed

+119
-14
lines changed

5 files changed

+119
-14
lines changed

arrow-avro/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ name = "arrow_avro"
3333
path = "src/lib.rs"
3434
bench = false
3535

36+
[features]
37+
default = ["deflate", "snappy", "zstd"]
38+
deflate = ["flate2"]
39+
snappy = ["snap", "crc"]
40+
3641
[dependencies]
3742
arrow-array = { workspace = true }
3843
arrow-buffer = { workspace = true }
@@ -41,6 +46,11 @@ arrow-data = { workspace = true }
4146
arrow-schema = { workspace = true }
4247
serde_json = { version = "1.0", default-features = false, features = ["std"] }
4348
serde = { version = "1.0.188", features = ["derive"] }
49+
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
50+
snap = { version = "1.0", default-features = false, optional = true }
51+
zstd = { version = "0.13", default-features = false, optional = true }
52+
crc = { version = "3.0", optional = true }
53+
4454

4555
[dev-dependencies]
4656

arrow-avro/src/compression.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,69 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use serde::{Deserialize, Serialize};
18+
use arrow_schema::ArrowError;
19+
use flate2::read;
20+
use std::io;
21+
use std::io::Read;
1922

2023
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
2124
pub const CODEC_METADATA_KEY: &str = "avro.codec";
2225

23-
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
24-
#[serde(rename_all = "lowercase")]
26+
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
2527
pub enum CompressionCodec {
26-
Null,
2728
Deflate,
28-
BZip2,
2929
Snappy,
30-
XZ,
3130
ZStandard,
3231
}
32+
33+
impl CompressionCodec {
34+
pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
35+
match self {
36+
#[cfg(feature = "deflate")]
37+
CompressionCodec::Deflate => {
38+
let mut decoder = read::DeflateDecoder::new(block);
39+
let mut out = Vec::new();
40+
decoder.read_to_end(&mut out)?;
41+
Ok(out)
42+
}
43+
#[cfg(not(feature = "deflate"))]
44+
CompressionCodec::Deflate => Err(ArrowError::ParseError(
45+
"Deflate codec requires deflate feature".to_string(),
46+
)),
47+
#[cfg(feature = "snappy")]
48+
CompressionCodec::Snappy => {
49+
// Each compressed block is followed by the 4-byte, big-endian CRC32
50+
// checksum of the uncompressed data in the block.
51+
let crc = &block[block.len() - 4..];
52+
let block = &block[..block.len() - 4];
53+
54+
let mut decoder = snap::raw::Decoder::new();
55+
let decoded = decoder
56+
.decompress_vec(block)
57+
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
58+
59+
let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
60+
if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
61+
return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
62+
}
63+
Ok(decoded)
64+
}
65+
#[cfg(not(feature = "snappy"))]
66+
CompressionCodec::Snappy => Err(ArrowError::ParseError(
67+
"Snappy codec requires snappy feature".to_string(),
68+
)),
69+
70+
#[cfg(feature = "zstd")]
71+
CompressionCodec::ZStandard => {
72+
let mut decoder = zstd::Decoder::new(block)?;
73+
let mut out = Vec::new();
74+
decoder.read_to_end(&mut out)?;
75+
Ok(out)
76+
}
77+
#[cfg(not(feature = "zstd"))]
78+
CompressionCodec::ZStandard => Err(ArrowError::ParseError(
79+
"ZStandard codec requires zstd feature".to_string(),
80+
)),
81+
}
82+
}
83+
}

arrow-avro/src/reader/header.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Decoder for [`Header`]
1919
20+
use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
2021
use crate::reader::vlq::VLQDecoder;
2122
use crate::schema::Schema;
2223
use arrow_schema::ArrowError;
@@ -55,7 +56,7 @@ impl Header {
5556
/// Returns an iterator over the meta keys in this header
5657
pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> {
5758
let mut last = 0;
58-
self.meta_offsets.windows(2).map(move |w| {
59+
self.meta_offsets.chunks_exact(2).map(move |w| {
5960
let start = last;
6061
last = w[1];
6162
(&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]])
@@ -72,6 +73,22 @@ impl Header {
7273
pub fn sync(&self) -> [u8; 16] {
7374
self.sync
7475
}
76+
77+
/// Returns the [`CompressionCodec`] if any
78+
pub fn compression(&self) -> Result<Option<CompressionCodec>, ArrowError> {
79+
let v = self.get(CODEC_METADATA_KEY);
80+
81+
match v {
82+
None | Some(b"null") => Ok(None),
83+
Some(b"deflate") => Ok(Some(CompressionCodec::Deflate)),
84+
Some(b"snappy") => Ok(Some(CompressionCodec::Snappy)),
85+
Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)),
86+
Some(v) => Err(ArrowError::ParseError(format!(
87+
"Unrecognized compression codec \'{}\'",
88+
String::from_utf8_lossy(v)
89+
))),
90+
}
91+
}
7592
}
7693

7794
/// A decoder for [`Header`]
@@ -305,6 +322,17 @@ mod test {
305322
);
306323

307324
let header = decode_file(&arrow_test_data("avro/fixed_length_decimal.avro"));
325+
326+
let meta: Vec<_> = header
327+
.metadata()
328+
.map(|(k, _)| std::str::from_utf8(k).unwrap())
329+
.collect();
330+
331+
assert_eq!(
332+
meta,
333+
&["avro.schema", "org.apache.spark.version", "avro.codec"]
334+
);
335+
308336
let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
309337
let expected = br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#;
310338
assert_eq!(schema_json, expected);

arrow-avro/src/reader/mod.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,35 @@ fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block,
7373

7474
#[cfg(test)]
7575
mod test {
76+
use crate::compression::CompressionCodec;
7677
use crate::reader::{read_blocks, read_header};
7778
use crate::test_util::arrow_test_data;
7879
use std::fs::File;
7980
use std::io::BufReader;
8081

8182
#[test]
8283
fn test_mux() {
83-
let file = File::open(arrow_test_data("avro/alltypes_plain.avro")).unwrap();
84-
let mut reader = BufReader::new(file);
85-
let header = read_header(&mut reader).unwrap();
86-
for result in read_blocks(reader) {
87-
let block = result.unwrap();
88-
assert_eq!(block.sync, header.sync());
84+
let files = [
85+
"avro/alltypes_plain.avro",
86+
"avro/alltypes_plain.snappy.avro",
87+
"avro/alltypes_plain.zstandard.avro",
88+
"avro/alltypes_nulls_plain.avro",
89+
];
90+
91+
for file in files {
92+
println!("file: {file}");
93+
let file = File::open(arrow_test_data(file)).unwrap();
94+
let mut reader = BufReader::new(file);
95+
let header = read_header(&mut reader).unwrap();
96+
let compression = header.compression().unwrap();
97+
println!("compression: {compression:?}");
98+
for result in read_blocks(reader) {
99+
let block = result.unwrap();
100+
assert_eq!(block.sync, header.sync());
101+
if let Some(c) = compression {
102+
c.decompress(&block.data).unwrap();
103+
}
104+
}
89105
}
90106
}
91107
}

0 commit comments

Comments
 (0)