Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 18 additions & 85 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::schema::{
make_full_name, Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability,
PrimitiveType, Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_ROOT_RECORD_DEFAULT_NAME,
make_full_name, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
AVRO_FIELD_DEFAULT_METADATA_KEY,
};
use arrow_schema::{
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, UnionFields, UnionMode,
Expand Down Expand Up @@ -77,8 +76,6 @@ pub(crate) enum AvroLiteral {
Array(Vec<AvroLiteral>),
/// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
Map(IndexMap<String, AvroLiteral>),
/// Represents an unsupported literal type.
Unsupported,
}

/// Contains the necessary information to resolve a writer's record against a reader's record schema.
Expand Down Expand Up @@ -208,7 +205,7 @@ impl AvroDataType {
}

/// Returns an arrow [`Field`] with the given name
pub fn field_with_name(&self, name: &str) -> Field {
pub(crate) fn field_with_name(&self, name: &str) -> Field {
let mut nullable = self.nullability.is_some();
if !nullable {
if let Codec::Union(children, _, _) = self.codec() {
Expand All @@ -230,7 +227,7 @@ impl AvroDataType {
///
/// The codec determines how Avro data is encoded and mapped to Arrow data types.
/// This is useful when we need to inspect or use the specific encoding of a field.
pub fn codec(&self) -> &Codec {
pub(crate) fn codec(&self) -> &Codec {
&self.codec
}

Expand Down Expand Up @@ -524,29 +521,6 @@ impl AvroField {
pub(crate) fn name(&self) -> &str {
&self.name
}

/// Performs schema resolution between a writer and reader schema.
///
/// This is the primary entry point for handling schema evolution. It produces an
/// `AvroField` that contains all the necessary information to read data written
/// with the `writer` schema as if it were written with the `reader` schema.
pub(crate) fn resolve_from_writer_and_reader<'a>(
writer_schema: &'a Schema<'a>,
reader_schema: &'a Schema<'a>,
use_utf8view: bool,
strict_mode: bool,
) -> Result<Self, ArrowError> {
let top_name = match reader_schema {
Schema::Complex(ComplexType::Record(r)) => r.name.to_string(),
_ => AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
};
let mut resolver = Maker::new(use_utf8view, strict_mode);
let data_type = resolver.make_data_type(writer_schema, Some(reader_schema), None)?;
Ok(Self {
name: top_name,
data_type,
})
}
}

impl<'a> TryFrom<&Schema<'a>> for AvroField {
Expand Down Expand Up @@ -1629,28 +1603,6 @@ impl<'a> Maker<'a> {
Ok(datatype)
}

fn resolve_nullable_union<'s>(
&mut self,
writer_variants: &'s [Schema<'a>],
reader_variants: &'s [Schema<'a>],
namespace: Option<&'a str>,
) -> Result<AvroDataType, ArrowError> {
match (
nullable_union_variants(writer_variants),
nullable_union_variants(reader_variants),
) {
(Some((write_nb, write_nonnull)), Some((_read_nb, read_nonnull))) => {
let mut dt = self.make_data_type(write_nonnull, Some(read_nonnull), namespace)?;
dt.nullability = Some(write_nb);
Ok(dt)
}
_ => Err(ArrowError::NotYetImplemented(
"Union resolution requires both writer and reader to be 2-branch nullable unions"
.to_string(),
)),
}
}

// Resolve writer vs. reader enum schemas according to Avro 1.11.1.
//
// # How enums resolve (writer to reader)
Expand Down Expand Up @@ -1915,9 +1867,11 @@ impl<'a> Maker<'a> {
mod tests {
use super::*;
use crate::schema::{
Attributes, Field as AvroFieldSchema, Fixed, PrimitiveType, Schema, Type, TypeName,
Array, Attributes, ComplexType, Field as AvroFieldSchema, Fixed, PrimitiveType, Record,
Schema, Type, TypeName, AVRO_ROOT_RECORD_DEFAULT_NAME,
};
use serde_json;
use indexmap::IndexMap;
use serde_json::{self, Value};

fn create_schema_with_logical_type(
primitive_type: PrimitiveType,
Expand All @@ -1934,21 +1888,6 @@ mod tests {
})
}

fn create_fixed_schema(size: usize, logical_type: &'static str) -> Schema<'static> {
let attributes = Attributes {
logical_type: Some(logical_type),
additional: Default::default(),
};

Schema::Complex(ComplexType::Fixed(Fixed {
name: "fixed_type",
namespace: None,
aliases: Vec::new(),
size,
attributes,
}))
}

fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
Expand All @@ -1965,17 +1904,6 @@ mod tests {
Schema::Union(branches)
}

fn mk_record_name(name: &str) -> Schema<'_> {
Schema::Complex(ComplexType::Record(Record {
name,
namespace: None,
doc: None,
aliases: vec![],
fields: vec![],
attributes: Attributes::default(),
}))
}

#[test]
fn test_date_logical_type() {
let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
Expand Down Expand Up @@ -2068,7 +1996,7 @@ mod tests {

#[test]
fn test_decimal_logical_type_not_implemented() {
let mut codec = Codec::Fixed(16);
let codec = Codec::Fixed(16);

let process_decimal = || -> Result<(), ArrowError> {
if let Codec::Fixed(_) = codec {
Expand Down Expand Up @@ -2556,9 +2484,14 @@ mod tests {
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
let field =
AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
.expect("resolution should succeed");
let mut maker = Maker::new(false, false);
let data_type = maker
.make_data_type(&writer_schema, Some(&reader_schema), None)
.expect("resolution should succeed");
let field = AvroField {
name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
data_type,
};
assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
assert!(matches!(field.data_type().codec(), Codec::Utf8));
}
Expand Down
1 change: 0 additions & 1 deletion arrow-avro/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use arrow_schema::ArrowError;
use std::io;
use std::io::{Read, Write};

/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
Expand Down
1 change: 0 additions & 1 deletion arrow-avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![allow(unused)] // Temporary

/// Core functionality for reading Avro data into Arrow arrays
///
Expand Down
6 changes: 3 additions & 3 deletions arrow-avro/src/reader/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'a> AvroCursor<'a> {
ArrowError::ParseError("offset overflow reading avro bytes".to_string())
})?;

if (self.buf.len() < len) {
if self.buf.len() < len {
return Err(ArrowError::ParseError(
"Unexpected EOF reading bytes".to_string(),
));
Expand All @@ -97,7 +97,7 @@ impl<'a> AvroCursor<'a> {

#[inline]
pub(crate) fn get_float(&mut self) -> Result<f32, ArrowError> {
if (self.buf.len() < 4) {
if self.buf.len() < 4 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading float".to_string(),
));
Expand All @@ -109,7 +109,7 @@ impl<'a> AvroCursor<'a> {

#[inline]
pub(crate) fn get_double(&mut self) -> Result<f64, ArrowError> {
if (self.buf.len() < 8) {
if self.buf.len() < 8 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading float".to_string(),
));
Expand Down
27 changes: 24 additions & 3 deletions arrow-avro/src/reader/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
use crate::reader::vlq::VLQDecoder;
use crate::schema::{Schema, SCHEMA_METADATA_KEY};
use arrow_schema::ArrowError;
use std::io::BufRead;

/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
let mut decoder = HeaderDecoder::default();
loop {
let buf = reader.fill_buf()?;
if buf.is_empty() {
break;
}
let read = buf.len();
let decoded = decoder.decode(buf)?;
reader.consume(decoded);
if decoded != read {
break;
}
}
decoder.flush().ok_or_else(|| {
ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
})
}

#[derive(Debug)]
enum HeaderDecoderState {
Expand Down Expand Up @@ -265,13 +286,13 @@ impl HeaderDecoder {
#[cfg(test)]
mod test {
use super::*;
use crate::codec::{AvroDataType, AvroField};
use crate::codec::AvroField;
use crate::reader::read_header;
use crate::schema::SCHEMA_METADATA_KEY;
use crate::test_util::arrow_test_data;
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::io::BufReader;

#[test]
fn test_header_decode() {
Expand All @@ -291,7 +312,7 @@ mod test {

fn decode_file(file: &str) -> Header {
let file = File::open(file).unwrap();
read_header(BufReader::with_capacity(100, file)).unwrap()
read_header(BufReader::with_capacity(1000, file)).unwrap()
}

#[test]
Expand Down
Loading
Loading