Skip to content

Commit 81d4f82

Browse files
committed
Map AvroSchema to Arrow (apache#4886)
1 parent cc23cac commit 81d4f82

File tree

4 files changed

+349
-6
lines changed

4 files changed

+349
-6
lines changed

arrow-avro/src/codec.rs

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::schema::{ComplexType, PrimitiveType, Record, Schema, TypeName};
19+
use arrow_schema::{
20+
ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
21+
};
22+
use std::borrow::Cow;
23+
use std::collections::HashMap;
24+
use std::sync::Arc;
25+
26+
/// Records where the `null` branch sits inside a two-branch Avro union.
#[derive(Clone, Copy, Debug)]
enum Nulls {
    /// `null` is the first branch of the union (index 0).
    NullFirst,
    /// `null` is the second branch of the union (index 1).
    NullSecond,
}
31+
32+
/// An Avro field mapped to the arrow data model
33+
#[derive(Debug, Clone)]
34+
pub struct AvroField {
35+
nulls: Option<Nulls>,
36+
meta: Arc<AvroFieldMeta>,
37+
}
38+
39+
#[derive(Debug, Clone)]
40+
struct AvroFieldMeta {
41+
name: String,
42+
metadata: HashMap<String, String>,
43+
codec: Codec,
44+
}
45+
46+
impl AvroField {
47+
/// Returns the arrow [`Field`]
48+
pub fn field(&self) -> Field {
49+
let d = self.meta.codec.data_type();
50+
Field::new(&self.meta.name, d, self.nulls.is_some())
51+
.with_metadata(self.meta.metadata.clone())
52+
}
53+
54+
/// Returns the [`Codec`]
55+
pub fn codec(&self) -> &Codec {
56+
&self.meta.codec
57+
}
58+
}
59+
60+
impl<'a> TryFrom<&Schema<'a>> for AvroField {
61+
type Error = ArrowError;
62+
63+
fn try_from(schema: &Schema<'a>) -> Result<Self, Self::Error> {
64+
let mut resolver = Resolver::default();
65+
make_field(schema, "item", None, &mut resolver)
66+
}
67+
}
68+
69+
#[derive(Debug, Clone)]
70+
pub enum Codec {
71+
Null,
72+
Boolean,
73+
Int32,
74+
Int64,
75+
Float32,
76+
Float64,
77+
Binary,
78+
Utf8,
79+
Date32,
80+
TimeMillis,
81+
TimeMicros,
82+
/// Timestamp (is_utc)
83+
TimestampMillis(bool),
84+
TimestampMicros(bool),
85+
Fixed(i32),
86+
List(Arc<AvroField>),
87+
Struct(Arc<[AvroField]>),
88+
Duration,
89+
}
90+
91+
impl Codec {
92+
fn data_type(&self) -> DataType {
93+
match self {
94+
Self::Null => DataType::Null,
95+
Self::Boolean => DataType::Boolean,
96+
Self::Int32 => DataType::Int32,
97+
Self::Int64 => DataType::Int64,
98+
Self::Float32 => DataType::Float32,
99+
Self::Float64 => DataType::Float64,
100+
Self::Binary => DataType::Binary,
101+
Self::Utf8 => DataType::Utf8,
102+
Self::Date32 => DataType::Date32,
103+
Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
104+
Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
105+
Self::TimestampMillis(is_utc) => {
106+
DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into()))
107+
}
108+
Self::TimestampMicros(is_utc) => {
109+
DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
110+
}
111+
Self::Duration => DataType::Interval(IntervalUnit::MonthDayNano),
112+
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
113+
Self::List(f) => DataType::List(Arc::new(f.field())),
114+
Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
115+
}
116+
}
117+
}
118+
119+
impl From<PrimitiveType> for Codec {
120+
fn from(value: PrimitiveType) -> Self {
121+
match value {
122+
PrimitiveType::Null => Self::Null,
123+
PrimitiveType::Boolean => Self::Boolean,
124+
PrimitiveType::Int => Self::Int32,
125+
PrimitiveType::Long => Self::Int64,
126+
PrimitiveType::Float => Self::Float32,
127+
PrimitiveType::Double => Self::Float64,
128+
PrimitiveType::Bytes => Self::Binary,
129+
PrimitiveType::String => Self::Utf8,
130+
}
131+
}
132+
}
133+
134+
#[derive(Debug, Default)]
135+
struct Resolver<'a> {
136+
map: HashMap<(&'a str, &'a str), AvroField>,
137+
}
138+
139+
impl<'a> Resolver<'a> {
140+
fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroField) {
141+
self.map.insert((name, namespace.unwrap_or("")), schema);
142+
}
143+
144+
fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result<AvroField, ArrowError> {
145+
let (namespace, name) = name
146+
.rsplit_once('.')
147+
.unwrap_or_else(|| (namespace.unwrap_or(""), name));
148+
149+
self.map
150+
.get(&(namespace, name))
151+
.ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
152+
.cloned()
153+
}
154+
}
155+
156+
fn make_field<'a>(
157+
schema: &Schema<'a>,
158+
name: &'a str,
159+
namespace: Option<&'a str>,
160+
resolver: &mut Resolver<'a>,
161+
) -> Result<AvroField, ArrowError> {
162+
match schema {
163+
Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroField {
164+
nulls: None,
165+
meta: Arc::new(AvroFieldMeta {
166+
name: name.to_string(),
167+
metadata: Default::default(),
168+
codec: (*p).into(),
169+
}),
170+
}),
171+
Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace),
172+
Schema::Union(f) => {
173+
let null = f
174+
.iter()
175+
.position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
176+
match (f.len() == 2, null) {
177+
(true, Some(0)) => {
178+
let mut field = make_field(&f[1], name, namespace, resolver)?;
179+
field.nulls = Some(Nulls::NullFirst);
180+
Ok(field)
181+
}
182+
(true, Some(1)) => {
183+
let mut field = make_field(&f[0], name, namespace, resolver)?;
184+
field.nulls = Some(Nulls::NullSecond);
185+
Ok(field)
186+
}
187+
_ => Err(ArrowError::NotYetImplemented(format!(
188+
"Union of {f:?} not currently supported"
189+
))),
190+
}
191+
}
192+
Schema::Complex(c) => match c {
193+
ComplexType::Record(r) => {
194+
let namespace = r.namespace.or(namespace);
195+
let fields = r
196+
.fields
197+
.iter()
198+
.map(|field| make_field(&field.r#type, field.name, namespace, resolver))
199+
.collect::<Result<_, _>>()?;
200+
201+
let field = AvroField {
202+
nulls: None,
203+
meta: Arc::new(AvroFieldMeta {
204+
name: r.name.to_string(),
205+
codec: Codec::Struct(fields),
206+
metadata: extract_metadata(&r.attributes.additional),
207+
}),
208+
};
209+
resolver.register(name, namespace, field.clone());
210+
Ok(field)
211+
}
212+
ComplexType::Array(a) => {
213+
let mut field = make_field(a.items.as_ref(), "item", namespace, resolver)?;
214+
Ok(AvroField {
215+
nulls: None,
216+
meta: Arc::new(AvroFieldMeta {
217+
name: name.to_string(),
218+
metadata: extract_metadata(&a.attributes.additional),
219+
codec: Codec::List(Arc::new(field)),
220+
}),
221+
})
222+
}
223+
ComplexType::Fixed(f) => {
224+
let size = f.size.try_into().map_err(|e| {
225+
ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
226+
})?;
227+
228+
let field = AvroField {
229+
nulls: None,
230+
meta: Arc::new(AvroFieldMeta {
231+
name: f.name.to_string(),
232+
metadata: extract_metadata(&f.attributes.additional),
233+
codec: Codec::Fixed(size),
234+
}),
235+
};
236+
resolver.register(f.name, namespace, field.clone());
237+
Ok(field)
238+
}
239+
ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!(
240+
"Enum of {e:?} not currently supported"
241+
))),
242+
ComplexType::Map(m) => Err(ArrowError::NotYetImplemented(format!(
243+
"Map of {m:?} not currently supported"
244+
))),
245+
},
246+
Schema::Type(t) => {
247+
let mut field = make_field(
248+
&Schema::TypeName(t.r#type.clone()),
249+
name,
250+
namespace,
251+
resolver,
252+
)?;
253+
let meta = Arc::make_mut(&mut field.meta);
254+
255+
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
256+
match (t.attributes.logical_type, &mut meta.codec) {
257+
(Some("decimal"), c @ Codec::Fixed(_)) => {
258+
return Err(ArrowError::NotYetImplemented(format!(
259+
"Decimals are not currently supported"
260+
)))
261+
}
262+
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
263+
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
264+
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
265+
(Some("timestamp-millis"), c @ Codec::Int64) => *c = Codec::TimestampMillis(true),
266+
(Some("timestamp-micros"), c @ Codec::Int64) => *c = Codec::TimestampMicros(true),
267+
(Some("local-timestamp-millis"), c @ Codec::Int64) => {
268+
*c = Codec::TimestampMillis(false)
269+
}
270+
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
271+
*c = Codec::TimestampMicros(false)
272+
}
273+
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration,
274+
_ => {}
275+
}
276+
277+
if !t.attributes.additional.is_empty() {
278+
for (k, v) in &t.attributes.additional {
279+
meta.metadata.insert(k.to_string(), v.to_string());
280+
}
281+
}
282+
Ok(field)
283+
}
284+
}
285+
}
286+
287+
fn extract_metadata(metadata: &HashMap<&str, serde_json::Value>) -> HashMap<String, String> {
288+
metadata
289+
.iter()
290+
.map(|(k, v)| (k.to_string(), v.to_string()))
291+
.collect()
292+
}

