Skip to content

Commit f0455d1

Browse files
tustvold and alamb authored
Support Parsing Avro File Headers (#4888)
* Add arrow-avro
* Add HeaderDecoder
* Add schema parsing
* Add BlockDecoder
* Further docs
* Apply suggestions from code review (Co-authored-by: Andrew Lamb <[email protected]>)
* Review feedback

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 4320a75 commit f0455d1

File tree

15 files changed

+1169
-0
lines changed

15 files changed

+1169
-0
lines changed

.github/workflows/arrow.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ on:
3939
- arrow-integration-test/**
4040
- arrow-ipc/**
4141
- arrow-json/**
42+
- arrow-avro/**
4243
- arrow-ord/**
4344
- arrow-row/**
4445
- arrow-schema/**
@@ -78,6 +79,8 @@ jobs:
7879
run: cargo test -p arrow-csv --all-features
7980
- name: Test arrow-json with all features
8081
run: cargo test -p arrow-json --all-features
82+
- name: Test arrow-avro with all features
83+
run: cargo test -p arrow-avro --all-features
8184
- name: Test arrow-string with all features
8285
run: cargo test -p arrow-string --all-features
8386
- name: Test arrow-ord with all features
@@ -202,6 +205,8 @@ jobs:
202205
run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings
203206
- name: Clippy arrow-json with all features
204207
run: cargo clippy -p arrow-json --all-targets --all-features -- -D warnings
208+
- name: Clippy arrow-avro with all features
209+
run: cargo clippy -p arrow-avro --all-targets --all-features -- -D warnings
205210
- name: Clippy arrow-string with all features
206211
run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings
207212
- name: Clippy arrow-ord with all features

.github/workflows/dev_pr/labeler.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ arrow:
2727
- arrow-integration-testing/**/*
2828
- arrow-ipc/**/*
2929
- arrow-json/**/*
30+
- arrow-avro/**/*
3031
- arrow-ord/**/*
3132
- arrow-row/**/*
3233
- arrow-schema/**/*

.github/workflows/integration.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ on:
3838
- arrow-integration-testing/**
3939
- arrow-ipc/**
4040
- arrow-json/**
41+
- arrow-avro/**
4142
- arrow-ord/**
4243
- arrow-pyarrow-integration-testing/**
4344
- arrow-schema/**

.github/workflows/miri.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ on:
3636
- arrow-data/**
3737
- arrow-ipc/**
3838
- arrow-json/**
39+
- arrow-avro/**
3940
- arrow-schema/**
4041
- arrow-select/**
4142
- arrow-string/**

.github/workflows/parquet.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ on:
4040
- arrow-ipc/**
4141
- arrow-csv/**
4242
- arrow-json/**
43+
- arrow-avro/**
4344
- parquet/**
4445
- .github/**
4546

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ members = [
2121
"arrow",
2222
"arrow-arith",
2323
"arrow-array",
24+
"arrow-avro",
2425
"arrow-buffer",
2526
"arrow-cast",
2627
"arrow-csv",

arrow-avro/Cargo.toml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "arrow-avro"
20+
version = { workspace = true }
21+
description = "Support for parsing Avro format into the Arrow format"
22+
homepage = { workspace = true }
23+
repository = { workspace = true }
24+
authors = { workspace = true }
25+
license = { workspace = true }
26+
keywords = { workspace = true }
27+
include = { workspace = true }
28+
edition = { workspace = true }
29+
rust-version = { workspace = true }
30+
31+
[lib]
32+
name = "arrow_avro"
33+
path = "src/lib.rs"
34+
bench = false
35+
36+
[dependencies]
37+
arrow-array = { workspace = true }
38+
arrow-buffer = { workspace = true }
39+
arrow-cast = { workspace = true }
40+
arrow-data = { workspace = true }
41+
arrow-schema = { workspace = true }
42+
serde_json = { version = "1.0", default-features = false, features = ["std"] }
43+
serde = { version = "1.0.188", features = ["derive"] }
44+
45+
[dev-dependencies]
46+

arrow-avro/src/compression.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use serde::{Deserialize, Serialize};
19+
20+
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
21+
pub const CODEC_METADATA_KEY: &str = "avro.codec";
22+
23+
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
24+
#[serde(rename_all = "lowercase")]
25+
pub enum CompressionCodec {
26+
Null,
27+
Deflate,
28+
BZip2,
29+
Snappy,
30+
XZ,
31+
ZStandard,
32+
}

arrow-avro/src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
19+
//!
20+
//! [Apache Arrow]: https://arrow.apache.org
21+
//! [Apache Avro]: https://avro.apache.org/
22+
23+
#![allow(unused)] // Temporary
24+
25+
pub mod reader;
26+
mod schema;
27+
28+
mod compression;

arrow-avro/src/reader/block.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Decoder for [`Block`]
19+
20+
use crate::reader::vlq::VLQDecoder;
21+
use arrow_schema::ArrowError;
22+
23+
/// One data block of an Avro object container file
///
/// See <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Default, Debug)]
pub struct Block {
    /// How many objects this block holds
    pub count: usize,
    /// The bytes of the serialized objects contained in this block
    pub data: Vec<u8>,
    /// The 16-byte sync marker
    pub sync: [u8; 16],
}
35+
36+
/// A decoder for [`Block`]
37+
#[derive(Debug)]
38+
pub struct BlockDecoder {
39+
state: BlockDecoderState,
40+
in_progress: Block,
41+
vlq_decoder: VLQDecoder,
42+
bytes_remaining: usize,
43+
}
44+
45+
/// The part of a block the decoder is currently reading
#[derive(Debug)]
enum BlockDecoderState {
    /// Reading the variable-length object count
    Count,
    /// Reading the variable-length size of the block data
    Size,
    /// Reading the block data bytes
    Data,
    /// Reading the 16-byte sync marker
    Sync,
    /// A whole block has been read and is waiting to be flushed
    Finished,
}
53+
54+
impl Default for BlockDecoder {
55+
fn default() -> Self {
56+
Self {
57+
state: BlockDecoderState::Count,
58+
in_progress: Default::default(),
59+
vlq_decoder: Default::default(),
60+
bytes_remaining: 0,
61+
}
62+
}
63+
}
64+
65+
impl BlockDecoder {
66+
/// Parse [`Block`] from `buf`, returning the number of bytes read
67+
///
68+
/// This method can be called multiple times with consecutive chunks of data, allowing
69+
/// integration with chunked IO systems like [`BufRead::fill_buf`]
70+
///
71+
/// All errors should be considered fatal, and decoding aborted
72+
///
73+
/// Once an entire [`Block`] has been decoded this method will not read any further
74+
/// input bytes, until [`Self::flush`] is called. Afterwards [`Self::decode`]
75+
/// can then be used again to read the next block, if any
76+
///
77+
/// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
78+
pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
79+
let max_read = buf.len();
80+
while !buf.is_empty() {
81+
match self.state {
82+
BlockDecoderState::Count => {
83+
if let Some(c) = self.vlq_decoder.long(&mut buf) {
84+
self.in_progress.count = c.try_into().map_err(|_| {
85+
ArrowError::ParseError(format!(
86+
"Block count cannot be negative, got {c}"
87+
))
88+
})?;
89+
90+
self.state = BlockDecoderState::Size;
91+
}
92+
}
93+
BlockDecoderState::Size => {
94+
if let Some(c) = self.vlq_decoder.long(&mut buf) {
95+
self.bytes_remaining = c.try_into().map_err(|_| {
96+
ArrowError::ParseError(format!(
97+
"Block size cannot be negative, got {c}"
98+
))
99+
})?;
100+
101+
self.in_progress.data.reserve(self.bytes_remaining);
102+
self.state = BlockDecoderState::Data;
103+
}
104+
}
105+
BlockDecoderState::Data => {
106+
let to_read = self.bytes_remaining.min(buf.len());
107+
self.in_progress.data.extend_from_slice(&buf[..to_read]);
108+
buf = &buf[to_read..];
109+
self.bytes_remaining -= to_read;
110+
if self.bytes_remaining == 0 {
111+
self.bytes_remaining = 16;
112+
self.state = BlockDecoderState::Sync;
113+
}
114+
}
115+
BlockDecoderState::Sync => {
116+
let to_decode = buf.len().min(self.bytes_remaining);
117+
let write = &mut self.in_progress.sync[16 - to_decode..];
118+
write[..to_decode].copy_from_slice(&buf[..to_decode]);
119+
self.bytes_remaining -= to_decode;
120+
buf = &buf[to_decode..];
121+
if self.bytes_remaining == 0 {
122+
self.state = BlockDecoderState::Finished;
123+
}
124+
}
125+
BlockDecoderState::Finished => return Ok(max_read - buf.len()),
126+
}
127+
}
128+
Ok(max_read)
129+
}
130+
131+
/// Flush this decoder returning the parsed [`Block`] if any
132+
pub fn flush(&mut self) -> Option<Block> {
133+
match self.state {
134+
BlockDecoderState::Finished => {
135+
self.state = BlockDecoderState::Count;
136+
Some(std::mem::take(&mut self.in_progress))
137+
}
138+
_ => None,
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)