diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index cf514ff88df7..27f5070a6104 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -28,6 +28,7 @@ use arrow_schema::{
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
 use indexmap::IndexMap;
 use serde_json::Value;
+use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Display;
@@ -82,7 +83,7 @@ pub(crate) enum AvroLiteral {
 /// Contains the necessary information to resolve a writer's record against a reader's record schema.
 #[derive(Debug, Clone, PartialEq)]
-pub struct ResolvedRecord {
+pub(crate) struct ResolvedRecord {
     /// Maps a writer's field index to the corresponding reader's field index.
     /// `None` if the writer's field is not present in the reader's schema.
     pub(crate) writer_to_reader: Arc<[Option<usize>]>,
@@ -137,7 +138,7 @@ impl Display for Promotion {
 /// Information required to resolve a writer union against a reader union (or single type).
 #[derive(Debug, Clone, PartialEq)]
-pub struct ResolvedUnion {
+pub(crate) struct ResolvedUnion {
     /// For each writer branch index, the reader branch index and how to read it.
     /// `None` means the writer branch doesn't resolve against the reader.
     pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
@@ -151,7 +152,7 @@ pub struct ResolvedUnion {
 ///
 /// When resolving schemas, the writer's enum symbols must be mapped to the reader's symbols.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct EnumMapping {
+pub(crate) struct EnumMapping {
     /// A mapping from the writer's symbol index to the reader's symbol index.
     pub(crate) mapping: Arc<[i32]>,
     /// The index to use for a writer's symbol that is not present in the reader's enum
@@ -169,7 +170,7 @@ fn with_extension_type(codec: &Codec, field: Field) -> Field {
 /// An Avro datatype mapped to the arrow data model
 #[derive(Debug, Clone, PartialEq)]
-pub struct AvroDataType {
+pub(crate) struct AvroDataType {
     nullability: Option<Nullability>,
     metadata: HashMap<String, String>,
     codec: Codec,
@@ -178,7 +179,7 @@ impl AvroDataType {
     /// Create a new [`AvroDataType`] with the given parts.
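+    /// `metadata` carries the Avro attributes for this type through to the Arrow
+    /// [`Field`] metadata, and `nullability` records how a `["null", T]` union was
+    /// encoded (null first or null second), if at all.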
-    pub fn new(
+    pub(crate) fn new(
         codec: Codec,
         metadata: HashMap<String, String>,
         nullability: Option<Nullability>,
@@ -208,7 +209,15 @@ impl AvroDataType {
     /// Returns an arrow [`Field`] with the given name
     pub fn field_with_name(&self, name: &str) -> Field {
-        let nullable = self.nullability.is_some();
+        let mut nullable = self.nullability.is_some();
+        if !nullable {
+            if let Codec::Union(children, _, _) = self.codec() {
+                // If any encoded branch is `null`, mark field as nullable
+                if children.iter().any(|c| matches!(c.codec(), Codec::Null)) {
+                    nullable = true;
+                }
+            }
+        }
         let data_type = self.codec.data_type();
         let field = Field::new(name, data_type, nullable).with_metadata(self.metadata.clone());
         #[cfg(feature = "canonical_extension_types")]
@@ -232,7 +241,7 @@ impl AvroDataType {
     /// - `Some(Nullability::NullFirst)` - Nulls are encoded as the first union variant
     /// - `Some(Nullability::NullSecond)` - Nulls are encoded as the second union variant
     /// - `None` - The type is not nullable
-    pub fn nullability(&self) -> Option<Nullability> {
+    pub(crate) fn nullability(&self) -> Option<Nullability> {
         self.nullability
     }
@@ -476,19 +485,19 @@
 /// A named [`AvroDataType`]
 #[derive(Debug, Clone, PartialEq)]
-pub struct AvroField {
+pub(crate) struct AvroField {
     name: String,
     data_type: AvroDataType,
 }

 impl AvroField {
     /// Returns the arrow [`Field`]
-    pub fn field(&self) -> Field {
+    pub(crate) fn field(&self) -> Field {
         self.data_type.field_with_name(&self.name)
     }

     /// Returns the [`AvroDataType`]
-    pub fn data_type(&self) -> &AvroDataType {
+    pub(crate) fn data_type(&self) -> &AvroDataType {
         &self.data_type
     }
@@ -500,7 +509,7 @@ impl AvroField {
     ///
     /// Returns a new `AvroField` with the same structure, but with string types
     /// converted to use `Utf8View` instead of `Utf8`.
-    pub fn with_utf8view(&self) -> Self {
+    pub(crate) fn with_utf8view(&self) -> Self {
         let mut field = self.clone();
         if let Codec::Utf8 = field.data_type.codec {
             field.data_type.codec = Codec::Utf8View;
@@ -512,7 +521,7 @@ impl AvroField {
     ///
     /// This is the field name as defined in the Avro schema.
     /// It's used to identify fields within a record structure.
-    pub fn name(&self) -> &str {
+    pub(crate) fn name(&self) -> &str {
         &self.name
     }
@@ -562,7 +571,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
 /// Builder for an [`AvroField`]
 #[derive(Debug)]
-pub struct AvroFieldBuilder<'a> {
+pub(crate) struct AvroFieldBuilder<'a> {
     writer_schema: &'a Schema<'a>,
     reader_schema: Option<&'a Schema<'a>>,
     use_utf8view: bool,
@@ -571,7 +580,7 @@ pub struct AvroFieldBuilder<'a> {

 impl<'a> AvroFieldBuilder<'a> {
     /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
-    pub fn new(writer_schema: &'a Schema<'a>) -> Self {
+    pub(crate) fn new(writer_schema: &'a Schema<'a>) -> Self {
         Self {
             writer_schema,
             reader_schema: None,
@@ -585,25 +594,25 @@ impl<'a> AvroFieldBuilder<'a> {
     /// If a reader schema is provided, the builder will produce a resolved `AvroField`
     /// that can handle differences between the writer's and reader's schemas.
     #[inline]
-    pub fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
+    pub(crate) fn with_reader_schema(mut self, reader_schema: &'a Schema<'a>) -> Self {
         self.reader_schema = Some(reader_schema);
         self
     }

     /// Enable or disable Utf8View support
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
+    pub(crate) fn with_utf8view(mut self, use_utf8view: bool) -> Self {
         self.use_utf8view = use_utf8view;
         self
     }

     /// Enable or disable strict mode.
-    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+    pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
         self.strict_mode = strict_mode;
         self
     }

     /// Build an [`AvroField`] from the builder
-    pub fn build(self) -> Result<AvroField, ArrowError> {
+    pub(crate) fn build(self) -> Result<AvroField, ArrowError> {
         match self.writer_schema {
             Schema::Complex(ComplexType::Record(r)) => {
                 let mut resolver = Maker::new(self.use_utf8view, self.strict_mode);
@@ -626,7 +635,7 @@ impl<'a> AvroFieldBuilder<'a> {
///
///
 #[derive(Debug, Clone, PartialEq)]
-pub enum Codec {
+pub(crate) enum Codec {
     /// Represents Avro null type, maps to Arrow's Null data type
     Null,
     /// Represents Avro boolean type, maps to Arrow's Boolean data type
@@ -760,9 +769,7 @@ impl Codec {
             }
             Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
             Self::Map(value_type) => {
-                let val_dt = value_type.codec.data_type();
-                let val_field = Field::new("value", val_dt, value_type.nullability.is_some())
-                    .with_metadata(value_type.metadata.clone());
+                let val_field = value_type.field_with_name("value");
                 DataType::Map(
                     Arc::new(Field::new(
                         "entries",
@@ -792,22 +799,7 @@ impl Codec {
     /// The conversion only happens if both:
     /// 1. `use_utf8view` is true
     /// 2. The codec is currently `Utf8`
-    ///
-    /// # Example
-    /// ```
-    /// # use arrow_avro::codec::Codec;
-    /// let utf8_codec1 = Codec::Utf8;
-    /// let utf8_codec2 = Codec::Utf8;
-    ///
-    /// // Convert to Utf8View
-    /// let view_codec = utf8_codec1.with_utf8view(true);
-    /// assert!(matches!(view_codec, Codec::Utf8View));
-    ///
-    /// // Don't convert if use_utf8view is false
-    /// let unchanged_codec = utf8_codec2.with_utf8view(false);
-    /// assert!(matches!(unchanged_codec, Codec::Utf8));
-    /// ```
-    pub fn with_utf8view(self, use_utf8view: bool) -> Self {
+    pub(crate) fn with_utf8view(self, use_utf8view: bool) -> Self {
         if use_utf8view && matches!(self, Self::Utf8) {
             Self::Utf8View
         } else {
@@ -1011,25 +1003,48 @@ impl<'a> Resolver<'a> {
     }
 }

+fn full_name_set(name: &str, ns: Option<&str>, aliases: &[&str]) -> HashSet<String> {
+    let mut out = HashSet::with_capacity(1 + aliases.len());
+    let (full, _) = make_full_name(name, ns, None);
+    out.insert(full);
+    for a in aliases {
+        let (fa, _) = make_full_name(a, None, ns);
+        out.insert(fa);
+    }
+    out
+}
+
 fn names_match(
     writer_name: &str,
+    writer_namespace: Option<&str>,
     writer_aliases: &[&str],
     reader_name: &str,
+    reader_namespace: Option<&str>,
     reader_aliases: &[&str],
 ) -> bool {
-    writer_name == reader_name
-        || reader_aliases.contains(&writer_name)
-        || writer_aliases.contains(&reader_name)
+    let writer_set = full_name_set(writer_name, writer_namespace, writer_aliases);
+    let reader_set = full_name_set(reader_name, reader_namespace, reader_aliases);
+    // The sets intersect if the canonical full names match or any alias matches cross-wise.
+    !writer_set.is_disjoint(&reader_set)
 }

 fn ensure_names_match(
     data_type: &str,
     writer_name: &str,
+    writer_namespace: Option<&str>,
     writer_aliases: &[&str],
     reader_name: &str,
+    reader_namespace: Option<&str>,
     reader_aliases: &[&str],
 ) -> Result<(), ArrowError> {
-    if names_match(writer_name, writer_aliases, reader_name, reader_aliases) {
+    if names_match(
+        writer_name,
+        writer_namespace,
+        writer_aliases,
+        reader_name,
+        reader_namespace,
+        reader_aliases,
+    ) {
         Ok(())
     } else {
         Err(ArrowError::ParseError(format!(
@@ -1137,6 +1152,7 @@ impl<'a> Maker<'a> {
             strict_mode,
         }
     }
+
     fn make_data_type<'s>(
         &mut self,
         writer_schema: &'s Schema<'a>,
@@ -1149,7 +1165,7 @@ impl<'a> Maker<'a> {
         }
     }

-    /// Parses a [`AvroDataType`] from the provided [`Schema`] and the given `name` and `namespace`
+    /// Parses an [`AvroDataType`] from the provided `Schema` and the given `name` and `namespace`
     ///
     /// `name`: is the name used to refer to `schema` in its parent
     /// `namespace`: an optional qualifier used as part of a type hierarchy
@@ -1569,8 +1585,10 @@ impl<'a> Maker<'a> {
         ensure_names_match(
             "Fixed",
             writer_fixed.name,
+            writer_fixed.namespace,
             &writer_fixed.aliases,
             reader_fixed.name,
+            reader_fixed.namespace,
             &reader_fixed.aliases,
         )?;
         if writer_fixed.size != reader_fixed.size {
@@ -1698,8 +1716,10 @@ impl<'a> Maker<'a> {
         ensure_names_match(
             "Enum",
             writer_enum.name,
+            writer_enum.namespace,
             &writer_enum.aliases,
             reader_enum.name,
+            reader_enum.namespace,
             &reader_enum.aliases,
         )?;
         if writer_enum.symbols == reader_enum.symbols {
@@ -1747,6 +1767,33 @@ impl<'a> Maker<'a> {
         Ok(dt)
     }

+    #[inline]
+    fn build_writer_lookup(
+        writer_record: &Record<'a>,
+    ) -> (HashMap<&'a str, usize>, HashSet<&'a str>) {
+        let mut map: HashMap<&str, usize> = HashMap::with_capacity(writer_record.fields.len() * 2);
+        for (idx, wf) in writer_record.fields.iter().enumerate() {
+            // Avro field names are unique; if duplicates did occur, last-in-wins matches the previous behavior.
+            map.insert(wf.name, idx);
+        }
+        // Track ambiguous writer aliases (alias used by multiple writer fields)
+        let mut ambiguous: HashSet<&str> = HashSet::new();
+        for (idx, wf) in writer_record.fields.iter().enumerate() {
+            for &alias in &wf.aliases {
+                match map.entry(alias) {
+                    Entry::Occupied(e) if *e.get() != idx => {
+                        ambiguous.insert(alias);
+                    }
+                    Entry::Vacant(e) => {
+                        e.insert(idx);
+                    }
+                    _ => {}
+                }
+            }
+        }
+        (map, ambiguous)
+    }
+
     fn resolve_records(
         &mut self,
         writer_record: &Record<'a>,
@@ -1756,84 +1803,97 @@ impl<'a> Maker<'a> {
         ensure_names_match(
             "Record",
             writer_record.name,
+            writer_record.namespace,
             &writer_record.aliases,
             reader_record.name,
+            reader_record.namespace,
             &reader_record.aliases,
         )?;
         let writer_ns = writer_record.namespace.or(namespace);
         let reader_ns = reader_record.namespace.or(namespace);
         let reader_md = reader_record.attributes.field_metadata();
-        let writer_index_map: HashMap<&str, usize> = writer_record
-            .fields
-            .iter()
-            .enumerate()
-            .map(|(idx, wf)| (wf.name, idx))
-            .collect();
+        // Build writer lookup and ambiguous alias set.
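+        // Writer field names are inserted into the lookup before writer aliases, so an
+        // alias can never shadow a real field name; an alias claimed by two different
+        // writer fields is recorded as ambiguous and rejected only if a reader field
+        // actually matches through it in strict mode.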
+        let (writer_lookup, ambiguous_writer_aliases) = Self::build_writer_lookup(writer_record);
         let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
-        let reader_fields: Vec<AvroField> = reader_record
-            .fields
-            .iter()
-            .enumerate()
-            .map(|(reader_idx, r_field)| -> Result<AvroField, ArrowError> {
-                if let Some(&writer_idx) = writer_index_map.get(r_field.name) {
-                    let w_schema = &writer_record.fields[writer_idx].r#type;
-                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
-                    writer_to_reader[writer_idx] = Some(reader_idx);
-                    Ok(AvroField {
-                        name: r_field.name.to_string(),
-                        data_type: dt,
-                    })
-                } else {
-                    let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
-                    match r_field.default.as_ref() {
-                        Some(default_json) => {
-                            dt.resolution = Some(ResolutionInfo::DefaultValue(
-                                dt.parse_and_store_default(default_json)?,
-                            ));
-                        }
-                        None => {
-                            if dt.nullability() == Some(Nullability::NullFirst) {
-                                dt.resolution = Some(ResolutionInfo::DefaultValue(
-                                    dt.parse_and_store_default(&Value::Null)?,
-                                ));
-                            } else {
-                                return Err(ArrowError::SchemaError(format!(
-                                    "Reader field '{}' not present in writer schema must have a default value",
-                                    r_field.name
-                                )));
-                            }
+        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
+        // Capture default field indices during the main loop (one pass).
+        let mut default_fields: Vec<usize> = Vec::new();
+        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
+            // Try a direct name match first, then reader aliases (the writer lookup
+            // already contains both writer field names and writer aliases).
+            let mut match_idx = writer_lookup.get(r_field.name).copied();
+            let mut matched_via_alias: Option<&str> = None;
+            if match_idx.is_none() {
+                for &alias in &r_field.aliases {
+                    if let Some(i) = writer_lookup.get(alias).copied() {
+                        if self.strict_mode && ambiguous_writer_aliases.contains(alias) {
+                            return Err(ArrowError::SchemaError(format!(
+                                "Ambiguous alias '{alias}' on reader field '{}' matches multiple writer fields",
+                                r_field.name
+                            )));
                         }
+                        match_idx = Some(i);
+                        matched_via_alias = Some(alias);
+                        break;
                     }
-                    Ok(AvroField {
-                        name: r_field.name.to_string(),
-                        data_type: dt,
-                    })
                 }
-            })
-            .collect::<Result<_, _>>()?;
-        let default_fields: Vec<usize> = reader_fields
-            .iter()
-            .enumerate()
-            .filter_map(|(index, field)| {
-                matches!(
-                    field.data_type().resolution,
-                    Some(ResolutionInfo::DefaultValue(_))
-                )
-                .then_some(index)
-            })
-            .collect();
-        let skip_fields: Vec<Option<AvroDataType>> = writer_record
-            .fields
-            .iter()
-            .enumerate()
-            .map(|(writer_index, writer_field)| {
-                if writer_to_reader[writer_index].is_some() {
-                    Ok(None)
-                } else {
-                    self.parse_type(&writer_field.r#type, writer_ns).map(Some)
+            }
+            if let Some(wi) = match_idx {
+                if writer_to_reader[wi].is_none() {
+                    let w_schema = &writer_record.fields[wi].r#type;
+                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
+                    writer_to_reader[wi] = Some(reader_idx);
+                    reader_fields.push(AvroField {
+                        name: r_field.name.to_owned(),
+                        data_type: dt,
+                    });
+                    continue;
+                } else if self.strict_mode {
+                    // Writer field already mapped and strict_mode => error
+                    let existing_reader = writer_to_reader[wi].unwrap();
+                    let via = matched_via_alias
+                        .map(|a| format!("alias '{a}'"))
+                        .unwrap_or_else(|| "name match".to_string());
+                    return Err(ArrowError::SchemaError(format!(
+                        "Multiple reader fields map to the same writer field '{}' via {via} (existing reader index {existing_reader}, new reader index {reader_idx})",
+                        writer_record.fields[wi].name
+                    )));
                 }
-            })
-            .collect::<Result<_, _>>()?;
+                // Non-strict and already mapped -> fall through to defaulting logic
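+                // Per the Avro spec, the conflicting reader field is then populated
+                // from its default value in the fallback below.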
            }
+            // No match (or conflicted in non-strict mode): attach default per Avro spec.
+            let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
+            if let Some(default_json) = r_field.default.as_ref() {
+                dt.resolution = Some(ResolutionInfo::DefaultValue(
+                    dt.parse_and_store_default(default_json)?,
+                ));
+                default_fields.push(reader_idx);
+            } else if dt.nullability() == Some(Nullability::NullFirst) {
+                // The only valid implicit default for a union is the first branch (null-first case).
+                dt.resolution = Some(ResolutionInfo::DefaultValue(
+                    dt.parse_and_store_default(&Value::Null)?,
+                ));
+                default_fields.push(reader_idx);
+            } else {
+                return Err(ArrowError::SchemaError(format!(
+                    "Reader field '{}' not present in writer schema must have a default value",
+                    r_field.name
+                )));
+            }
+            reader_fields.push(AvroField {
+                name: r_field.name.to_owned(),
+                data_type: dt,
+            });
+        }
+        // Build skip_fields in writer order; pre-size and push.
+        let mut skip_fields: Vec<Option<AvroDataType>> =
+            Vec::with_capacity(writer_record.fields.len());
+        for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
+            if writer_to_reader[writer_index].is_some() {
+                skip_fields.push(None);
+            } else {
+                skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
+            }
+        }
         let resolved = AvroDataType::new_with_resolution(
             Codec::Struct(Arc::from(reader_fields)),
             reader_md,
@@ -1844,7 +1904,7 @@ impl<'a> Maker<'a> {
             skip_fields: Arc::from(skip_fields),
             })),
         );
-        // Register a resolved record by reader name+namespace for potential named type refs
+        // Register a resolved record by reader name+namespace for potential named type refs.
         self.resolver
             .register(reader_record.name, reader_ns, resolved.clone());
         Ok(resolved)
@@ -1854,7 +1914,9 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::schema::{Attributes, Fixed, PrimitiveType, Schema, Type, TypeName};
+    use crate::schema::{
+        Attributes, Field as AvroFieldSchema, Fixed, PrimitiveType, Schema, Type, TypeName,
+    };
     use serde_json;

     fn create_schema_with_logical_type(
@@ -2068,6 +2130,7 @@ mod tests {
             r#type: field_schema,
             default: None,
             doc: None,
+            aliases: vec![],
         };

         let record = Record {
@@ -2845,18 +2908,21 @@ mod tests {
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                     default: None,
+                    aliases: vec![],
                 },
                 crate::schema::Field {
                     name: "skipme",
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                     default: None,
+                    aliases: vec![],
                 },
                 crate::schema::Field {
                     name: "b",
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                     default: None,
+                    aliases: vec![],
                 },
             ],
             attributes: Attributes::default(),
@@ -2872,18 +2938,21 @@ mod tests {
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                     default: None,
+                    aliases: vec![],
                 },
                 crate::schema::Field {
                     name: "a",
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                     default: None,
+                    aliases: vec![],
                 },
                 crate::schema::Field {
                     name: "name",
                     doc: None,
                     r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                     default: Some(json_string("anon")),
+                    aliases: vec![],
                 },
                 crate::schema::Field {
                     name: "opt",
                     doc: None,
                     r#type: Schema::Union(vec![
                         Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                         Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                     ]),
                     default: None, // should default to null because NullFirst
+                    aliases: vec![],
                 },
             ],
             attributes: Attributes::default(),
@@ -2935,4 +3005,101 @@ mod tests {
             Some(&"null".to_string())
         );
     }
+
+    #[test]
+    fn test_named_type_alias_resolution_record_cross_namespace() {
+        let writer_record = Record {
+            name: "PersonV2",
+            namespace: Some("com.example.v2"),
+            doc: None,
+            aliases: vec!["com.example.Person"],
+            fields: vec![
+                AvroFieldSchema {
+                    name: "name",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+                    default: None,
+                    aliases: vec![],
+                },
+                AvroFieldSchema {
+                    name: "age",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+                    default: None,
+                    aliases: vec![],
+                },
+            ],
+            attributes: Attributes::default(),
+        };
+        let reader_record = Record {
+            name: "Person",
+            namespace: Some("com.example"),
+            doc: None,
+            aliases: vec![],
+            fields: writer_record.fields.clone(),
+            attributes: Attributes::default(),
+        };
+        let writer_schema = Schema::Complex(ComplexType::Record(writer_record));
+        let reader_schema = Schema::Complex(ComplexType::Record(reader_record));
+        let mut maker = Maker::new(false, false);
+        let result = maker
+            .make_data_type(&writer_schema, Some(&reader_schema), None)
+            .expect("record alias resolution should succeed");
+        match result.codec {
+            Codec::Struct(ref fields) => assert_eq!(fields.len(), 2),
+            other => panic!("expected struct, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_named_type_alias_resolution_enum_cross_namespace() {
+        let writer_enum = Enum {
+            name: "ColorV2",
+            namespace: Some("org.example.v2"),
+            doc: None,
+            aliases: vec!["org.example.Color"],
+            symbols: vec!["RED", "GREEN", "BLUE"],
+            default: None,
+            attributes: Attributes::default(),
+        };
+        let reader_enum = Enum {
+            name: "Color",
+            namespace: Some("org.example"),
+            doc: None,
+            aliases: vec![],
+            symbols: vec!["RED", "GREEN", "BLUE"],
+            default: None,
+            attributes: Attributes::default(),
+        };
+        let writer_schema = Schema::Complex(ComplexType::Enum(writer_enum));
+        let reader_schema = Schema::Complex(ComplexType::Enum(reader_enum));
+        let mut maker = Maker::new(false, false);
+        maker
+            .make_data_type(&writer_schema, Some(&reader_schema), None)
+            .expect("enum alias resolution should succeed");
+    }
+
+    #[test]
+    fn test_named_type_alias_resolution_fixed_cross_namespace() {
+        let writer_fixed = Fixed {
+            name: "Fx10V2",
+            namespace: Some("ns.v2"),
+            aliases: vec!["ns.Fx10"],
+            size: 10,
+            attributes: Attributes::default(),
+        };
+        let reader_fixed = Fixed {
+            name: "Fx10",
+            namespace: Some("ns"),
+            aliases: vec![],
+            size: 10,
+            attributes: Attributes::default(),
+        };
+        let writer_schema = Schema::Complex(ComplexType::Fixed(writer_fixed));
+        let reader_schema = Schema::Complex(ComplexType::Fixed(reader_fixed));
+        let mut maker = Maker::new(false, false);
+        maker
+            .make_data_type(&writer_schema, Some(&reader_schema), None)
+            .expect("fixed alias resolution should succeed");
+    }
 }
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 2d26df07aa9c..46f025d642bb 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -91,7 +91,7 @@ impl Header {
         }
     }

-    /// Returns the [`Schema`] if any
+    /// Returns the `Schema` if any
     pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
         self.get(SCHEMA_METADATA_KEY)
             .map(|x| {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 7642c90eacb3..dc2fb630b2c3 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -476,8 +476,8 @@
 //! ---
 use crate::codec::{AvroField, AvroFieldBuilder};
 use crate::schema::{
-    compare_schemas, AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, SchemaStore,
-    CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC,
+    AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, SchemaStore, CONFLUENT_MAGIC,
+    SINGLE_OBJECT_MAGIC,
 };
 use arrow_array::{Array, RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
@@ -927,7 +927,7 @@ impl Decoder {
///
/// Build a `Decoder` for Confluent messages:
///
-/// ```no_run
+/// ```
/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
/// use arrow_avro::reader::ReaderBuilder;
///
@@ -1297,6 +1297,7 @@ mod test {
         SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC,
     };
     use crate::test_util::arrow_test_data;
+    use crate::writer::AvroWriter;
     use arrow::array::ArrayDataBuilder;
     use arrow_array::builder::{
         ArrayBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int32Builder, Int64Builder,
@@ -1615,6 +1616,121 @@ mod test {
         AvroSchema::new(root.to_string())
     }

+    fn write_ocf(schema: &Schema, batches: &[RecordBatch]) -> Vec<u8> {
+        let mut w = AvroWriter::new(Vec::<u8>::new(), schema.clone()).expect("writer");
+        for b in batches {
+            w.write(b).expect("write");
+        }
+        w.finish().expect("finish");
+        w.into_inner()
+    }
+
+    #[test]
+    fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn std::error::Error>> {
+        // Writer: { id: long, name: string }
+        let writer_schema = Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let reader_json = r#"
+        {
+          "type": "record",
+          "name": "topLevelRecord",
+          "fields": [
+            { "name": "id", "type": "long" },
+            { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
+            { "name": "is_active", "type": "boolean", "default": true }
+          ]
+        }"#;
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .build(Cursor::new(bytes))?;
+        let out = reader.next().unwrap()?;
+        // Evolved aliased field should be non-null and match original writer values
+        let full_name = out.column(1).as_string::<i32>();
+        assert_eq!(full_name.value(0), "a");
+        assert_eq!(full_name.value(1), "b");
+
+        Ok(())
+    }
+
+    #[test]
+    fn writer_string_reader_string_null_order_second() -> Result<(), Box<dyn std::error::Error>> {
+        // Writer: { name: string }
+        let writer_schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+
+        // Reader: ["string","null"] (NullSecond)
+        let reader_json = r#"
+        {
+          "type":"record", "name":"topLevelRecord",
+          "fields":[ { "name":"name", "type":["string","null"], "default":"x" } ]
+        }"#;
+
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .build(Cursor::new(bytes))?;
+
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_rows(), 2);
+
+        // Should decode as non-null strings (writer non-union -> reader union)
+        let name = out.column(0).as_string::<i32>();
+        assert_eq!(name.value(0), "x");
+        assert_eq!(name.value(1), "y");
+
+        Ok(())
+    }
+
+    #[test]
+    fn promotion_writer_int_reader_nullable_long() -> Result<(), Box<dyn std::error::Error>> {
+        // Writer: { v: int }
+        let writer_schema = Schema::new(vec![Field::new("v", DataType::Int32, false)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+
+        // Reader: { v: ["null","long"] }
+        let reader_json = r#"
+        {
+          "type":"record", "name":"topLevelRecord",
+          "fields":[ { "name":"v", "type":["null","long"], "default": null } ]
+        }"#;
+
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .build(Cursor::new(bytes))?;
+
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_rows(), 3);
+
+        // Should have promoted to Int64 and be non-null (no union tag in writer)
+        let v = out
+            .column(0)
+            .as_primitive::<Int64Type>();
+        assert_eq!(v.values(), &[1, 2, 3]);
+        assert!(
+            out.column(0).nulls().is_none(),
+            "expected no validity bitmap for all-valid column"
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_alltypes_schema_promotion_mixed() {
         let files = [
@@ -6051,4 +6167,2159 @@ mod test {
             "INACTIVE"
         );
     }
+
+    #[test]
+    fn comprehensive_e2e_test() {
+        let path = "test/data/comprehensive_e2e.avro";
+        let batch = read_file(path, 1024, false);
+        let schema = batch.schema();
+
+        #[inline]
+        fn tid_by_name(fields: &UnionFields, want: &str) -> i8 {
+            for (tid, f) in fields.iter() {
+                if f.name() == want {
+                    return tid;
+                }
+            }
+            panic!("union child '{want}' not found");
+        }
+
+        #[inline]
+        fn tid_by_dt(fields: &UnionFields, pred: impl Fn(&DataType) -> bool) -> i8 {
+            for (tid, f) in fields.iter() {
+                if pred(f.data_type()) {
+                    return tid;
+                }
+            }
+            panic!("no union child matches predicate");
+        }
+
+        fn mk_dense_union(
+            fields: &UnionFields,
+            type_ids: Vec<i8>,
+            offsets: Vec<i32>,
+            provide: impl Fn(&Field) -> Option<ArrayRef>,
+        ) -> ArrayRef {
+            fn empty_child_for(dt: &DataType) -> Arc<dyn Array> {
+                match dt {
+                    DataType::Null => Arc::new(NullArray::new(0)),
+                    DataType::Boolean => Arc::new(BooleanArray::from(Vec::<bool>::new())),
+                    DataType::Int32 => Arc::new(Int32Array::from(Vec::<i32>::new())),
+                    DataType::Int64 => Arc::new(Int64Array::from(Vec::<i64>::new())),
+                    DataType::Float32 => Arc::new(Float32Array::from(Vec::<f32>::new())),
+                    DataType::Float64 => Arc::new(Float64Array::from(Vec::<f64>::new())),
+                    DataType::Binary => Arc::new(BinaryArray::from(Vec::<&[u8]>::new())),
+                    DataType::Utf8 => Arc::new(StringArray::from(Vec::<&str>::new())),
+                    DataType::Date32 => Arc::new(Date32Array::from(Vec::<i32>::new())),
+                    DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                        Arc::new(Time32MillisecondArray::from(Vec::<i32>::new()))
+                    }
+                    DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                        Arc::new(Time64MicrosecondArray::from(Vec::<i64>::new()))
+                    }
+                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => {
+                        let a = TimestampMillisecondArray::from(Vec::<i64>::new());
+                        Arc::new(if let Some(tz) = tz {
+                            a.with_timezone(tz.clone())
+                        } else {
+                            a
+                        })
+                    }
+                    DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
+                        let a = TimestampMicrosecondArray::from(Vec::<i64>::new());
+                        Arc::new(if let Some(tz) = tz {
+                            a.with_timezone(tz.clone())
+                        } else {
+                            a
+                        })
+                    }
+                    DataType::Interval(IntervalUnit::MonthDayNano) => Arc::new(
+                        IntervalMonthDayNanoArray::from(Vec::<IntervalMonthDayNano>::new()),
+                    ),
+                    DataType::FixedSizeBinary(sz) => Arc::new(
+                        FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+                            std::iter::empty::<Option<Vec<u8>>>(),
+                            *sz,
+                        )
+                        .unwrap(),
+                    ),
+                    DataType::Dictionary(_, _) => {
+                        let keys = Int32Array::from(Vec::<i32>::new());
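+                        // Enum branches decode to Dictionary<Int32, Utf8>, so pair the
+                        // empty keys with empty string values.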
+                        let values = Arc::new(StringArray::from(Vec::<&str>::new()));
+                        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap())
+                    }
+                    DataType::Struct(fields) => {
+                        let children: Vec<ArrayRef> = fields
+                            .iter()
+                            .map(|f| empty_child_for(f.data_type()) as ArrayRef)
+                            .collect();
+                        Arc::new(StructArray::new(fields.clone(), children, None))
+                    }
+                    DataType::List(field) => {
+                        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+                        Arc::new(
+                            ListArray::try_new(
+                                field.clone(),
+                                offsets,
+                                empty_child_for(field.data_type()),
+                                None,
+                            )
+                            .unwrap(),
+                        )
+                    }
+                    DataType::Map(entry_field, is_sorted) => {
+                        let (key_field, val_field) = match entry_field.data_type() {
+                            DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+                            other => panic!("unexpected map entries type: {other:?}"),
+                        };
+                        let keys = StringArray::from(Vec::<&str>::new());
+                        let vals: ArrayRef = match val_field.data_type() {
+                            DataType::Null => Arc::new(NullArray::new(0)) as ArrayRef,
+                            DataType::Boolean => {
+                                Arc::new(BooleanArray::from(Vec::<bool>::new())) as ArrayRef
+                            }
+                            DataType::Int32 => {
+                                Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef
+                            }
+                            DataType::Int64 => {
+                                Arc::new(Int64Array::from(Vec::<i64>::new())) as ArrayRef
+                            }
+                            DataType::Float32 => {
+                                Arc::new(Float32Array::from(Vec::<f32>::new())) as ArrayRef
+                            }
+                            DataType::Float64 => {
+                                Arc::new(Float64Array::from(Vec::<f64>::new())) as ArrayRef
+                            }
+                            DataType::Utf8 => {
+                                Arc::new(StringArray::from(Vec::<&str>::new())) as ArrayRef
+                            }
+                            DataType::Binary => {
+                                Arc::new(BinaryArray::from(Vec::<&[u8]>::new())) as ArrayRef
+                            }
+                            DataType::Union(uf, _) => {
+                                let children: Vec<ArrayRef> = uf
+                                    .iter()
+                                    .map(|(_, f)| empty_child_for(f.data_type()))
+                                    .collect();
+                                Arc::new(
+                                    UnionArray::try_new(
+                                        uf.clone(),
+                                        ScalarBuffer::<i8>::from(Vec::<i8>::new()),
+                                        Some(ScalarBuffer::<i32>::from(Vec::<i32>::new())),
+                                        children,
+                                    )
+                                    .unwrap(),
+                                ) as ArrayRef
+                            }
+                            other => panic!("unsupported map value type: {other:?}"),
+                        };
+                        let entries = StructArray::new(
+                            Fields::from(vec![
+                                key_field.as_ref().clone(),
+                                val_field.as_ref().clone(),
+                            ]),
+                            vec![Arc::new(keys) as ArrayRef, vals],
+                            None,
+                        );
+                        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0]));
+                        Arc::new(MapArray::new(
+                            entry_field.clone(),
+                            offsets,
+                            entries,
+                            None,
+                            *is_sorted,
+                        ))
+                    }
+                    other => panic!("empty_child_for: unhandled type {other:?}"),
+                }
+            }
+            let children: Vec<ArrayRef> = fields
+                .iter()
+                .map(|(_, f)| provide(f).unwrap_or_else(|| empty_child_for(f.data_type())))
+                .collect();
+            Arc::new(
+                UnionArray::try_new(
+                    fields.clone(),
+                    ScalarBuffer::<i8>::from(type_ids),
+                    Some(ScalarBuffer::<i32>::from(offsets)),
+                    children,
+                )
+                .unwrap(),
+            ) as ArrayRef
+        }
+
+        #[inline]
+        fn uuid16_from_str(s: &str) -> [u8; 16] {
+            let mut out = [0u8; 16];
+            let mut idx = 0usize;
+            let mut hi: Option<u8> = None;
+            for ch in s.chars() {
+                if ch == '-' {
+                    continue;
+                }
+                let v = ch.to_digit(16).expect("invalid hex digit in UUID") as u8;
+                if let Some(h) = hi {
+                    out[idx] = (h << 4) | v;
+                    idx += 1;
+                    hi = None;
+                } else {
+                    hi = Some(v);
+                }
+            }
+            assert_eq!(idx, 16, "UUID must decode to 16 bytes");
+            out
+        }
+        let date_a: i32 = 19_000; // 2022-01-08
+        let time_ms_a: i32 = 12 * 3_600_000 + 34 * 60_000 + 56_000 + 789;
+        let time_us_eod: i64 = 86_400_000_000 - 1;
+        let ts_ms_2024_01_01: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z
+        let ts_us_2024_01_01: i64 = ts_ms_2024_01_01 * 1_000;
+        let dur_small = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000_000);
+        let dur_zero = IntervalMonthDayNanoType::make_value(0, 0, 0);
+        let dur_large =
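+            // Avro `duration` counts months, days, and milliseconds; Arrow's MonthDayNano
+            // interval stores nanoseconds, hence the ms-to-ns scaling below.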
+            IntervalMonthDayNanoType::make_value(12, 31, ((86_400_000 - 1) as i64) * 1_000_000);
+        let dur_2years = IntervalMonthDayNanoType::make_value(24, 0, 0);
+        let uuid1 = uuid16_from_str("fe7bc30b-4ce8-4c5e-b67c-2234a2d38e66");
+        let uuid2 = uuid16_from_str("0826cc06-d2e3-4599-b4ad-af5fa6905cdb");
+
+        #[inline]
+        fn push_like(
+            reader_schema: &arrow_schema::Schema,
+            name: &str,
+            arr: ArrayRef,
+            fields: &mut Vec<FieldRef>,
+            cols: &mut Vec<ArrayRef>,
+        ) {
+            let src = reader_schema
+                .field_with_name(name)
+                .unwrap_or_else(|_| panic!("source schema missing field '{name}'"));
+            let mut f = Field::new(name, arr.data_type().clone(), src.is_nullable());
+            let md = src.metadata();
+            if !md.is_empty() {
+                f = f.with_metadata(md.clone());
+            }
+            fields.push(Arc::new(f));
+            cols.push(arr);
+        }
+
+        let mut fields: Vec<FieldRef> = Vec::new();
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        push_like(
+            schema.as_ref(),
+            "id",
+            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "flag",
+            Arc::new(BooleanArray::from(vec![true, false, true, false])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ratio_f32",
+            Arc::new(Float32Array::from(vec![1.25f32, -0.0, 3.5, 9.75])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ratio_f64",
+            Arc::new(Float64Array::from(vec![2.5f64, -1.0, 7.0, -2.25])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "count_i32",
+            Arc::new(Int32Array::from(vec![7, -1, 0, 123])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "count_i64",
+            Arc::new(Int64Array::from(vec![
+                7_000_000_000i64,
+                -2,
+                0,
+                -9_876_543_210i64,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "opt_i32_nullfirst",
+            Arc::new(Int32Array::from(vec![None, Some(42), None, Some(0)])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "opt_str_nullsecond",
+            Arc::new(StringArray::from(vec![
+                Some("alpha"),
+                None,
+                Some("s3"),
+                Some(""),
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let uf = match schema
+                .field_with_name("tri_union_prim")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("tri_union_prim should be dense union, got {other:?}"),
+            };
+            let tid_i = tid_by_name(&uf, "int");
+            let tid_s = tid_by_name(&uf, "string");
+            let tid_b = tid_by_name(&uf, "boolean");
+            let tids = vec![tid_i, tid_s, tid_b, tid_s];
+            let offs = vec![0, 0, 0, 1];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Int32 => Some(Arc::new(Int32Array::from(vec![0])) as ArrayRef),
+                DataType::Utf8 => Some(Arc::new(StringArray::from(vec!["hi", ""])) as ArrayRef),
+                DataType::Boolean => Some(Arc::new(BooleanArray::from(vec![true])) as ArrayRef),
+                _ => None,
+            });
+            push_like(
+                schema.as_ref(),
+                "tri_union_prim",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+
+        push_like(
+            schema.as_ref(),
+            "str_utf8",
+            Arc::new(StringArray::from(vec!["hello", "", "world", "✓ unicode"])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "raw_bytes",
+            Arc::new(BinaryArray::from(vec![
+                b"\x00\x01".as_ref(),
+                b"".as_ref(),
+                b"\xFF\x00".as_ref(),
+                b"\x10\x20\x30\x40".as_ref(),
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let it = [
+                Some(*b"0123456789ABCDEF"),
+                Some([0u8; 16]),
+                Some(*b"ABCDEFGHIJKLMNOP"),
+                Some([0xAA; 16]),
+            ]
+            .into_iter();
+            let arr =
+                Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+                    as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "fx16_plain",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            #[cfg(feature = "small_decimals")]
+            let dec10_2 = Arc::new(
+                Decimal64Array::from_iter_values([123456i64, -1, 0, 9_999_999_999i64])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ) as ArrayRef;
+            #[cfg(not(feature = "small_decimals"))]
+            let dec10_2 = Arc::new(
+                Decimal128Array::from_iter_values([123456i128, -1, 0, 9_999_999_999i128])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "dec_bytes_s10_2",
+                dec10_2,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            #[cfg(feature = "small_decimals")]
+            let dec20_4 = Arc::new(
+                Decimal128Array::from_iter_values([1_234_567_891_234i128, -420_000i128, 0, -1i128])
+                    .with_precision_and_scale(20, 4)
+                    .unwrap(),
+            ) as ArrayRef;
+            #[cfg(not(feature = "small_decimals"))]
+            let dec20_4 = Arc::new(
+                Decimal128Array::from_iter_values([1_234_567_891_234i128, -420_000i128, 0, -1i128])
+                    .with_precision_and_scale(20, 4)
+                    .unwrap(),
+            ) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "dec_fix_s20_4",
+                dec20_4,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let it = [Some(uuid1), Some(uuid2), Some(uuid1), Some(uuid2)].into_iter();
+            let arr =
+                Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap())
+                    as ArrayRef;
+            push_like(schema.as_ref(), "uuid_str", arr, &mut fields, &mut columns);
+        }
+        push_like(
+            schema.as_ref(),
+            "d_date",
+            Arc::new(Date32Array::from(vec![date_a, 0, 1, 365])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "t_millis",
+            Arc::new(Time32MillisecondArray::from(vec![
+                time_ms_a,
+                0,
+                1,
+                86_400_000 - 1,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "t_micros",
+            Arc::new(Time64MicrosecondArray::from(vec![
+                time_us_eod,
+                0,
+                1,
+                1_000_000,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let a = TimestampMillisecondArray::from(vec![
+                ts_ms_2024_01_01,
+                -1,
+                ts_ms_2024_01_01 + 123,
+                0,
+            ])
+            .with_timezone("+00:00");
+            push_like(
+                schema.as_ref(),
+                "ts_millis_utc",
+                Arc::new(a) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let a = TimestampMicrosecondArray::from(vec![
+                ts_us_2024_01_01,
+                1,
+                ts_us_2024_01_01 + 456,
+                0,
+            ])
+            .with_timezone("+00:00");
+            push_like(
+                schema.as_ref(),
+                "ts_micros_utc",
+                Arc::new(a) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        push_like(
+            schema.as_ref(),
+            "ts_millis_local",
+            Arc::new(TimestampMillisecondArray::from(vec![
+                ts_ms_2024_01_01 + 86_400_000,
+                0,
+                ts_ms_2024_01_01 + 789,
+                123_456_789,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        push_like(
+            schema.as_ref(),
+            "ts_micros_local",
+            Arc::new(TimestampMicrosecondArray::from(vec![
+                ts_us_2024_01_01 + 123_456,
+                0,
+                ts_us_2024_01_01 + 101_112,
+                987_654_321,
+            ])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let v = vec![dur_small, dur_zero, dur_large, dur_2years];
+            push_like(
+                schema.as_ref(),
+                "interval_mdn",
+                Arc::new(IntervalMonthDayNanoArray::from(v)) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let keys = Int32Array::from(vec![1, 2, 3, 0]); // NEW, PROCESSING, DONE, UNKNOWN
+            let values = Arc::new(StringArray::from(vec![
+                "UNKNOWN",
+                "NEW",
+                "PROCESSING",
+                "DONE",
+            ])) as ArrayRef;
+            let dict = DictionaryArray::<Int32Type>::try_new(keys, values).unwrap();
+            push_like(
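+                // Avro enums materialize as Int32-keyed dictionary columns whose values
+                // follow the reader schema's symbol order.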
+                schema.as_ref(),
+                "status",
+                Arc::new(dict) as ArrayRef,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let list_field = match schema.field_with_name("arr_union").unwrap().data_type() {
+                DataType::List(f) => f.clone(),
+                other => panic!("arr_union should be List, got {other:?}"),
+            };
+            let uf = match list_field.data_type() {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("arr_union item should be union, got {other:?}"),
+            };
+            let tid_l = tid_by_name(&uf, "long");
+            let tid_s = tid_by_name(&uf, "string");
+            let tid_n = tid_by_name(&uf, "null");
+            let type_ids = vec![
+                tid_l, tid_s, tid_n, tid_l, tid_n, tid_s, tid_l, tid_l, tid_s, tid_n, tid_l,
+            ];
+            let offsets = vec![0, 0, 0, 1, 1, 1, 2, 3, 2, 2, 4];
+            let values = mk_dense_union(&uf, type_ids, offsets, |f| match f.data_type() {
+                DataType::Int64 => {
+                    Some(Arc::new(Int64Array::from(vec![1i64, -3, 0, -1, 0])) as ArrayRef)
+                }
+                DataType::Utf8 => {
+                    Some(Arc::new(StringArray::from(vec!["x", "z", "end"])) as ArrayRef)
+                }
+                DataType::Null => Some(Arc::new(NullArray::new(3)) as ArrayRef),
+                _ => None,
+            });
+            let list_offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4, 7, 8, 11]));
+            let arr = Arc::new(ListArray::try_new(list_field, list_offsets, values, None).unwrap())
+                as ArrayRef;
+            push_like(schema.as_ref(), "arr_union", arr, &mut fields, &mut columns);
+        }
+        {
+            let (entry_field, entries_fields, uf, is_sorted) =
+                match schema.field_with_name("map_union").unwrap().data_type() {
+                    DataType::Map(entry_field, is_sorted) => {
+                        let fs = match entry_field.data_type() {
+                            DataType::Struct(fs) => fs.clone(),
+                            other => panic!("map entries must be struct, got {other:?}"),
+                        };
+                        let val_f = fs[1].clone();
+                        let uf = match val_f.data_type() {
+                            DataType::Union(f, UnionMode::Dense) => f.clone(),
+                            other => panic!("map value must be union, got {other:?}"),
+                        };
+                        (entry_field.clone(), fs, uf, *is_sorted)
+                    }
+                    other => panic!("map_union should be Map, got {other:?}"),
+                };
+            let keys = StringArray::from(vec!["a", "b", "c", "neg", "pi", "ok"]);
+            let moff = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 4, 4, 6]));
+            let tid_null = tid_by_name(&uf, "null");
+            let tid_d = tid_by_name(&uf, "double");
+            let tid_s = tid_by_name(&uf, "string");
+            let type_ids = vec![tid_d, tid_null, tid_s, tid_d, tid_d, tid_s];
+            let offsets = vec![0, 0, 0, 1, 2, 1];
+            let pi_5dp = (std::f64::consts::PI * 100_000.0).trunc() / 100_000.0;
+            let vals = mk_dense_union(&uf, type_ids, offsets, |f| match f.data_type() {
+                DataType::Float64 => {
+                    Some(Arc::new(Float64Array::from(vec![1.5f64, -0.5, pi_5dp])) as ArrayRef)
+                }
+                DataType::Utf8 => {
+                    Some(Arc::new(StringArray::from(vec!["yes", "true"])) as ArrayRef)
+                }
+                DataType::Null => Some(Arc::new(NullArray::new(2)) as ArrayRef),
+                _ => None,
+            });
+            let entries = StructArray::new(
+                entries_fields.clone(),
+                vec![Arc::new(keys) as ArrayRef, vals],
+                None,
+            );
+            let map =
+                Arc::new(MapArray::new(entry_field, moff, entries, None, is_sorted)) as ArrayRef;
+            push_like(schema.as_ref(), "map_union", map, &mut fields, &mut columns);
+        }
+        {
+            let fs = match schema.field_with_name("address").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("address should be Struct, got {other:?}"),
+            };
+            let street = Arc::new(StringArray::from(vec![
+                "100 Main",
+                "",
+                "42 Galaxy Way",
+                "End Ave",
+            ])) as ArrayRef;
+            let zip = Arc::new(Int32Array::from(vec![12345, 0, 42424, 1])) as ArrayRef;
+            let country = Arc::new(StringArray::from(vec!["US", "CA", "US", "GB"])) as ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![street, zip, country], None)) as ArrayRef;
+            push_like(schema.as_ref(), "address", arr, &mut fields, &mut columns);
+        }
+        {
+            let fs = match schema.field_with_name("maybe_auth").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("maybe_auth should be Struct, got {other:?}"),
+            };
+            let user =
+                Arc::new(StringArray::from(vec!["alice", "bob", "carol", "dave"])) as ArrayRef;
+            let token_values: Vec<Option<&[u8]>> = vec![
+                None,                           // row 1: null
+                Some(b"\x01\x02\x03".as_ref()), // row 2: bytes
+                None,                           // row 3: null
+                Some(b"".as_ref()),             // row 4: empty bytes
+            ];
+            let token = Arc::new(BinaryArray::from(token_values)) as ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![user, token], None)) as ArrayRef;
+            push_like(
+                schema.as_ref(),
+                "maybe_auth",
+                arr,
+                &mut fields,
+                &mut columns,
+            );
+        }
+        {
+            let uf = match schema
+                .field_with_name("union_enum_record_array_map")
+                .unwrap()
+                .data_type()
+            {
+                DataType::Union(f, UnionMode::Dense) => f.clone(),
+                other => panic!("union_enum_record_array_map should be union, got {other:?}"),
+            };
+            let mut tid_enum: Option<i8> = None;
+            let mut tid_rec_a: Option<i8> = None;
+            let mut tid_array: Option<i8> = None;
+            let mut tid_map: Option<i8> = None;
+            let mut map_entry_field: Option<FieldRef> = None;
+            let mut map_sorted: bool = false;
+            for (tid, f) in uf.iter() {
+                match f.data_type() {
+                    DataType::Dictionary(_, _) => tid_enum = Some(tid),
+                    DataType::Struct(childs)
+                        if childs.len() == 2
+                            && childs[0].name() == "a"
+                            && childs[1].name() == "b" =>
+                    {
+                        tid_rec_a = Some(tid)
+                    }
+                    DataType::List(item) if matches!(item.data_type(), DataType::Int64) => {
+                        tid_array = Some(tid)
+                    }
+                    DataType::Map(ef, is_sorted) => {
+                        tid_map = Some(tid);
+                        map_entry_field = Some(ef.clone());
+                        map_sorted = *is_sorted;
+                    }
+                    _ => {}
+                }
+            }
+            let (tid_enum, tid_rec_a, tid_array, tid_map) = (
+                tid_enum.unwrap(),
+                tid_rec_a.unwrap(),
+                tid_array.unwrap(),
+                tid_map.unwrap(),
+            );
+            let tids = vec![tid_enum, tid_rec_a, tid_array, tid_map];
+            let offs = vec![0, 0, 0, 0];
+            let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() {
+                DataType::Dictionary(_, _) => {
+                    let keys = Int32Array::from(vec![0i32]);
+                    let values =
+                        Arc::new(StringArray::from(vec!["RED", "GREEN", "BLUE"])) as ArrayRef;
+                    Some(
+                        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap())
+                            as ArrayRef,
+                    )
+                }
+                DataType::Struct(fs)
+                    if fs.len() == 2 && fs[0].name() == "a" && fs[1].name() == "b" =>
+                {
+                    let a = Int32Array::from(vec![7]);
+                    let b = StringArray::from(vec!["rec"]);
+                    Some(Arc::new(StructArray::new(
+                        fs.clone(),
+                        vec![Arc::new(a), Arc::new(b)],
+                        None,
+                    )) as ArrayRef)
+                }
+                DataType::List(field) => {
+                    let values = Int64Array::from(vec![1i64, 2, 3]);
+                    let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3]));
+                    Some(Arc::new(
+                        ListArray::try_new(field.clone(), offsets, Arc::new(values), None).unwrap(),
+                    ) as ArrayRef)
+                }
+                DataType::Map(_, _) => {
+                    let entry_field = map_entry_field.clone().unwrap();
+                    let (key_field, val_field) = match entry_field.data_type() {
+                        DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()),
+                        _ => unreachable!(),
+                    };
+                    let keys = StringArray::from(vec!["k"]);
+                    let vals = StringArray::from(vec!["v"]);
+                    let entries = StructArray::new(
+                        Fields::from(vec![key_field.as_ref().clone(), val_field.as_ref().clone()]),
+                        vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as ArrayRef],
+                        None,
+                    );
+                    let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 1]));
+                    Some(Arc::new(MapArray::new(
Some(Arc::new(MapArray::new( + entry_field.clone(), + offsets, + entries, + None, + map_sorted, + )) as ArrayRef) + } + _ => None, + }); + push_like( + schema.as_ref(), + "union_enum_record_array_map", + arr, + &mut fields, + &mut columns, + ); + } + { + let uf = match schema + .field_with_name("union_date_or_fixed4") + .unwrap() + .data_type() + { + DataType::Union(f, UnionMode::Dense) => f.clone(), + other => panic!("union_date_or_fixed4 should be union, got {other:?}"), + }; + let tid_date = tid_by_dt(&uf, |dt| matches!(dt, DataType::Date32)); + let tid_fx4 = tid_by_dt(&uf, |dt| matches!(dt, DataType::FixedSizeBinary(4))); + let tids = vec![tid_date, tid_fx4, tid_date, tid_fx4]; + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() { + DataType::Date32 => Some(Arc::new(Date32Array::from(vec![date_a, 0])) as ArrayRef), + DataType::FixedSizeBinary(4) => { + let it = [Some(*b"\x00\x11\x22\x33"), Some(*b"ABCD")].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 4).unwrap(), + ) as ArrayRef) + } + _ => None, + }); + push_like( + schema.as_ref(), + "union_date_or_fixed4", + arr, + &mut fields, + &mut columns, + ); + } + { + let uf = match schema + .field_with_name("union_interval_or_string") + .unwrap() + .data_type() + { + DataType::Union(f, UnionMode::Dense) => f.clone(), + other => panic!("union_interval_or_string should be union, got {other:?}"), + }; + let tid_dur = tid_by_dt(&uf, |dt| { + matches!(dt, DataType::Interval(IntervalUnit::MonthDayNano)) + }); + let tid_str = tid_by_dt(&uf, |dt| matches!(dt, DataType::Utf8)); + let tids = vec![tid_dur, tid_str, tid_dur, tid_str]; + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() { + DataType::Interval(IntervalUnit::MonthDayNano) => Some(Arc::new( + IntervalMonthDayNanoArray::from(vec![dur_small, dur_large]), + ) + as ArrayRef), + DataType::Utf8 => Some(Arc::new(StringArray::from(vec![ + "duration-as-text", + "iso-8601-period-P1Y", + ])) as ArrayRef), + _ => None, + }); + push_like( + schema.as_ref(), + "union_interval_or_string", + arr, + &mut fields, + &mut columns, + ); + } + { + let uf = match schema + .field_with_name("union_uuid_or_fixed10") + .unwrap() + .data_type() + { + DataType::Union(f, UnionMode::Dense) => f.clone(), + other => panic!("union_uuid_or_fixed10 should be union, got {other:?}"), + }; + let tid_uuid = tid_by_dt(&uf, |dt| matches!(dt, DataType::FixedSizeBinary(16))); + let tid_fx10 = tid_by_dt(&uf, |dt| matches!(dt, DataType::FixedSizeBinary(10))); + let tids = vec![tid_uuid, tid_fx10, tid_uuid, tid_fx10]; + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() { + DataType::FixedSizeBinary(16) => { + let it = [Some(uuid1), Some(uuid2)].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(), + ) as ArrayRef) + } + DataType::FixedSizeBinary(10) => { + let fx10_a = [0xAAu8; 10]; + let fx10_b = [0x00u8, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99]; + let it = [Some(fx10_a), Some(fx10_b)].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 10).unwrap(), + ) as ArrayRef) + } + _ => None, + }); + push_like( + schema.as_ref(), + "union_uuid_or_fixed10", + arr, + &mut fields, + &mut columns, + ); + } + { + let list_field = match schema + .field_with_name("array_records_with_union") + .unwrap() + .data_type() + { + DataType::List(f) => f.clone(), + other => 
panic!("array_records_with_union should be List, got {other:?}"), + }; + let kv_fields = match list_field.data_type() { + DataType::Struct(fs) => fs.clone(), + other => panic!("array_records_with_union items must be Struct, got {other:?}"), + }; + let val_field = kv_fields + .iter() + .find(|f| f.name() == "val") + .unwrap() + .clone(); + let uf = match val_field.data_type() { + DataType::Union(f, UnionMode::Dense) => f.clone(), + other => panic!("KV.val should be union, got {other:?}"), + }; + let keys = Arc::new(StringArray::from(vec!["k1", "k2", "k", "k3", "x"])) as ArrayRef; + let tid_null = tid_by_name(&uf, "null"); + let tid_i = tid_by_name(&uf, "int"); + let tid_l = tid_by_name(&uf, "long"); + let type_ids = vec![tid_i, tid_null, tid_l, tid_null, tid_i]; + let offsets = vec![0, 0, 0, 1, 1]; + let vals = mk_dense_union(&uf, type_ids, offsets, |f| match f.data_type() { + DataType::Int32 => Some(Arc::new(Int32Array::from(vec![5, -5])) as ArrayRef), + DataType::Int64 => Some(Arc::new(Int64Array::from(vec![99i64])) as ArrayRef), + DataType::Null => Some(Arc::new(NullArray::new(2)) as ArrayRef), + _ => None, + }); + let values_struct = + Arc::new(StructArray::new(kv_fields.clone(), vec![keys, vals], None)) as ArrayRef; + let list_offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 2, 3, 4, 5])); + let arr = Arc::new( + ListArray::try_new(list_field, list_offsets, values_struct, None).unwrap(), + ) as ArrayRef; + push_like( + schema.as_ref(), + "array_records_with_union", + arr, + &mut fields, + &mut columns, + ); + } + { + let uf = match schema + .field_with_name("union_map_or_array_int") + .unwrap() + .data_type() + { + DataType::Union(f, UnionMode::Dense) => f.clone(), + other => panic!("union_map_or_array_int should be union, got {other:?}"), + }; + let tid_map = tid_by_dt(&uf, |dt| matches!(dt, DataType::Map(_, _))); + let tid_list = tid_by_dt(&uf, |dt| matches!(dt, DataType::List(_))); + let map_child: ArrayRef = { + let (entry_field, is_sorted) = match uf + .iter() + .find(|(tid, _)| *tid == tid_map) + .unwrap() + .1 + .data_type() + { + DataType::Map(ef, is_sorted) => (ef.clone(), *is_sorted), + _ => unreachable!(), + }; + let (key_field, val_field) = match entry_field.data_type() { + DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()), + _ => unreachable!(), + }; + let keys = StringArray::from(vec!["x", "y", "only"]); + let vals = Int32Array::from(vec![1, 2, 10]); + let entries = StructArray::new( + Fields::from(vec![key_field.as_ref().clone(), val_field.as_ref().clone()]), + vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as ArrayRef], + None, + ); + let moff = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 2, 3])); + Arc::new(MapArray::new(entry_field, moff, entries, None, is_sorted)) as ArrayRef + }; + let list_child: ArrayRef = { + let list_field = match uf + .iter() + .find(|(tid, _)| *tid == tid_list) + .unwrap() + .1 + .data_type() + { + DataType::List(f) => f.clone(), + _ => unreachable!(), + }; + let values = Int32Array::from(vec![1, 2, 3, 0]); + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 4])); + Arc::new(ListArray::try_new(list_field, offsets, Arc::new(values), None).unwrap()) + as ArrayRef + }; + let tids = vec![tid_map, tid_list, tid_map, tid_list]; + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf, tids, offs, |f| match f.data_type() { + DataType::Map(_, _) => Some(map_child.clone()), + DataType::List(_) => Some(list_child.clone()), + _ => None, + }); + push_like( + schema.as_ref(), + "union_map_or_array_int", + arr, + &mut 
+                &mut fields,
+                &mut columns,
+            );
+        }
+        push_like(
+            schema.as_ref(),
+            "renamed_with_default",
+            Arc::new(Int32Array::from(vec![100, 42, 7, 42])) as ArrayRef,
+            &mut fields,
+            &mut columns,
+        );
+        {
+            let fs = match schema.field_with_name("person").unwrap().data_type() {
+                DataType::Struct(fs) => fs.clone(),
+                other => panic!("person should be Struct, got {other:?}"),
+            };
+            let name =
+                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "Dave"])) as ArrayRef;
+            let age = Arc::new(Int32Array::from(vec![30, 0, 25, 41])) as ArrayRef;
+            let arr = Arc::new(StructArray::new(fs, vec![name, age], None)) as ArrayRef;
+            push_like(schema.as_ref(), "person", arr, &mut fields, &mut columns);
+        }
+        let expected =
+            RecordBatch::try_new(Arc::new(Schema::new(Fields::from(fields))), columns).unwrap();
+        assert_eq!(
+            expected, batch,
+            "entire RecordBatch mismatch (schema, all columns, all rows)"
+        );
+    }
+
+    #[test]
+    fn comprehensive_e2e_resolution_test() {
+        use serde_json::Value;
+        use std::collections::HashMap;
+
+        // Build a reader schema that stresses Avro schema resolution
+        //
+        // Changes relative to writer schema:
+        //   * Rename fields using writer aliases: id -> identifier, renamed_with_default -> old_count
+        //   * Promote numeric types: count_i32 (int) -> long, ratio_f32 (float) -> double
+        //   * Reorder many union branches (reverse), incl. nested unions
+        //   * Reorder array/map union item/value branches
+        //   * Rename nested Address field: street -> street_name (uses alias in writer)
+        //   * Change Person type name/namespace: com.example.Person (matches writer alias)
+        //   * Reverse top-level field order
+        //
+        // Reader-side aliases are added wherever names change (per Avro spec).
+        fn make_comprehensive_reader_schema(path: &str) -> AvroSchema {
+            fn set_type_string(f: &mut Value, new_ty: &str) {
+                if let Some(ty) = f.get_mut("type") {
+                    match ty {
+                        Value::String(_) | Value::Object(_) => {
+                            *ty = Value::String(new_ty.to_string());
+                        }
+                        Value::Array(arr) => {
+                            for b in arr.iter_mut() {
+                                match b {
+                                    Value::String(s) if s != "null" => {
+                                        *b = Value::String(new_ty.to_string());
+                                        break;
+                                    }
+                                    Value::Object(_) => {
+                                        *b = Value::String(new_ty.to_string());
+                                        break;
+                                    }
+                                    _ => {}
+                                }
+                            }
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            fn reverse_union_array(f: &mut Value) {
+                if let Some(arr) = f.get_mut("type").and_then(|t| t.as_array_mut()) {
+                    arr.reverse();
+                }
+            }
+            fn reverse_items_union(f: &mut Value) {
+                if let Some(obj) = f.get_mut("type").and_then(|t| t.as_object_mut()) {
+                    if let Some(items) = obj.get_mut("items").and_then(|v| v.as_array_mut()) {
+                        items.reverse();
+                    }
+                }
+            }
+            fn reverse_map_values_union(f: &mut Value) {
+                if let Some(obj) = f.get_mut("type").and_then(|t| t.as_object_mut()) {
+                    if let Some(values) = obj.get_mut("values").and_then(|v| v.as_array_mut()) {
+                        values.reverse();
+                    }
+                }
+            }
+            fn reverse_nested_union_in_record(f: &mut Value, field_name: &str) {
+                if let Some(obj) = f.get_mut("type").and_then(|t| t.as_object_mut()) {
+                    if let Some(fields) = obj.get_mut("fields").and_then(|v| v.as_array_mut()) {
+                        for ff in fields.iter_mut() {
+                            if ff.get("name").and_then(|n| n.as_str()) == Some(field_name) {
+                                if let Some(ty) = ff.get_mut("type") {
+                                    if let Some(arr) = ty.as_array_mut() {
+                                        arr.reverse();
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            fn rename_nested_field_with_alias(f: &mut Value, old: &str, new: &str) {
+                if let Some(obj) = f.get_mut("type").and_then(|t| t.as_object_mut()) {
+                    if let Some(fields) = obj.get_mut("fields").and_then(|v| v.as_array_mut()) {
+                        for ff in fields.iter_mut() {
ff.get("name").and_then(|n| n.as_str()) == Some(old) { + ff["name"] = Value::String(new.to_string()); + ff["aliases"] = Value::Array(vec![Value::String(old.to_string())]); + } + } + } + } + } + let mut root = load_writer_schema_json(path); + assert_eq!(root["type"], "record", "writer schema must be a record"); + let fields = root + .get_mut("fields") + .and_then(|f| f.as_array_mut()) + .expect("record has fields"); + for f in fields.iter_mut() { + let Some(name) = f.get("name").and_then(|n| n.as_str()) else { + continue; + }; + match name { + // Field aliasing (reader‑side aliases added) + "id" => { + f["name"] = Value::String("identifier".into()); + f["aliases"] = Value::Array(vec![Value::String("id".into())]); + } + "renamed_with_default" => { + f["name"] = Value::String("old_count".into()); + f["aliases"] = + Value::Array(vec![Value::String("renamed_with_default".into())]); + } + // Promotions + "count_i32" => set_type_string(f, "long"), + "ratio_f32" => set_type_string(f, "double"), + // Union reorder (exercise resolution) + "opt_str_nullsecond" => reverse_union_array(f), + "union_enum_record_array_map" => reverse_union_array(f), + "union_date_or_fixed4" => reverse_union_array(f), + "union_interval_or_string" => reverse_union_array(f), + "union_uuid_or_fixed10" => reverse_union_array(f), + "union_map_or_array_int" => reverse_union_array(f), + "maybe_auth" => reverse_nested_union_in_record(f, "token"), + // Array/Map unions + "arr_union" => reverse_items_union(f), + "map_union" => reverse_map_values_union(f), + // Nested rename using reader‑side alias + "address" => rename_nested_field_with_alias(f, "street", "street_name"), + // Type‑name alias for nested record + "person" => { + if let Some(tobj) = f.get_mut("type").and_then(|t| t.as_object_mut()) { + tobj.insert("name".to_string(), Value::String("Person".into())); + tobj.insert( + "namespace".to_string(), + Value::String("com.example".into()), + ); + tobj.insert( + "aliases".into(), + Value::Array(vec![ + Value::String("PersonV2".into()), + Value::String("com.example.v2.PersonV2".into()), + ]), + ); + } + } + _ => {} + } + } + fields.reverse(); + AvroSchema::new(root.to_string()) + } + + let path = "test/data/comprehensive_e2e.avro"; + let reader_schema = make_comprehensive_reader_schema(path); + let batch = read_alltypes_with_reader_schema(path, reader_schema.clone()); + + const UUID_EXT_KEY: &str = "ARROW:extension:name"; + + let uuid_md_top: Option> = batch + .schema() + .field_with_name("uuid_str") + .ok() + .and_then(|f| { + let md = f.metadata(); + if md.get(UUID_EXT_KEY).is_some() { + Some(md.clone()) + } else { + None + } + }); + + let uuid_md_union: Option> = batch + .schema() + .field_with_name("union_uuid_or_fixed10") + .ok() + .and_then(|f| match f.data_type() { + DataType::Union(uf, _) => uf + .iter() + .find(|(_, child)| child.name() == "uuid") + .and_then(|(_, child)| { + let md = child.metadata(); + if md.get(UUID_EXT_KEY).is_some() { + Some(md.clone()) + } else { + None + } + }), + _ => None, + }); + + let add_uuid_ext_top = |f: Field| -> Field { + if let Some(md) = &uuid_md_top { + f.with_metadata(md.clone()) + } else { + f + } + }; + let add_uuid_ext_union = |f: Field| -> Field { + if let Some(md) = &uuid_md_union { + f.with_metadata(md.clone()) + } else { + f + } + }; + + #[inline] + fn uuid16_from_str(s: &str) -> [u8; 16] { + let mut out = [0u8; 16]; + let mut idx = 0usize; + let mut hi: Option = None; + for ch in s.chars() { + if ch == '-' { + continue; + } + let v = ch.to_digit(16).expect("invalid hex digit in 
UUID") as u8; + if let Some(h) = hi { + out[idx] = (h << 4) | v; + idx += 1; + hi = None; + } else { + hi = Some(v); + } + } + assert_eq!(idx, 16, "UUID must decode to 16 bytes"); + out + } + + fn mk_dense_union( + fields: &UnionFields, + type_ids: Vec, + offsets: Vec, + provide: impl Fn(&Field) -> Option, + ) -> ArrayRef { + fn empty_child_for(dt: &DataType) -> Arc { + match dt { + DataType::Null => Arc::new(NullArray::new(0)), + DataType::Boolean => Arc::new(BooleanArray::from(Vec::::new())), + DataType::Int32 => Arc::new(Int32Array::from(Vec::::new())), + DataType::Int64 => Arc::new(Int64Array::from(Vec::::new())), + DataType::Float32 => Arc::new(Float32Array::from(Vec::::new())), + DataType::Float64 => Arc::new(Float64Array::from(Vec::::new())), + DataType::Binary => Arc::new(BinaryArray::from(Vec::<&[u8]>::new())), + DataType::Utf8 => Arc::new(StringArray::from(Vec::<&str>::new())), + DataType::Date32 => Arc::new(Date32Array::from(Vec::::new())), + DataType::Time32(arrow_schema::TimeUnit::Millisecond) => { + Arc::new(Time32MillisecondArray::from(Vec::::new())) + } + DataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + Arc::new(Time64MicrosecondArray::from(Vec::::new())) + } + DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => { + let a = TimestampMillisecondArray::from(Vec::::new()); + Arc::new(if let Some(tz) = tz { + a.with_timezone(tz.clone()) + } else { + a + }) + } + DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => { + let a = TimestampMicrosecondArray::from(Vec::::new()); + Arc::new(if let Some(tz) = tz { + a.with_timezone(tz.clone()) + } else { + a + }) + } + DataType::Interval(IntervalUnit::MonthDayNano) => Arc::new( + IntervalMonthDayNanoArray::from(Vec::::new()), + ), + DataType::FixedSizeBinary(sz) => Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size( + std::iter::empty::>>(), + *sz, + ) + .unwrap(), + ), + DataType::Dictionary(_, _) => { + let keys = Int32Array::from(Vec::::new()); + let values = Arc::new(StringArray::from(Vec::<&str>::new())); + Arc::new(DictionaryArray::::try_new(keys, values).unwrap()) + } + DataType::Struct(fields) => { + let children: Vec = fields + .iter() + .map(|f| empty_child_for(f.data_type()) as ArrayRef) + .collect(); + Arc::new(StructArray::new(fields.clone(), children, None)) + } + DataType::List(field) => { + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0])); + Arc::new( + ListArray::try_new( + field.clone(), + offsets, + empty_child_for(field.data_type()), + None, + ) + .unwrap(), + ) + } + DataType::Map(entry_field, is_sorted) => { + let (key_field, val_field) = match entry_field.data_type() { + DataType::Struct(fs) => (fs[0].clone(), fs[1].clone()), + other => panic!("unexpected map entries type: {other:?}"), + }; + let keys = StringArray::from(Vec::<&str>::new()); + let vals: ArrayRef = match val_field.data_type() { + DataType::Null => Arc::new(NullArray::new(0)) as ArrayRef, + DataType::Boolean => { + Arc::new(BooleanArray::from(Vec::::new())) as ArrayRef + } + DataType::Int32 => { + Arc::new(Int32Array::from(Vec::::new())) as ArrayRef + } + DataType::Int64 => { + Arc::new(Int64Array::from(Vec::::new())) as ArrayRef + } + DataType::Float32 => { + Arc::new(Float32Array::from(Vec::::new())) as ArrayRef + } + DataType::Float64 => { + Arc::new(Float64Array::from(Vec::::new())) as ArrayRef + } + DataType::Utf8 => { + Arc::new(StringArray::from(Vec::<&str>::new())) as ArrayRef + } + DataType::Binary => { + Arc::new(BinaryArray::from(Vec::<&[u8]>::new())) as ArrayRef + } + 
DataType::Union(uf, _) => { + let children: Vec = uf + .iter() + .map(|(_, f)| empty_child_for(f.data_type())) + .collect(); + Arc::new( + UnionArray::try_new( + uf.clone(), + ScalarBuffer::::from(Vec::::new()), + Some(ScalarBuffer::::from(Vec::::new())), + children, + ) + .unwrap(), + ) as ArrayRef + } + other => panic!("unsupported map value type: {other:?}"), + }; + let entries = StructArray::new( + Fields::from(vec![ + key_field.as_ref().clone(), + val_field.as_ref().clone(), + ]), + vec![Arc::new(keys) as ArrayRef, vals], + None, + ); + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0])); + Arc::new(MapArray::new( + entry_field.clone(), + offsets, + entries, + None, + *is_sorted, + )) + } + other => panic!("empty_child_for: unhandled type {other:?}"), + } + } + let children: Vec = fields + .iter() + .map(|(_, f)| provide(f).unwrap_or_else(|| empty_child_for(f.data_type()))) + .collect(); + Arc::new( + UnionArray::try_new( + fields.clone(), + ScalarBuffer::::from(type_ids), + Some(ScalarBuffer::::from(offsets)), + children, + ) + .unwrap(), + ) as ArrayRef + } + let date_a: i32 = 19_000; // 2022-01-08 + let time_ms_a: i32 = 12 * 3_600_000 + 34 * 60_000 + 56_000 + 789; + let time_us_eod: i64 = 86_400_000_000 - 1; + let ts_ms_2024_01_01: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z + let ts_us_2024_01_01: i64 = ts_ms_2024_01_01 * 1_000; + let dur_small = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000_000); + let dur_zero = IntervalMonthDayNanoType::make_value(0, 0, 0); + let dur_large = + IntervalMonthDayNanoType::make_value(12, 31, ((86_400_000 - 1) as i64) * 1_000_000); + let dur_2years = IntervalMonthDayNanoType::make_value(24, 0, 0); + let uuid1 = uuid16_from_str("fe7bc30b-4ce8-4c5e-b67c-2234a2d38e66"); + let uuid2 = uuid16_from_str("0826cc06-d2e3-4599-b4ad-af5fa6905cdb"); + let item_name = Field::LIST_FIELD_DEFAULT_NAME; + let uf_tri = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("int", DataType::Int32, false), + Field::new("string", DataType::Utf8, false), + Field::new("boolean", DataType::Boolean, false), + ], + ); + let uf_arr_items = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("null", DataType::Null, false), + Field::new("string", DataType::Utf8, false), + Field::new("long", DataType::Int64, false), + ], + ); + let arr_items_field = Arc::new(Field::new( + item_name, + DataType::Union(uf_arr_items.clone(), UnionMode::Dense), + true, + )); + let uf_map_vals = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("string", DataType::Utf8, false), + Field::new("double", DataType::Float64, false), + Field::new("null", DataType::Null, false), + ], + ); + let map_entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Union(uf_map_vals.clone(), UnionMode::Dense), + true, + ), + ])), + false, + )); + let enum_md_color = { + let mut m = HashMap::::new(); + m.insert( + AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), + serde_json::to_string(&vec!["RED", "GREEN", "BLUE"]).unwrap(), + ); + m + }; + let union_rec_a_fields = Fields::from(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + let union_rec_b_fields = Fields::from(vec![ + Field::new("x", DataType::Int64, false), + Field::new("y", DataType::Binary, false), + ]); + let union_map_entries = Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", 
DataType::Utf8, false), + ])), + false, + )); + let uf_union_big = UnionFields::new( + vec![0, 1, 2, 3, 4], + vec![ + Field::new( + "map", + DataType::Map(union_map_entries.clone(), false), + false, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new(item_name, DataType::Int64, false))), + false, + ), + Field::new( + "record", + DataType::Struct(union_rec_b_fields.clone()), + false, + ), + Field::new( + "record", + DataType::Struct(union_rec_a_fields.clone()), + false, + ), + Field::new( + "enum", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ) + .with_metadata(enum_md_color.clone()), + ], + ); + let uf_date_fixed4 = UnionFields::new( + vec![0, 1], + vec![ + Field::new("fixed", DataType::FixedSizeBinary(4), false), + Field::new("date", DataType::Date32, false), + ], + ); + let uf_dur_or_str = UnionFields::new( + vec![0, 1], + vec![ + Field::new("string", DataType::Utf8, false), + Field::new( + "duration", + DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano), + false, + ), + ], + ); + let uf_uuid_or_fx10 = UnionFields::new( + vec![0, 1], + vec![ + Field::new("fixed", DataType::FixedSizeBinary(10), false), + add_uuid_ext_union(Field::new("uuid", DataType::FixedSizeBinary(16), false)), + ], + ); + let uf_kv_val = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("null", DataType::Null, false), + Field::new("int", DataType::Int32, false), + Field::new("long", DataType::Int64, false), + ], + ); + let kv_fields = Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "val", + DataType::Union(uf_kv_val.clone(), UnionMode::Dense), + true, + ), + ]); + let kv_item_field = Arc::new(Field::new( + item_name, + DataType::Struct(kv_fields.clone()), + false, + )); + let map_int_entries = Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])), + false, + )); + let uf_map_or_array = UnionFields::new( + vec![0, 1], + vec![ + Field::new( + "array", + DataType::List(Arc::new(Field::new(item_name, DataType::Int32, false))), + false, + ), + Field::new("map", DataType::Map(map_int_entries.clone(), false), false), + ], + ); + // Metadata & decimal types used within the fields vector + let enum_md_status = { + let mut m = HashMap::::new(); + m.insert( + crate::schema::AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), + serde_json::to_string(&vec!["UNKNOWN", "NEW", "PROCESSING", "DONE"]).unwrap(), + ); + m + }; + let mut dec20_md = HashMap::::new(); + dec20_md.insert("precision".to_string(), "20".to_string()); + dec20_md.insert("scale".to_string(), "4".to_string()); + let mut dec10_md = HashMap::::new(); + dec10_md.insert("precision".to_string(), "10".to_string()); + dec10_md.insert("scale".to_string(), "2".to_string()); + #[cfg(feature = "small_decimals")] + let dec20_dt = DataType::Decimal128(20, 4); + #[cfg(not(feature = "small_decimals"))] + let dec20_dt = DataType::Decimal128(20, 4); + #[cfg(feature = "small_decimals")] + let dec10_dt = DataType::Decimal64(10, 2); + #[cfg(not(feature = "small_decimals"))] + let dec10_dt = DataType::Decimal128(10, 2); + let fields: Vec = vec![ + Arc::new(Field::new( + "person", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])), + false, + )), + Arc::new(Field::new("old_count", DataType::Int32, false)), + Arc::new(Field::new( + "union_map_or_array_int", + DataType::Union(uf_map_or_array.clone(), 
UnionMode::Dense), + false, + )), + Arc::new(Field::new( + "array_records_with_union", + DataType::List(kv_item_field.clone()), + false, + )), + Arc::new(Field::new( + "union_uuid_or_fixed10", + DataType::Union(uf_uuid_or_fx10.clone(), UnionMode::Dense), + false, + )), + Arc::new(Field::new( + "union_interval_or_string", + DataType::Union(uf_dur_or_str.clone(), UnionMode::Dense), + false, + )), + Arc::new(Field::new( + "union_date_or_fixed4", + DataType::Union(uf_date_fixed4.clone(), UnionMode::Dense), + false, + )), + Arc::new(Field::new( + "union_enum_record_array_map", + DataType::Union(uf_union_big.clone(), UnionMode::Dense), + false, + )), + Arc::new(Field::new( + "maybe_auth", + DataType::Struct(Fields::from(vec![ + Field::new("user", DataType::Utf8, false), + Field::new("token", DataType::Binary, true), // [bytes,null] -> nullable bytes + ])), + false, + )), + Arc::new(Field::new( + "address", + DataType::Struct(Fields::from(vec![ + Field::new("street_name", DataType::Utf8, false), + Field::new("zip", DataType::Int32, false), + Field::new("country", DataType::Utf8, false), + ])), + false, + )), + Arc::new(Field::new( + "map_union", + DataType::Map(map_entries_field.clone(), false), + false, + )), + Arc::new(Field::new( + "arr_union", + DataType::List(arr_items_field.clone()), + false, + )), + Arc::new( + Field::new( + "status", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ) + .with_metadata(enum_md_status.clone()), + ), + Arc::new(Field::new( + "interval_mdn", + DataType::Interval(IntervalUnit::MonthDayNano), + false, + )), + Arc::new(Field::new( + "ts_micros_local", + DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), + false, + )), + Arc::new(Field::new( + "ts_millis_local", + DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None), + false, + )), + Arc::new(Field::new( + "ts_micros_utc", + DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("+00:00".into())), + false, + )), + Arc::new(Field::new( + "ts_millis_utc", + DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, Some("+00:00".into())), + false, + )), + Arc::new(Field::new( + "t_micros", + DataType::Time64(arrow_schema::TimeUnit::Microsecond), + false, + )), + Arc::new(Field::new( + "t_millis", + DataType::Time32(arrow_schema::TimeUnit::Millisecond), + false, + )), + Arc::new(Field::new("d_date", DataType::Date32, false)), + Arc::new(add_uuid_ext_top(Field::new( + "uuid_str", + DataType::FixedSizeBinary(16), + false, + ))), + Arc::new(Field::new("dec_fix_s20_4", dec20_dt, false).with_metadata(dec20_md.clone())), + Arc::new( + Field::new("dec_bytes_s10_2", dec10_dt, false).with_metadata(dec10_md.clone()), + ), + Arc::new(Field::new( + "fx16_plain", + DataType::FixedSizeBinary(16), + false, + )), + Arc::new(Field::new("raw_bytes", DataType::Binary, false)), + Arc::new(Field::new("str_utf8", DataType::Utf8, false)), + Arc::new(Field::new( + "tri_union_prim", + DataType::Union(uf_tri.clone(), UnionMode::Dense), + false, + )), + Arc::new(Field::new("opt_str_nullsecond", DataType::Utf8, true)), + Arc::new(Field::new("opt_i32_nullfirst", DataType::Int32, true)), + Arc::new(Field::new("count_i64", DataType::Int64, false)), + Arc::new(Field::new("count_i32", DataType::Int64, false)), + Arc::new(Field::new("ratio_f64", DataType::Float64, false)), + Arc::new(Field::new("ratio_f32", DataType::Float64, false)), + Arc::new(Field::new("flag", DataType::Boolean, false)), + Arc::new(Field::new("identifier", DataType::Int64, false)), + ]; + let expected_schema = 
Arc::new(arrow_schema::Schema::new(Fields::from(fields))); + let mut cols: Vec = vec![ + Arc::new(StructArray::new( + match expected_schema + .field_with_name("person") + .unwrap() + .data_type() + { + DataType::Struct(fs) => fs.clone(), + _ => unreachable!(), + }, + vec![ + Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "Dave"])) as ArrayRef, + Arc::new(Int32Array::from(vec![30, 0, 25, 41])) as ArrayRef, + ], + None, + )) as ArrayRef, + Arc::new(Int32Array::from(vec![100, 42, 7, 42])) as ArrayRef, + ]; + { + let map_child: ArrayRef = { + let keys = StringArray::from(vec!["x", "y", "only"]); + let vals = Int32Array::from(vec![1, 2, 10]); + let entries = StructArray::new( + Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ]), + vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as ArrayRef], + None, + ); + let moff = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 2, 3])); + Arc::new(MapArray::new( + map_int_entries.clone(), + moff, + entries, + None, + false, + )) as ArrayRef + }; + let list_child: ArrayRef = { + let values = Int32Array::from(vec![1, 2, 3, 0]); + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 4])); + Arc::new( + ListArray::try_new( + Arc::new(Field::new(item_name, DataType::Int32, false)), + offsets, + Arc::new(values), + None, + ) + .unwrap(), + ) as ArrayRef + }; + let tids = vec![1, 0, 1, 0]; + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf_map_or_array, tids, offs, |f| match f.name().as_str() { + "array" => Some(list_child.clone()), + "map" => Some(map_child.clone()), + _ => None, + }); + cols.push(arr); + } + { + let keys = Arc::new(StringArray::from(vec!["k1", "k2", "k", "k3", "x"])) as ArrayRef; + let type_ids = vec![1, 0, 2, 0, 1]; + let offsets = vec![0, 0, 0, 1, 1]; + let vals = mk_dense_union(&uf_kv_val, type_ids, offsets, |f| match f.data_type() { + DataType::Int32 => Some(Arc::new(Int32Array::from(vec![5, -5])) as ArrayRef), + DataType::Int64 => Some(Arc::new(Int64Array::from(vec![99i64])) as ArrayRef), + DataType::Null => Some(Arc::new(NullArray::new(2)) as ArrayRef), + _ => None, + }); + let values_struct = + Arc::new(StructArray::new(kv_fields.clone(), vec![keys, vals], None)); + let list_offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 2, 3, 4, 5])); + let arr = Arc::new( + ListArray::try_new(kv_item_field.clone(), list_offsets, values_struct, None) + .unwrap(), + ) as ArrayRef; + cols.push(arr); + } + { + let type_ids = vec![1, 0, 1, 0]; // [uuid, fixed10, uuid, fixed10] but uf order = [fixed10, uuid] + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf_uuid_or_fx10, type_ids, offs, |f| match f.data_type() { + DataType::FixedSizeBinary(16) => { + let it = [Some(uuid1), Some(uuid2)].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(), + ) as ArrayRef) + } + DataType::FixedSizeBinary(10) => { + let fx10_a = [0xAAu8; 10]; + let fx10_b = [0x00u8, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99]; + let it = [Some(fx10_a), Some(fx10_b)].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 10).unwrap(), + ) as ArrayRef) + } + _ => None, + }); + cols.push(arr); + } + { + let type_ids = vec![1, 0, 1, 0]; // [duration, string, duration, string] but uf order = [string, duration] + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf_dur_or_str, type_ids, offs, |f| match f.data_type() { + DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano) => 
Some(Arc::new( + IntervalMonthDayNanoArray::from(vec![dur_small, dur_large]), + ) + as ArrayRef), + DataType::Utf8 => Some(Arc::new(StringArray::from(vec![ + "duration-as-text", + "iso-8601-period-P1Y", + ])) as ArrayRef), + _ => None, + }); + cols.push(arr); + } + { + let type_ids = vec![1, 0, 1, 0]; // [date, fixed, date, fixed] but uf order = [fixed, date] + let offs = vec![0, 0, 1, 1]; + let arr = mk_dense_union(&uf_date_fixed4, type_ids, offs, |f| match f.data_type() { + DataType::Date32 => Some(Arc::new(Date32Array::from(vec![date_a, 0])) as ArrayRef), + DataType::FixedSizeBinary(4) => { + let it = [Some(*b"\x00\x11\x22\x33"), Some(*b"ABCD")].into_iter(); + Some(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 4).unwrap(), + ) as ArrayRef) + } + _ => None, + }); + cols.push(arr); + } + { + let tids = vec![4, 3, 1, 0]; // uf order = [map(0), array(1), RecB(2), RecA(3), enum(4)] + let offs = vec![0, 0, 0, 0]; + let arr = mk_dense_union(&uf_union_big, tids, offs, |f| match f.data_type() { + DataType::Dictionary(_, _) => { + let keys = Int32Array::from(vec![0i32]); + let values = + Arc::new(StringArray::from(vec!["RED", "GREEN", "BLUE"])) as ArrayRef; + Some( + Arc::new(DictionaryArray::::try_new(keys, values).unwrap()) + as ArrayRef, + ) + } + DataType::Struct(fs) if fs == &union_rec_a_fields => { + let a = Int32Array::from(vec![7]); + let b = StringArray::from(vec!["rec"]); + Some(Arc::new(StructArray::new( + fs.clone(), + vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef], + None, + )) as ArrayRef) + } + DataType::List(_) => { + let values = Int64Array::from(vec![1i64, 2, 3]); + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3])); + Some(Arc::new( + ListArray::try_new( + Arc::new(Field::new(item_name, DataType::Int64, false)), + offsets, + Arc::new(values), + None, + ) + .unwrap(), + ) as ArrayRef) + } + DataType::Map(_, _) => { + let keys = StringArray::from(vec!["k"]); + let vals = StringArray::from(vec!["v"]); + let entries = StructArray::new( + Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ]), + vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as ArrayRef], + None, + ); + let moff = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 1])); + Some(Arc::new(MapArray::new( + union_map_entries.clone(), + moff, + entries, + None, + false, + )) as ArrayRef) + } + _ => None, + }); + cols.push(arr); + } + { + let fs = match expected_schema + .field_with_name("maybe_auth") + .unwrap() + .data_type() + { + DataType::Struct(fs) => fs.clone(), + _ => unreachable!(), + }; + let user = + Arc::new(StringArray::from(vec!["alice", "bob", "carol", "dave"])) as ArrayRef; + let token_values: Vec> = vec![ + None, + Some(b"\x01\x02\x03".as_ref()), + None, + Some(b"".as_ref()), + ]; + let token = Arc::new(BinaryArray::from(token_values)) as ArrayRef; + cols.push(Arc::new(StructArray::new(fs, vec![user, token], None)) as ArrayRef); + } + { + let fs = match expected_schema + .field_with_name("address") + .unwrap() + .data_type() + { + DataType::Struct(fs) => fs.clone(), + _ => unreachable!(), + }; + let street = Arc::new(StringArray::from(vec![ + "100 Main", + "", + "42 Galaxy Way", + "End Ave", + ])) as ArrayRef; + let zip = Arc::new(Int32Array::from(vec![12345, 0, 42424, 1])) as ArrayRef; + let country = Arc::new(StringArray::from(vec!["US", "CA", "US", "GB"])) as ArrayRef; + cols.push(Arc::new(StructArray::new(fs, vec![street, zip, country], None)) as ArrayRef); + } + { + let keys = 
StringArray::from(vec!["a", "b", "c", "neg", "pi", "ok"]); + let moff = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 4, 4, 6])); + let tid_s = 0; // string + let tid_d = 1; // double + let tid_n = 2; // null + let type_ids = vec![tid_d, tid_n, tid_s, tid_d, tid_d, tid_s]; + let offsets = vec![0, 0, 0, 1, 2, 1]; + let pi_5dp = (std::f64::consts::PI * 100_000.0).trunc() / 100_000.0; + let vals = mk_dense_union(&uf_map_vals, type_ids, offsets, |f| match f.data_type() { + DataType::Float64 => { + Some(Arc::new(Float64Array::from(vec![1.5f64, -0.5, pi_5dp])) as ArrayRef) + } + DataType::Utf8 => { + Some(Arc::new(StringArray::from(vec!["yes", "true"])) as ArrayRef) + } + DataType::Null => Some(Arc::new(NullArray::new(1)) as ArrayRef), + _ => None, + }); + let entries = StructArray::new( + Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Union(uf_map_vals.clone(), UnionMode::Dense), + true, + ), + ]), + vec![Arc::new(keys) as ArrayRef, vals], + None, + ); + let map = Arc::new(MapArray::new( + map_entries_field.clone(), + moff, + entries, + None, + false, + )) as ArrayRef; + cols.push(map); + } + { + let type_ids = vec![ + 2, 1, 0, 2, 0, 1, 2, 2, 1, 0, + 2, // long,string,null,long,null,string,long,long,string,null,long + ]; + let offsets = vec![0, 0, 0, 1, 1, 1, 2, 3, 2, 2, 4]; + let values = + mk_dense_union(&uf_arr_items, type_ids, offsets, |f| match f.data_type() { + DataType::Int64 => { + Some(Arc::new(Int64Array::from(vec![1i64, -3, 0, -1, 0])) as ArrayRef) + } + DataType::Utf8 => { + Some(Arc::new(StringArray::from(vec!["x", "z", "end"])) as ArrayRef) + } + DataType::Null => Some(Arc::new(NullArray::new(3)) as ArrayRef), + _ => None, + }); + let list_offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 4, 7, 8, 11])); + let arr = Arc::new( + ListArray::try_new(arr_items_field.clone(), list_offsets, values, None).unwrap(), + ) as ArrayRef; + cols.push(arr); + } + { + let keys = Int32Array::from(vec![1, 2, 3, 0]); // NEW, PROCESSING, DONE, UNKNOWN + let values = Arc::new(StringArray::from(vec![ + "UNKNOWN", + "NEW", + "PROCESSING", + "DONE", + ])) as ArrayRef; + let dict = DictionaryArray::::try_new(keys, values).unwrap(); + cols.push(Arc::new(dict) as ArrayRef); + } + cols.push(Arc::new(IntervalMonthDayNanoArray::from(vec![ + dur_small, dur_zero, dur_large, dur_2years, + ])) as ArrayRef); + cols.push(Arc::new(TimestampMicrosecondArray::from(vec![ + ts_us_2024_01_01 + 123_456, + 0, + ts_us_2024_01_01 + 101_112, + 987_654_321, + ])) as ArrayRef); + cols.push(Arc::new(TimestampMillisecondArray::from(vec![ + ts_ms_2024_01_01 + 86_400_000, + 0, + ts_ms_2024_01_01 + 789, + 123_456_789, + ])) as ArrayRef); + { + let a = TimestampMicrosecondArray::from(vec![ + ts_us_2024_01_01, + 1, + ts_us_2024_01_01 + 456, + 0, + ]) + .with_timezone("+00:00"); + cols.push(Arc::new(a) as ArrayRef); + } + { + let a = TimestampMillisecondArray::from(vec![ + ts_ms_2024_01_01, + -1, + ts_ms_2024_01_01 + 123, + 0, + ]) + .with_timezone("+00:00"); + cols.push(Arc::new(a) as ArrayRef); + } + cols.push(Arc::new(Time64MicrosecondArray::from(vec![ + time_us_eod, + 0, + 1, + 1_000_000, + ])) as ArrayRef); + cols.push(Arc::new(Time32MillisecondArray::from(vec![ + time_ms_a, + 0, + 1, + 86_400_000 - 1, + ])) as ArrayRef); + cols.push(Arc::new(Date32Array::from(vec![date_a, 0, 1, 365])) as ArrayRef); + { + let it = [Some(uuid1), Some(uuid2), Some(uuid1), Some(uuid2)].into_iter(); + cols.push(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 
16).unwrap(), + ) as ArrayRef); + } + { + #[cfg(feature = "small_decimals")] + let arr = Arc::new( + Decimal128Array::from_iter_values([1_234_567_891_234i128, -420_000i128, 0, -1i128]) + .with_precision_and_scale(20, 4) + .unwrap(), + ) as ArrayRef; + #[cfg(not(feature = "small_decimals"))] + let arr = Arc::new( + Decimal128Array::from_iter_values([1_234_567_891_234i128, -420_000i128, 0, -1i128]) + .with_precision_and_scale(20, 4) + .unwrap(), + ) as ArrayRef; + cols.push(arr); + } + { + #[cfg(feature = "small_decimals")] + let arr = Arc::new( + Decimal64Array::from_iter_values([123456i64, -1, 0, 9_999_999_999i64]) + .with_precision_and_scale(10, 2) + .unwrap(), + ) as ArrayRef; + #[cfg(not(feature = "small_decimals"))] + let arr = Arc::new( + Decimal128Array::from_iter_values([123456i128, -1, 0, 9_999_999_999i128]) + .with_precision_and_scale(10, 2) + .unwrap(), + ) as ArrayRef; + cols.push(arr); + } + { + let it = [ + Some(*b"0123456789ABCDEF"), + Some([0u8; 16]), + Some(*b"ABCDEFGHIJKLMNOP"), + Some([0xAA; 16]), + ] + .into_iter(); + cols.push(Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size(it, 16).unwrap(), + ) as ArrayRef); + } + cols.push(Arc::new(BinaryArray::from(vec![ + b"\x00\x01".as_ref(), + b"".as_ref(), + b"\xFF\x00".as_ref(), + b"\x10\x20\x30\x40".as_ref(), + ])) as ArrayRef); + cols.push(Arc::new(StringArray::from(vec!["hello", "", "world", "✓ unicode"])) as ArrayRef); + { + let tids = vec![0, 1, 2, 1]; + let offs = vec![0, 0, 0, 1]; + let arr = mk_dense_union(&uf_tri, tids, offs, |f| match f.data_type() { + DataType::Int32 => Some(Arc::new(Int32Array::from(vec![0])) as ArrayRef), + DataType::Utf8 => Some(Arc::new(StringArray::from(vec!["hi", ""])) as ArrayRef), + DataType::Boolean => Some(Arc::new(BooleanArray::from(vec![true])) as ArrayRef), + _ => None, + }); + cols.push(arr); + } + cols.push(Arc::new(StringArray::from(vec![ + Some("alpha"), + None, + Some("s3"), + Some(""), + ])) as ArrayRef); + cols.push(Arc::new(Int32Array::from(vec![None, Some(42), None, Some(0)])) as ArrayRef); + cols.push(Arc::new(Int64Array::from(vec![ + 7_000_000_000i64, + -2, + 0, + -9_876_543_210i64, + ])) as ArrayRef); + cols.push(Arc::new(Int64Array::from(vec![7i64, -1, 0, 123])) as ArrayRef); + cols.push(Arc::new(Float64Array::from(vec![2.5f64, -1.0, 7.0, -2.25])) as ArrayRef); + cols.push(Arc::new(Float64Array::from(vec![1.25f64, -0.0, 3.5, 9.75])) as ArrayRef); + cols.push(Arc::new(BooleanArray::from(vec![true, false, true, false])) as ArrayRef); + cols.push(Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef); + let expected = RecordBatch::try_new(expected_schema, cols).unwrap(); + assert_eq!( + expected, batch, + "entire RecordBatch mismatch (schema, all columns, all rows)" + ); + } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 726c109e219e..15b9530fa381 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -40,6 +40,16 @@ use uuid::Uuid; const DEFAULT_CAPACITY: usize = 1024; +/// Runtime plan for decoding reader-side `["null", T]` types. +#[derive(Clone, Copy, Debug)] +enum NullablePlan { + /// Writer actually wrote a union (branch tag present). + ReadTag, + /// Writer wrote a single (non-union) value resolved to the non-null branch + /// of the reader union; do NOT read a branch tag, but apply any promotion. + FromSingle { promotion: Promotion }, +} + /// Macro to decode a decimal payload for a given width and integer type. macro_rules! 
decode_decimal { ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{ @@ -267,7 +277,7 @@ enum Decoder { Decimal128(usize, Option, Option, Decimal128Builder), Decimal256(usize, Option, Option, Decimal256Builder), Union(UnionDecoder), - Nullable(Nullability, NullBufferBuilder, Box), + Nullable(Nullability, NullBufferBuilder, Box, NullablePlan), } impl Decoder { @@ -508,11 +518,23 @@ impl Decoder { } }; Ok(match data_type.nullability() { - Some(nullability) => Self::Nullable( - nullability, - NullBufferBuilder::new(DEFAULT_CAPACITY), - Box::new(decoder), - ), + Some(nullability) => { + // Default to reading a union branch tag unless the resolution proves otherwise. + let mut plan = NullablePlan::ReadTag; + if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() { + if !info.writer_is_union && info.reader_is_union { + if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() { + plan = NullablePlan::FromSingle { promotion: *promo }; + } + } + } + Self::Nullable( + nullability, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(decoder), + plan, + ) + } None => decoder, }) } @@ -571,7 +593,7 @@ impl Decoder { Self::Enum(indices, _, _) => indices.push(0), Self::Duration(builder) => builder.append_null(), Self::Union(u) => u.append_null()?, - Self::Nullable(_, null_buffer, inner) => { + Self::Nullable(_, null_buffer, inner, _) => { null_buffer.append(false); inner.append_null(); } @@ -582,7 +604,7 @@ impl Decoder { /// Append a single default literal into the decoder's buffers fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> { match self { - Self::Nullable(_, nb, inner) => { + Self::Nullable(_, nb, inner, _) => { if matches!(lit, AvroLiteral::Null) { nb.append(false); inner.append_null() @@ -939,19 +961,27 @@ impl Decoder { builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos)); } Self::Union(u) => u.decode(buf)?, - Self::Nullable(order, nb, encoding) => { - let branch = buf.read_vlq()?; - let is_not_null = match *order { - Nullability::NullFirst => branch != 0, - Nullability::NullSecond => branch == 0, - }; - if is_not_null { - // It is important to decode before appending to null buffer in case of decode error - encoding.decode(buf)?; - } else { - encoding.append_null(); + Self::Nullable(order, nb, encoding, plan) => { + match *plan { + NullablePlan::FromSingle { promotion } => { + encoding.decode_with_promotion(buf, promotion)?; + nb.append(true); + } + NullablePlan::ReadTag => { + let branch = buf.read_vlq()?; + let is_not_null = match *order { + Nullability::NullFirst => branch != 0, + Nullability::NullSecond => branch == 0, + }; + if is_not_null { + // It is important to decode before appending to null buffer in case of decode error + encoding.decode(buf)?; + } else { + encoding.append_null(); + } + nb.append(is_not_null); + } } - nb.append(is_not_null); } } Ok(()) @@ -1018,7 +1048,7 @@ impl Decoder { /// Flush decoded records to an [`ArrayRef`] fn flush(&mut self, nulls: Option) -> Result { Ok(match self { - Self::Nullable(_, n, e) => e.flush(n.finish())?, + Self::Nullable(_, n, e, _) => e.flush(n.finish())?, Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))), Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)), Self::Int32(values) => Arc::new(flush_primitive::(values, nulls)), @@ -2742,6 +2772,7 @@ mod tests { Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(inner), + NullablePlan::ReadTag, ); let mut data = Vec::new(); 
data.extend_from_slice(&encode_avro_int(0)); @@ -2784,6 +2815,7 @@ mod tests { Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(inner), + NullablePlan::ReadTag, ); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, @@ -3663,6 +3695,7 @@ mod tests { Nullability::NullFirst, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(inner), + NullablePlan::ReadTag, ); dec.append_default(&AvroLiteral::Null).unwrap(); dec.append_default(&AvroLiteral::Int(11)).unwrap(); @@ -3916,6 +3949,7 @@ mod tests { Nullability::NullSecond, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))), + NullablePlan::ReadTag, ); let enc_b = Decoder::Nullable( Nullability::NullSecond, @@ -3924,6 +3958,7 @@ mod tests { OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::with_capacity(DEFAULT_CAPACITY), )), + NullablePlan::ReadTag, ); encoders.push(enc_a); encoders.push(enc_b); diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 78bac9167b2b..4f48337b7891 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Map as JsonMap, Value}; #[cfg(feature = "sha256")] use sha2::{Digest, Sha256}; +use std::borrow::Cow; use std::cmp::PartialEq; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; @@ -38,7 +39,7 @@ pub const CONFLUENT_MAGIC: [u8; 1] = [0x00]; /// SHA256 (32) + single-object magic (2) pub const MAX_PREFIX_LEN: usize = 34; -/// The metadata key used for storing the JSON encoded [`Schema`] +/// The metadata key used for storing the JSON encoded `Schema` pub const SCHEMA_METADATA_KEY: &str = "avro.schema"; /// Metadata key used to represent Avro enum symbols in an Arrow schema. @@ -59,21 +60,13 @@ pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc"; /// Default name for the root record in an Avro schema. pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord"; -/// Compare two Avro schemas for equality (identical schemas). -/// Returns true if the schemas have the same parsing canonical form (i.e., logically identical). -pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result { - let canon_writer = AvroSchema::generate_canonical_form(writer)?; - let canon_reader = AvroSchema::generate_canonical_form(reader)?; - Ok(canon_writer == canon_reader) -} - /// Avro types are not nullable, with nullability instead encoded as a union /// where one of the variants is the null type. /// /// To accommodate this, we specially case two-variant unions where one of the /// variants is the null type, and use this to derive arrow's notion of nullability #[derive(Debug, Copy, Clone, PartialEq, Default)] -pub enum Nullability { +pub(crate) enum Nullability { /// The nulls are encoded as the first union variant #[default] NullFirst, @@ -89,7 +82,7 @@ pub enum Nullability { /// A type name in an Avro schema /// /// This represents the different ways a type can be referenced in an Avro schema. -pub enum TypeName<'a> { +pub(crate) enum TypeName<'a> { /// A primitive type like null, boolean, int, etc. 
Primitive(PrimitiveType), /// A reference to another named type @@ -102,7 +95,7 @@ pub enum TypeName<'a> { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, AsRefStr)] #[serde(rename_all = "camelCase")] #[strum(serialize_all = "lowercase")] -pub enum PrimitiveType { +pub(crate) enum PrimitiveType { /// null: no value Null, /// boolean: a binary value @@ -121,21 +114,21 @@ pub enum PrimitiveType { String, } -/// Additional attributes within a [`Schema`] +/// Additional attributes within a `Schema` /// /// #[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Attributes<'a> { +pub(crate) struct Attributes<'a> { /// A logical type name /// /// #[serde(default)] - pub logical_type: Option<&'a str>, + pub(crate) logical_type: Option<&'a str>, /// Additional JSON attributes #[serde(flatten)] - pub additional: HashMap<&'a str, Value>, + pub(crate) additional: HashMap<&'a str, Value>, } impl Attributes<'_> { @@ -151,13 +144,13 @@ impl Attributes<'_> { /// A type definition that is not a variant of [`ComplexType`] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Type<'a> { +pub(crate) struct Type<'a> { /// The type of this Avro data structure #[serde(borrow)] - pub r#type: TypeName<'a>, + pub(crate) r#type: TypeName<'a>, /// Additional attributes associated with this type #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, } /// An Avro schema @@ -166,7 +159,7 @@ pub struct Type<'a> { /// See for more details. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(untagged)] -pub enum Schema<'a> { +pub(crate) enum Schema<'a> { /// A direct type name (primitive or reference) #[serde(borrow)] TypeName(TypeName<'a>), @@ -186,7 +179,7 @@ pub enum Schema<'a> { /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] -pub enum ComplexType<'a> { +pub(crate) enum ComplexType<'a> { /// Record type: a sequence of fields with names and types #[serde(borrow)] Record(Record<'a>), @@ -208,70 +201,81 @@ pub enum ComplexType<'a> { /// /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Record<'a> { +pub(crate) struct Record<'a> { /// Name of the record #[serde(borrow)] - pub name: &'a str, + pub(crate) name: &'a str, /// Optional namespace for the record, provides a way to organize names #[serde(borrow, default)] - pub namespace: Option<&'a str>, + pub(crate) namespace: Option<&'a str>, /// Optional documentation string for the record #[serde(borrow, default)] - pub doc: Option<&'a str>, + pub(crate) doc: Option>, /// Alternative names for this record #[serde(borrow, default)] - pub aliases: Vec<&'a str>, + pub(crate) aliases: Vec<&'a str>, /// The fields contained in this record #[serde(borrow)] - pub fields: Vec>, + pub(crate) fields: Vec>, /// Additional attributes for this record #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, +} + +fn deserialize_default<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + Value::deserialize(deserializer).map(Some) } /// A field within a [`Record`] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Field<'a> { +pub(crate) struct Field<'a> { /// Name of the field within the record #[serde(borrow)] - pub name: &'a str, + pub(crate) name: &'a str, /// Optional documentation for this field 
#[serde(borrow, default)] - pub doc: Option<&'a str>, + pub(crate) doc: Option>, /// The field's type definition #[serde(borrow)] - pub r#type: Schema<'a>, + pub(crate) r#type: Schema<'a>, /// Optional default value for this field - #[serde(default)] - pub default: Option, + #[serde(deserialize_with = "deserialize_default", default)] + pub(crate) default: Option, + /// Alternative names (aliases) for this field (Avro spec: field-level aliases). + /// Borrowed from input JSON where possible. + #[serde(borrow, default)] + pub(crate) aliases: Vec<&'a str>, } /// An enumeration /// /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Enum<'a> { +pub(crate) struct Enum<'a> { /// Name of the enum #[serde(borrow)] - pub name: &'a str, + pub(crate) name: &'a str, /// Optional namespace for the enum, provides organizational structure #[serde(borrow, default)] - pub namespace: Option<&'a str>, + pub(crate) namespace: Option<&'a str>, /// Optional documentation string describing the enum #[serde(borrow, default)] - pub doc: Option<&'a str>, + pub(crate) doc: Option>, /// Alternative names for this enum #[serde(borrow, default)] - pub aliases: Vec<&'a str>, + pub(crate) aliases: Vec<&'a str>, /// The symbols (values) that this enum can have #[serde(borrow)] - pub symbols: Vec<&'a str>, + pub(crate) symbols: Vec<&'a str>, /// Optional default value for this enum #[serde(borrow, default)] - pub default: Option<&'a str>, + pub(crate) default: Option<&'a str>, /// Additional attributes for this enum #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, } /// An array @@ -281,44 +285,44 @@ pub struct Enum<'a> { pub struct Array<'a> { /// The schema for items in this array #[serde(borrow)] - pub items: Box>, + pub(crate) items: Box>, /// Additional attributes for this array #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, } /// A map /// /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Map<'a> { +pub(crate) struct Map<'a> { /// The schema for values in this map #[serde(borrow)] - pub values: Box>, + pub(crate) values: Box>, /// Additional attributes for this map #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, } /// A fixed length binary array /// /// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Fixed<'a> { +pub(crate) struct Fixed<'a> { /// Name of the fixed type #[serde(borrow)] - pub name: &'a str, + pub(crate) name: &'a str, /// Optional namespace for the fixed type #[serde(borrow, default)] - pub namespace: Option<&'a str>, + pub(crate) namespace: Option<&'a str>, /// Alternative names for this fixed type #[serde(borrow, default)] - pub aliases: Vec<&'a str>, + pub(crate) aliases: Vec<&'a str>, /// The number of bytes in this fixed type - pub size: usize, + pub(crate) size: usize, /// Additional attributes for this fixed type #[serde(flatten)] - pub attributes: Attributes<'a>, + pub(crate) attributes: Attributes<'a>, } /// A wrapper for an Avro schema in its JSON string representation. @@ -406,18 +410,7 @@ impl AvroSchema { } } - /// Generates the 64-bit Rabin fingerprint for the given `Schema`. - /// - /// The fingerprint is computed from the canonical form of the schema. - /// This is also known as `CRC-64-AVRO`. - /// - /// # Returns - /// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. 
- pub fn generate_fingerprint_rabin(schema: &Schema) -> Result { - Self::generate_fingerprint(schema, FingerprintAlgorithm::Rabin) - } - - /// Generates the Parsed Canonical Form for the given [`Schema`]. + /// Generates the Parsed Canonical Form for the given `Schema`. /// /// The canonical form is a standardized JSON representation of the schema, /// primarily used for generating a schema fingerprint for equality checking. @@ -437,7 +430,7 @@ impl AvroSchema { /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve /// the exact header encoding alignment; otherwise, a new JSON is generated /// honoring `null_union_order` at **all nullable sites**. - pub fn from_arrow_with_options( + pub(crate) fn from_arrow_with_options( schema: &ArrowSchema, null_order: Option, ) -> Result { @@ -477,7 +470,7 @@ impl AvroSchema { /// A stack-allocated, fixed-size buffer for the prefix. #[derive(Debug, Copy, Clone)] -pub struct Prefix { +pub(crate) struct Prefix { buf: [u8; MAX_PREFIX_LEN], len: u8, } @@ -674,7 +667,7 @@ impl Fingerprint { /// - You can optionally enable the `md5` feature to include the `MD5` variant. /// - You can optionally enable the `sha256` feature to include the `SHA256` variant. /// - pub fn make_prefix(&self) -> Prefix { + pub(crate) fn make_prefix(&self) -> Prefix { let mut buf = [0u8; MAX_PREFIX_LEN]; let len = match self { Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()), @@ -1560,7 +1553,7 @@ fn arrow_field_to_avro( #[cfg(test)] mod tests { use super::*; - use crate::codec::{AvroDataType, AvroField}; + use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder}; use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields}; use serde_json::json; use std::sync::Arc; @@ -1573,20 +1566,22 @@ mod tests { Schema::Complex(ComplexType::Record(Record { name: "record1", namespace: Some("test.namespace"), - doc: Some("A test record"), + doc: Some(Cow::from("A test record")), aliases: vec![], fields: vec![ Field { name: "field1", - doc: Some("An integer field"), + doc: Some(Cow::from("An integer field")), r#type: int_schema(), default: None, + aliases: vec![], }, Field { name: "field2", doc: None, r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), default: None, + aliases: vec![], }, ], attributes: Attributes::default(), @@ -1709,6 +1704,7 @@ mod tests { Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)), ]), default: None, + aliases: vec![], },], attributes: Default::default(), })) @@ -1740,6 +1736,7 @@ mod tests { doc: None, r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)), default: None, + aliases: vec![], }, Field { name: "next", @@ -1749,6 +1746,7 @@ mod tests { Schema::TypeName(TypeName::Ref("LongList")), ]), default: None, + aliases: vec![], } ], attributes: Attributes::default(), @@ -1802,6 +1800,7 @@ mod tests { Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)), ]), default: None, + aliases: vec![], }, Field { name: "timestamp_col", @@ -1811,6 +1810,7 @@ mod tests { Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)), ]), default: None, + aliases: vec![], } ], attributes: Default::default(), @@ -1866,6 +1866,7 @@ mod tests { attributes: Default::default(), })), default: None, + aliases: vec![], }, Field { name: "clientProtocol", @@ -1875,12 +1876,14 @@ mod tests { Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), ]), default: None, + aliases: vec![], }, Field { name: "serverHash", doc: None, r#type: Schema::TypeName(TypeName::Ref("MD5")), default: 
None, + aliases: vec![], }, Field { name: "meta", @@ -1895,6 +1898,7 @@ mod tests { })), ]), default: None, + aliases: vec![], } ], attributes: Default::default(), @@ -1902,6 +1906,98 @@ mod tests { ); } + #[test] + fn test_canonical_form_generation_comprehensive_record() { + // NOTE: This schema is identical to the one used in test_deserialize_comprehensive. + let json_str = r#"{ + "type": "record", + "name": "E2eComprehensive", + "namespace": "org.apache.arrow.avrotests.v1", + "doc": "Comprehensive Avro writer schema to exercise arrow-avro Reader/Decoder paths.", + "fields": [ + {"name": "id", "type": "long", "doc": "Primary row id", "aliases": ["identifier"]}, + {"name": "flag", "type": "boolean", "default": true, "doc": "A sample boolean with default true"}, + {"name": "ratio_f32", "type": "float", "default": 0.0, "doc": "Float32 example"}, + {"name": "ratio_f64", "type": "double", "default": 0.0, "doc": "Float64 example"}, + {"name": "count_i32", "type": "int", "default": 0, "doc": "Int32 example"}, + {"name": "count_i64", "type": "long", "default": 0, "doc": "Int64 example"}, + {"name": "opt_i32_nullfirst", "type": ["null", "int"], "default": null, "doc": "Nullable int (null-first)"}, + {"name": "opt_str_nullsecond", "type": ["string", "null"], "default": "", "aliases": ["old_opt_str"], "doc": "Nullable string (null-second). Default is empty string."}, + {"name": "tri_union_prim", "type": ["int", "string", "boolean"], "default": 0, "doc": "Union[int, string, boolean] with default on first branch (int=0)."}, + {"name": "str_utf8", "type": "string", "default": "default", "doc": "Plain Utf8 string (Reader may use Utf8View)."}, + {"name": "raw_bytes", "type": "bytes", "default": "", "doc": "Raw bytes field"}, + {"name": "fx16_plain", "type": {"type": "fixed", "name": "Fx16", "namespace": "org.apache.arrow.avrotests.v1.types", "aliases": ["Fixed16Old"], "size": 16}, "doc": "Plain fixed(16)"}, + {"name": "dec_bytes_s10_2", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}, "doc": "Decimal encoded on bytes, precision 10, scale 2"}, + {"name": "dec_fix_s20_4", "type": {"type": "fixed", "name": "DecFix20", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 20, "logicalType": "decimal", "precision": 20, "scale": 4}, "doc": "Decimal encoded on fixed(20), precision 20, scale 4"}, + {"name": "uuid_str", "type": {"type": "string", "logicalType": "uuid"}, "doc": "UUID logical type on string"}, + {"name": "d_date", "type": {"type": "int", "logicalType": "date"}, "doc": "Date32: days since 1970-01-01"}, + {"name": "t_millis", "type": {"type": "int", "logicalType": "time-millis"}, "doc": "Time32-millis"}, + {"name": "t_micros", "type": {"type": "long", "logicalType": "time-micros"}, "doc": "Time64-micros"}, + {"name": "ts_millis_utc", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Timestamp ms (UTC)"}, + {"name": "ts_micros_utc", "type": {"type": "long", "logicalType": "timestamp-micros"}, "doc": "Timestamp µs (UTC)"}, + {"name": "ts_millis_local", "type": {"type": "long", "logicalType": "local-timestamp-millis"}, "doc": "Local timestamp ms"}, + {"name": "ts_micros_local", "type": {"type": "long", "logicalType": "local-timestamp-micros"}, "doc": "Local timestamp µs"}, + {"name": "interval_mdn", "type": {"type": "fixed", "name": "Dur12", "namespace": "org.apache.arrow.avrotests.v1.types", "size": 12, "logicalType": "duration"}, "doc": "Duration: fixed(12) little-endian (months, days, millis)"}, + {"name": "status", "type": {"type": "enum", 
"name": "Status", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["UNKNOWN", "NEW", "PROCESSING", "DONE"], "aliases": ["State"], "doc": "Processing status enum with default"}, "default": "UNKNOWN", "doc": "Enum field using default when resolving"}, + {"name": "arr_union", "type": {"type": "array", "items": ["long", "string", "null"]}, "default": [], "doc": "Array whose items are a union[long,string,null]"}, + {"name": "map_union", "type": {"type": "map", "values": ["null", "double", "string"]}, "default": {}, "doc": "Map whose values are a union[null,double,string]"}, + {"name": "address", "type": {"type": "record", "name": "Address", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Postal address with defaults and field alias", "fields": [ + {"name": "street", "type": "string", "default": "", "aliases": ["street_name"], "doc": "Street (field alias = street_name)"}, + {"name": "zip", "type": "int", "default": 0, "doc": "ZIP/postal code"}, + {"name": "country", "type": "string", "default": "US", "doc": "Country code"} + ]}, "doc": "Embedded Address record"}, + {"name": "maybe_auth", "type": {"type": "record", "name": "MaybeAuth", "namespace": "org.apache.arrow.avrotests.v1.types", "doc": "Optional auth token model", "fields": [ + {"name": "user", "type": "string", "doc": "Username"}, + {"name": "token", "type": ["null", "bytes"], "default": null, "doc": "Nullable auth token"} + ]}}, + {"name": "union_enum_record_array_map", "type": [ + {"type": "enum", "name": "Color", "namespace": "org.apache.arrow.avrotests.v1.types", "symbols": ["RED", "GREEN", "BLUE"], "doc": "Color enum"}, + {"type": "record", "name": "RecA", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]}, + {"type": "record", "name": "RecB", "namespace": "org.apache.arrow.avrotests.v1.types", "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "bytes"}]}, + {"type": "array", "items": "long"}, + {"type": "map", "values": "string"} + ], "doc": "Union of enum, two records, array, and map"}, + {"name": "union_date_or_fixed4", "type": [ + {"type": "int", "logicalType": "date"}, + {"type": "fixed", "name": "Fx4", "size": 4} + ], "doc": "Union of date(int) or fixed(4)"}, + {"name": "union_interval_or_string", "type": [ + {"type": "fixed", "name": "Dur12U", "size": 12, "logicalType": "duration"}, + "string" + ], "doc": "Union of duration(fixed12) or string"}, + {"name": "union_uuid_or_fixed10", "type": [ + {"type": "string", "logicalType": "uuid"}, + {"type": "fixed", "name": "Fx10", "size": 10} + ], "doc": "Union of UUID string or fixed(10)"}, + {"name": "array_records_with_union", "type": {"type": "array", "items": { + "type": "record", "name": "KV", "namespace": "org.apache.arrow.avrotests.v1.types", + "fields": [ + {"name": "key", "type": "string"}, + {"name": "val", "type": ["null", "int", "long"], "default": null} + ] + }}, "doc": "Array", "default": []}, + {"name": "union_map_or_array_int", "type": [ + {"type": "map", "values": "int"}, + {"type": "array", "items": "int"} + ], "doc": "Union[map, array]"}, + {"name": "renamed_with_default", "type": "int", "default": 42, "aliases": ["old_count"], "doc": "Field with alias and default"}, + {"name": "person", "type": {"type": "record", "name": "PersonV2", "namespace": "com.example.v2", "aliases": ["com.example.Person"], "doc": "Person record with alias pointing to previous namespace/name", "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int", 
"default": 0} + ]}, "doc": "Record using type alias for schema evolution tests"} + ] + }"#; + let avro = AvroSchema::new(json_str.to_string()); + let parsed = avro.schema().expect("schema should deserialize"); + let expected_canonical_form = r#"{"name":"org.apache.arrow.avrotests.v1.E2eComprehensive","type":"record","fields":[{"name":"id","type":"long"},{"name":"flag","type":"boolean"},{"name":"ratio_f32","type":"float"},{"name":"ratio_f64","type":"double"},{"name":"count_i32","type":"int"},{"name":"count_i64","type":"long"},{"name":"opt_i32_nullfirst","type":["null","int"]},{"name":"opt_str_nullsecond","type":["string","null"]},{"name":"tri_union_prim","type":["int","string","boolean"]},{"name":"str_utf8","type":"string"},{"name":"raw_bytes","type":"bytes"},{"name":"fx16_plain","type":{"name":"org.apache.arrow.avrotests.v1.types.Fx16","type":"fixed","size":16}},{"name":"dec_bytes_s10_2","type":"bytes"},{"name":"dec_fix_s20_4","type":{"name":"org.apache.arrow.avrotests.v1.types.DecFix20","type":"fixed","size":20}},{"name":"uuid_str","type":"string"},{"name":"d_date","type":"int"},{"name":"t_millis","type":"int"},{"name":"t_micros","type":"long"},{"name":"ts_millis_utc","type":"long"},{"name":"ts_micros_utc","type":"long"},{"name":"ts_millis_local","type":"long"},{"name":"ts_micros_local","type":"long"},{"name":"interval_mdn","type":{"name":"org.apache.arrow.avrotests.v1.types.Dur12","type":"fixed","size":12}},{"name":"status","type":{"name":"org.apache.arrow.avrotests.v1.types.Status","type":"enum","symbols":["UNKNOWN","NEW","PROCESSING","DONE"]}},{"name":"arr_union","type":{"type":"array","items":["long","string","null"]}},{"name":"map_union","type":{"type":"map","values":["null","double","string"]}},{"name":"address","type":{"name":"org.apache.arrow.avrotests.v1.types.Address","type":"record","fields":[{"name":"street","type":"string"},{"name":"zip","type":"int"},{"name":"country","type":"string"}]}},{"name":"maybe_auth","type":{"name":"org.apache.arrow.avrotests.v1.types.MaybeAuth","type":"record","fields":[{"name":"user","type":"string"},{"name":"token","type":["null","bytes"]}]}},{"name":"union_enum_record_array_map","type":[{"name":"org.apache.arrow.avrotests.v1.types.Color","type":"enum","symbols":["RED","GREEN","BLUE"]},{"name":"org.apache.arrow.avrotests.v1.types.RecA","type":"record","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]},{"name":"org.apache.arrow.avrotests.v1.types.RecB","type":"record","fields":[{"name":"x","type":"long"},{"name":"y","type":"bytes"}]},{"type":"array","items":"long"},{"type":"map","values":"string"}]},{"name":"union_date_or_fixed4","type":["int",{"name":"org.apache.arrow.avrotests.v1.Fx4","type":"fixed","size":4}]},{"name":"union_interval_or_string","type":[{"name":"org.apache.arrow.avrotests.v1.Dur12U","type":"fixed","size":12},"string"]},{"name":"union_uuid_or_fixed10","type":["string",{"name":"org.apache.arrow.avrotests.v1.Fx10","type":"fixed","size":10}]},{"name":"array_records_with_union","type":{"type":"array","items":{"name":"org.apache.arrow.avrotests.v1.types.KV","type":"record","fields":[{"name":"key","type":"string"},{"name":"val","type":["null","int","long"]}]}}},{"name":"union_map_or_array_int","type":[{"type":"map","values":"int"},{"type":"array","items":"int"}]},{"name":"renamed_with_default","type":"int"},{"name":"person","type":{"name":"com.example.v2.PersonV2","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}}]}"#; + let canonical_form = + 
AvroSchema::generate_canonical_form(&parsed).expect("canonical form should be built"); + assert_eq!( + canonical_form, expected_canonical_form, + "Canonical form must match Avro spec PCF exactly" + ); + } + #[test] fn test_new_schema_store() { let store = SchemaStore::new(); @@ -2113,11 +2209,11 @@ mod tests { let schema_with_attrs = Schema::Complex(ComplexType::Record(Record { name: "record_with_attrs", namespace: None, - doc: Some("This doc should be stripped"), + doc: Some(Cow::from("This doc should be stripped")), aliases: vec!["alias1", "alias2"], fields: vec![Field { name: "f1", - doc: Some("field doc"), + doc: Some(Cow::from("field doc")), r#type: Schema::Type(Type { r#type: TypeName::Primitive(PrimitiveType::Bytes), attributes: Attributes { @@ -2126,6 +2222,7 @@ mod tests { }, }), default: None, + aliases: vec![], }], attributes: Attributes { logical_type: None, @@ -2521,4 +2618,165 @@ mod tests { let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 }); assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16"); } + + #[test] + fn test_record_field_alias_resolution_without_default() { + let writer_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"old","type":"int"}] + }"#; + let reader_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"new","aliases":["old"],"type":"int"}] + }"#; + let writer: Schema = serde_json::from_str(writer_json).unwrap(); + let reader: Schema = serde_json::from_str(reader_json).unwrap(); + let resolved = AvroFieldBuilder::new(&writer) + .with_reader_schema(&reader) + .with_utf8view(false) + .with_strict_mode(false) + .build() + .unwrap(); + let expected = ArrowField::new( + "R", + DataType::Struct(Fields::from(vec![ArrowField::new( + "new", + DataType::Int32, + false, + )])), + false, + ); + assert_eq!(resolved.field(), expected); + } + + #[test] + fn test_record_field_alias_ambiguous_in_strict_mode_errors() { + let writer_json = r#"{ + "type":"record", + "name":"R", + "fields":[ + {"name":"a","type":"int","aliases":["old"]}, + {"name":"b","type":"int","aliases":["old"]} + ] + }"#; + let reader_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"target","type":"int","aliases":["old"]}] + }"#; + let writer: Schema = serde_json::from_str(writer_json).unwrap(); + let reader: Schema = serde_json::from_str(reader_json).unwrap(); + let err = AvroFieldBuilder::new(&writer) + .with_reader_schema(&reader) + .with_utf8view(false) + .with_strict_mode(true) + .build() + .unwrap_err() + .to_string(); + assert!( + err.contains("Ambiguous alias 'old'"), + "expected ambiguous-alias error, got: {err}" + ); + } + + #[test] + fn test_pragmatic_writer_field_alias_mapping_non_strict() { + let writer_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"before","type":"int","aliases":["now"]}] + }"#; + let reader_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"now","type":"int"}] + }"#; + let writer: Schema = serde_json::from_str(writer_json).unwrap(); + let reader: Schema = serde_json::from_str(reader_json).unwrap(); + let resolved = AvroFieldBuilder::new(&writer) + .with_reader_schema(&reader) + .with_utf8view(false) + .with_strict_mode(false) + .build() + .unwrap(); + let expected = ArrowField::new( + "R", + DataType::Struct(Fields::from(vec![ArrowField::new( + "now", + DataType::Int32, + false, + )])), + false, + ); + assert_eq!(resolved.field(), expected); + } + + #[test] + fn test_missing_reader_field_null_first_no_default_is_ok() { + let writer_json = r#"{ + "type":"record", + 
"name":"R", + "fields":[{"name":"a","type":"int"}] + }"#; + let reader_json = r#"{ + "type":"record", + "name":"R", + "fields":[ + {"name":"a","type":"int"}, + {"name":"b","type":["null","int"]} + ] + }"#; + let writer: Schema = serde_json::from_str(writer_json).unwrap(); + let reader: Schema = serde_json::from_str(reader_json).unwrap(); + let resolved = AvroFieldBuilder::new(&writer) + .with_reader_schema(&reader) + .with_utf8view(false) + .with_strict_mode(false) + .build() + .unwrap(); + let expected = ArrowField::new( + "R", + DataType::Struct(Fields::from(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Int32, true).with_metadata(HashMap::from([( + AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), + "null".to_string(), + )])), + ])), + false, + ); + assert_eq!(resolved.field(), expected); + } + + #[test] + fn test_missing_reader_field_null_second_without_default_errors() { + let writer_json = r#"{ + "type":"record", + "name":"R", + "fields":[{"name":"a","type":"int"}] + }"#; + let reader_json = r#"{ + "type":"record", + "name":"R", + "fields":[ + {"name":"a","type":"int"}, + {"name":"b","type":["int","null"]} + ] + }"#; + let writer: Schema = serde_json::from_str(writer_json).unwrap(); + let reader: Schema = serde_json::from_str(reader_json).unwrap(); + let err = AvroFieldBuilder::new(&writer) + .with_reader_schema(&reader) + .with_utf8view(false) + .with_strict_mode(false) + .build() + .unwrap_err() + .to_string(); + assert!( + err.contains("must have a default value"), + "expected missing-default error, got: {err}" + ); + } } diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index d63d39177b4b..f90a19f931bf 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -43,7 +43,7 @@ use uuid::Uuid; /// /// Spec: #[inline] -pub fn write_long(out: &mut W, value: i64) -> Result<(), ArrowError> { +pub(crate) fn write_long(out: &mut W, value: i64) -> Result<(), ArrowError> { let mut zz = ((value << 1) ^ (value >> 63)) as u64; // At most 10 bytes for 64-bit varint let mut buf = [0u8; 10]; @@ -532,7 +532,7 @@ struct FieldBinding { /// Builder for `RecordEncoder` write plan #[derive(Debug)] -pub struct RecordEncoderBuilder<'a> { +pub(crate) struct RecordEncoderBuilder<'a> { avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema, fingerprint: Option, @@ -540,7 +540,7 @@ pub struct RecordEncoderBuilder<'a> { impl<'a> RecordEncoderBuilder<'a> { /// Create a new builder from the Avro root and Arrow schema. 
- pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self { + pub(crate) fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> Self { Self { avro_root, arrow_schema, diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs index cbb171f89014..471423b6f7bc 100644 --- a/arrow-avro/src/writer/mod.rs +++ b/arrow-avro/src/writer/mod.rs @@ -364,7 +364,7 @@ impl<W: Write, F: AvroFormat> Writer<W, F> { } fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> { - let mut buf = Vec::<u8>::with_capacity(1024); + let mut buf = Vec::<u8>::with_capacity(self.capacity); self.encoder.encode(&mut buf, batch)?; let encoded = match self.compression { Some(codec) => codec.compress(&buf)?, @@ -892,7 +892,42 @@ mod tests { } #[test] + fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> { + let cap = 64 * 1024; + let buffer = Vec::<u8>::new(); + let mut writer = WriterBuilder::new(make_schema()) + .with_capacity(cap) + .build::<_, AvroOcfFormat>(buffer)?; + assert_eq!(writer.capacity, cap, "builder capacity not propagated"); + let batch = make_batch(); + writer.write(&batch)?; + writer.finish()?; + let out = writer.into_inner(); + assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect"); + Ok(()) + } + + #[test] + fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> { + use arrow_array::{ArrayRef, Int32Array}; + use arrow_schema::{DataType, Field, Schema}; + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef], + )?; + let cap = 8192; + let mut writer = WriterBuilder::new(schema) + .with_capacity(cap) + .build::<_, AvroBinaryFormat>(Vec::new())?; + assert_eq!(writer.capacity, cap); + writer.write(&batch)?; + let _bytes = writer.into_inner(); + Ok(()) + } + #[cfg(feature = "avro_custom_types")] + #[test] fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> { let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("test/data/duration_logical_types.avro") @@ -952,7 +987,6 @@ mod tests { arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip"); assert_eq!(round_trip, input); - Ok(()) } } diff --git a/arrow-avro/test/data/README.md b/arrow-avro/test/data/README.md index 32ae4f273c07..226e0700fb94 100644 --- a/arrow-avro/test/data/README.md +++ b/arrow-avro/test/data/README.md @@ -268,6 +268,91 @@ Options: **Source / Repro script:** `create_avro_union_file.py` (Gist): contains the full writer schema, record builders covering four rows, and the `fastavro.writer` call which emits `union_fields.avro`. +## Comprehensive E2E Coverage File + +**Purpose:** A single OCF that exercises **all decoder paths** used by `arrow-avro` with both **nested and non‑nested** shapes, including **dense unions** (null‑first, null‑second, multi‑branch), **aliases** (type and field), **default values**, **docs** and **namespaces**, and combinations thereof. It’s intended to validate the final `Reader` implementation and to stress schema‑resolution behavior in the tests under `arrow-avro/src/reader/mod.rs`.
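+
+As a quick orientation for reviewers, here is a minimal sketch of how a test might consume this file end to end. It assumes the crate's public OCF `ReaderBuilder`/`Reader` API; the `main` harness and printed output are illustrative only:
+
+```rust
+use std::fs::File;
+use std::io::BufReader;
+
+use arrow_avro::reader::ReaderBuilder;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // The writer schema is embedded in the OCF header, so no schema is supplied here.
+    let file = BufReader::new(File::open("arrow-avro/test/data/comprehensive_e2e.avro")?);
+    let reader = ReaderBuilder::new().build(file)?;
+    // The reader is an iterator of Arrow `RecordBatch` results.
+    for batch in reader {
+        let batch = batch?;
+        println!("rows: {}, columns: {}", batch.num_rows(), batch.num_columns());
+    }
+    Ok(())
+}
+```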
+ +**File:** `comprehensive_e2e.avro` +**Top‑level record (writer schema):** `org.apache.arrow.avrotests.v1.E2eComprehensive` +**Record count:** four rows (each row selects different union branches and nested shapes) + +**Coverage summary (by Arrow / Avro mapping):** + +* Primitives: **boolean, int, long, float, double** +* Binary / Text: **bytes**, **string (UTF‑8)** +* Logical types: **date**, **time‑millis**, **time‑micros**, **timestamp‑millis (UTC)**, **timestamp‑micros (UTC)**, **local‑timestamp‑millis**, **local‑timestamp‑micros**, **uuid (string)**, **decimal** on **bytes** and **fixed**, **duration** on **fixed(12)** +* Named types: **fixed**, **enum**, **record** +* Collections: **array**, **map** +* Unions: **nullable unions**, **ambiguous scalar unions**, **unions of named types**, and **unions nested inside arrays/maps/records** +* Schema‑evolution hooks: **type aliases**, **field aliases**, **defaults** (including union defaults on the first branch), **docs**, and **namespaces** + +**Writer schema (overview of fields):** + +| Field | Type / details | +|-------------------------------|---------------------------------------------------------------------------------------------------------| +| `id` | `long` | +| `flag` | `boolean` (default `true`) | +| `ratio_f32` | `float` (default `0.0`) | +| `ratio_f64` | `double` (default `0.0`) | +| `count_i32` | `int` (default `0`) | +| `count_i64` | `long` (default `0`) | +| `opt_i32_nullfirst` | `["null","int"]` (default `null`) | +| `opt_str_nullsecond` | `["string","null"]` (default `""`, alias: `old_opt_str`) | +| `tri_union_prim` | `["int","string","boolean"]` (default `0`) | +| `str_utf8` | `string` (default `"default"`) | +| `raw_bytes` | `bytes` (default `""`) | +| `fx16_plain` | `fixed` `types.Fx16` (size 16, alias `Fixed16Old`) | +| `dec_bytes_s10_2` | `bytes` + `logicalType: decimal` (precision 10, scale 2) | +| `dec_fix_s20_4` | `fixed` `types.DecFix20` (size 20) + `logicalType: decimal` (precision 20, scale 4) | +| `uuid_str` | `string` + `logicalType: uuid` | +| `d_date` | `int` + `logicalType: date` | +| `t_millis` | `int` + `logicalType: time-millis` | +| `t_micros` | `long` + `logicalType: time-micros` | +| `ts_millis_utc` | `long` + `logicalType: timestamp-millis` | +| `ts_micros_utc` | `long` + `logicalType: timestamp-micros` | +| `ts_millis_local` | `long` + `logicalType: local-timestamp-millis` | +| `ts_micros_local` | `long` + `logicalType: local-timestamp-micros` | +| `interval_mdn` | `fixed` `types.Dur12` (size 12) + `logicalType: duration` | +| `status` | `enum` `types.Status` = {`UNKNOWN`,`NEW`,`PROCESSING`,`DONE`} (alias: `State`) | +| `arr_union` | `array<["long","string","null"]>` | +| `map_union` | `map<["null","double","string"]>` | +| `address` | `record` `types.Address` {`street` (alias: `street_name`), `zip:int`, `country:string`} | +| `maybe_auth` | `record` `types.MaybeAuth` {`user:string`, `token:["null","bytes"]` (default `null`)} | +| `union_enum_record_array_map` | `[types.Color enum, types.RecA record, types.RecB record, array<long>, map<string>]` | +| `union_date_or_fixed4` | `[int (logicalType=date), fixed Fx4 size 4]` | +| `union_interval_or_string` | `[fixed Dur12U size 12 (logicalType=duration), string]` | +| `union_uuid_or_fixed10` | `[string (logicalType=uuid), fixed Fx10 size 10]` | +| `array_records_with_union` | `array<types.KV>` where `KV` = record {`key:string`, `val:["null","int","long"]` (default `null`)} | +| `union_map_or_array_int` | `[map<int>, array<int>]` | +| `renamed_with_default` | `int` (default `42`, alias: `old_count`) | +| `person` | `record` `com.example.v2.PersonV2` (alias:
`com.example.Person`) `{ name:string, age:int (default 0) }` | + +**How this file was created** + +* Script: [`create_comprehensive_avro_file.py`](https://gist.github.com/jecsand838/26f9666da8de22651027d485bd83f4a3) + Uses **fastavro** to write `comprehensive_e2e.avro` with the schema above and four records that intentionally vary union branches and nested shapes. + +**Re‑generation** + +From the repository root: + +```bash +# 1) Ensure Python 3 is available, then install fastavro +python -m pip install --upgrade fastavro + +# 2) Run the generator (writes ./comprehensive_e2e.avro by default) +python create_comprehensive_avro_file.py + +# 3) Move or copy the file into this directory if needed +mv comprehensive_e2e.avro arrow-avro/test/data/ +``` + +**Notes / tips for tests** + +* For **unions of named types** (record/enum/fixed), the generator uses fastavro’s **tuple notation** to select the union branch and, where needed, supplies the **fully‑qualified name (FQN)** to avoid ambiguity when namespaces apply. +* The file contains many **defaults** and **aliases** (type and field) to exercise **schema resolution** code paths. +* As with all OCFs, a random **sync marker** is embedded in the file header; byte‑for‑byte output may vary across runs without affecting the schema or logical content. + ## Other Files This directory contains other small OCF files used by `arrow-avro` tests. Details on these will be added in diff --git a/arrow-avro/test/data/comprehensive_e2e.avro b/arrow-avro/test/data/comprehensive_e2e.avro new file mode 100644 index 000000000000..a3e55716c325 Binary files /dev/null and b/arrow-avro/test/data/comprehensive_e2e.avro differ
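
The README's note above about the random **sync marker** can also be checked mechanically. Below is a minimal byte‑level sketch; the path relative to the repository root and the `main` harness are assumptions for illustration, while the `Obj\x01` magic and the 16‑byte sync marker come from the Avro OCF specification:

```rust
use std::fs;

fn main() -> std::io::Result<()> {
    let bytes = fs::read("arrow-avro/test/data/comprehensive_e2e.avro")?;
    // Every Avro Object Container File begins with the 4-byte magic "Obj" + 0x01.
    assert_eq!(&bytes[..4], b"Obj\x01", "not an Avro OCF");
    // The header also stores a random 16-byte sync marker (repeated after each
    // data block), which is why regenerated files differ byte-for-byte while
    // remaining logically identical.
    println!("OCF magic OK ({} bytes total)", bytes.len());
    Ok(())
}
```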