arrow-avro/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ mod schema;
2727

2828
mod compression;
2929

30+
mod codec;
31+
3032
#[cfg(test)]
3133
mod test_util {
3234
pub fn arrow_test_data(path: &str) -> String {

arrow-avro/src/reader/header.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,11 @@ impl HeaderDecoder {
236236
#[cfg(test)]
237237
mod test {
238238
use super::*;
239+
use crate::codec::AvroField;
239240
use crate::reader::read_header;
240241
use crate::schema::SCHEMA_METADATA_KEY;
241242
use crate::test_util::arrow_test_data;
243+
use arrow_schema::{DataType, Field, Fields, TimeUnit};
242244
use std::fs::File;
243245
use std::io::{BufRead, BufReader};
244246

@@ -269,7 +271,34 @@ mod test {
269271
let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
270272
let expected = br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":["int","null"]},{"name":"bool_col","type":["boolean","null"]},{"name":"tinyint_col","type":["int","null"]},{"name":"smallint_col","type":["int","null"]},{"name":"int_col","type":["int","null"]},{"name":"bigint_col","type":["long","null"]},{"name":"float_col","type":["float","null"]},{"name":"double_col","type":["double","null"]},{"name":"date_string_col","type":["bytes","null"]},{"name":"string_col","type":["bytes","null"]},{"name":"timestamp_col","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}"#;
271273
assert_eq!(schema_json, expected);
272-
let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
274+
let schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
275+
let field = AvroField::try_from(&schema).unwrap();
276+
277+
assert_eq!(
278+
field.field(),
279+
Field::new(
280+
"topLevelRecord",
281+
DataType::Struct(Fields::from(vec![
282+
Field::new("id", DataType::Int32, true),
283+
Field::new("bool_col", DataType::Boolean, true),
284+
Field::new("tinyint_col", DataType::Int32, true),
285+
Field::new("smallint_col", DataType::Int32, true),
286+
Field::new("int_col", DataType::Int32, true),
287+
Field::new("bigint_col", DataType::Int64, true),
288+
Field::new("float_col", DataType::Float32, true),
289+
Field::new("double_col", DataType::Float64, true),
290+
Field::new("date_string_col", DataType::Binary, true),
291+
Field::new("string_col", DataType::Binary, true),
292+
Field::new(
293+
"timestamp_col",
294+
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
295+
true
296+
),
297+
])),
298+
false
299+
)
300+
);
301+
273302
assert_eq!(
274303
u128::from_le_bytes(header.sync()),
275304
226966037233754408753420635932530907102

0 commit comments

Comments
 (0)