From 52912c06d5b1204ef9534a185df97ecc4d1fcba8 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 28 Nov 2024 19:41:26 +0800 Subject: [PATCH 1/7] add arrow_struct_to_iceberg_struct --- crates/iceberg/src/arrow/mod.rs | 3 +- crates/iceberg/src/arrow/value.rs | 934 ++++++++++++++++++++++++++++++ crates/iceberg/src/spec/values.rs | 10 + 3 files changed, 946 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/arrow/value.rs diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 0f01324cb..0c885e65f 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -22,5 +22,6 @@ pub use schema::*; mod reader; pub(crate) mod record_batch_projector; pub(crate) mod record_batch_transformer; - +mod value; pub use reader::*; +pub use value::*; diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs new file mode 100644 index 000000000..95712018b --- /dev/null +++ b/crates/iceberg/src/arrow/value.rs @@ -0,0 +1,934 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ + Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float16Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, + StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, + TimestampNanosecondArray, +}; +use arrow_schema::{DataType, TimeUnit}; +use itertools::Itertools; + +use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; +use crate::{Error, ErrorKind, Result}; + +trait ToIcebergLiteralArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &PrimitiveType, + ) -> Result>>; + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>>; +} + +impl ToIcebergLiteralArray for BooleanArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Boolean => Ok(self.iter().map(|v| v.map(Literal::bool)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow boolean array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Int16Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Int => Ok(self.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Int32Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Int => Ok(self.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Int64Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Float16Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Float => Ok(self + .iter() + .map(|v| v.map(|v| Literal::float(v.to_f32()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Float32Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Float => Ok(self.iter().map(|v| v.map(Literal::float)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Float64Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Double => Ok(self.iter().map(|v| v.map(Literal::double)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Decimal128Array { + fn to_primitive_literal_array( + &self, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + let DataType::Decimal128(arrow_precision, arrow_scale) = arrow_type else { + unreachable!() + }; + match iceberg_type { + PrimitiveType::Decimal { precision, scale } => { + if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The precision or scale ({},{}) of arrow decimal128 array is not compatitable with iceberg decimal type ({},{})", + arrow_precision, arrow_scale, precision, scale + ), + )); + } + Ok(self.iter().map(|v| v.map(Literal::decimal)).collect()) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow decimal128 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Date32Array { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Date => Ok(self.iter().map(|v| v.map(Literal::date)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow date32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for Time64MicrosecondArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Time => Ok(self + .iter() + .map(|v| v.map(Literal::time)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow time64 microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for TimestampMicrosecondArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Timestamp => Ok(self + .iter() + .map(|v| v.map(Literal::timestamp)) + .collect()), + PrimitiveType::Timestamptz => Ok(self + .iter() + .map(|v| v.map(Literal::timestamptz)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for TimestampNanosecondArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::TimestampNs => Ok(self + .iter() + .map(|v| v.map(Literal::timestamp_nano)) + .collect()), + PrimitiveType::TimestamptzNs => Ok(self + .iter() + .map(|v| v.map(Literal::timestamptz_nano)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp nanosecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for StringArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for LargeStringArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow large string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for BinaryArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Binary => Ok(self + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for LargeBinaryArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>> { + match iceberg_type { + PrimitiveType::Binary => Ok(self + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow large binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn to_struct_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &StructType, + ) -> Result>> { + unreachable!() + } +} + +impl ToIcebergLiteralArray for StructArray { + fn to_primitive_literal_array( + &self, + _arrow_type: &DataType, + _iceberg_type: &PrimitiveType, + ) -> Result>> { + unreachable!() + } + + fn to_struct_literal_array( + &self, + arrow_type: &DataType, + iceberg_type: &StructType, + ) -> Result>> { + let DataType::Struct(arrow_struct_fields) = arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if self.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(self.columns().len()); + + for ((array, arrow_type), iceberg_field) in self + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(iceberg_type.fields().iter()) + { + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, _) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + columns.push(vec![None; array.len()]); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + ( + DataType::Timestamp(TimeUnit::Microsecond, _), + Type::Primitive(primitive_type), + ) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(array.to_struct_literal_array(arrow_type, struct_type)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); + let mut struct_literals = Vec::with_capacity(struct_literal_len); + let mut columns_iters = columns + .into_iter() + .map(|column| column.into_iter()) + .collect::>(); + + for row_idx in 0..struct_literal_len { + if self.is_null(row_idx) { + struct_literals.push(None); + continue; + } + let mut literals = Vec::with_capacity(columns_iters.len()); + for column_iter in columns_iters.iter_mut() { + literals.push(column_iter.next().unwrap()); + } + struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals)))); + } + + Ok(struct_literals) + } +} + +/// Convert arrow struct array to iceberg struct value array. +pub fn arrow_struct_to_iceberg_struct( + struct_array: &StructArray, + ty: StructType, +) -> Result>> { + struct_array.to_struct_literal_array(struct_array.data_type(), &ty) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ + ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, StringArray, StructArray, + Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, + }; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + + use super::*; + use crate::spec::{Literal, NestedField, PrimitiveType, StructType, Type}; + + #[test] + fn test_arrow_struct_to_iceberg_struct() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]); + let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None]) + .with_precision_and_scale(10, 2) + .unwrap(); + let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]); + let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]); + let timestamp_micro_array = TimestampMicrosecondArray::from(vec![ + Some(1622548800000000), + Some(1622635200000000), + None, + ]); + let timestamp_nano_array = TimestampNanosecondArray::from(vec![ + Some(1622548800000000000), + Some(1622635200000000000), + None, + ]); + let string_array = StringArray::from(vec![Some("a"), Some("b"), None]); + let binary_array = + BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("bool_field", DataType::Boolean, true)), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int16_field", DataType::Int16, true)), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int32_field", DataType::Int32, true)), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int64_field", DataType::Int64, true)), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float32_field", DataType::Float32, true)), + Arc::new(float32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float64_field", DataType::Float64, true)), + Arc::new(float64_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "decimal_field", + DataType::Decimal128(10, 2), + true, + )), + Arc::new(decimal_array) as ArrayRef, + ), + ( + Arc::new(Field::new("date_field", DataType::Date32, true)), + Arc::new(date_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "time_field", + DataType::Time64(TimeUnit::Microsecond), + true, + )), + Arc::new(time_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_micro_field", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + )), + Arc::new(timestamp_micro_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_nano_field", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )), + Arc::new(timestamp_nano_array) as ArrayRef, + ), + ( + Arc::new(Field::new("string_field", DataType::Utf8, true)), + Arc::new(string_array) as ArrayRef, + ), + ( + Arc::new(Field::new("binary_field", DataType::Binary, true)), + Arc::new(binary_array) as ArrayRef, + ), + ]); + + let iceberg_struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "int32_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "float32_field", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "float64_field", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "decimal_field", + Type::Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 2, + }), + )), + Arc::new(NestedField::optional( + 7, + "date_field", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 8, + "time_field", + Type::Primitive(PrimitiveType::Time), + )), + Arc::new(NestedField::optional( + 9, + "timestamp_micro_field", + Type::Primitive(PrimitiveType::Timestamp), + )), + Arc::new(NestedField::optional( + 10, + "timestamp_nao_field", + Type::Primitive(PrimitiveType::TimestampNs), + )), + Arc::new(NestedField::optional( + 11, + "string_field", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 12, + "binary_field", + Type::Primitive(PrimitiveType::Binary), + )), + ]); + + let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(true)), + Some(Literal::int(1)), + Some(Literal::int(3)), + Some(Literal::long(5)), + Some(Literal::float(1.1)), + Some(Literal::double(3.3)), + Some(Literal::decimal(1000)), + Some(Literal::date(18628)), + Some(Literal::time(123456789)), + Some(Literal::timestamp(1622548800000000)), + Some(Literal::timestamp_nano(1622548800000000000)), + Some(Literal::string("a".to_string())), + Some(Literal::binary(b"abc".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(false)), + Some(Literal::int(2)), + Some(Literal::int(4)), + Some(Literal::long(6)), + Some(Literal::float(2.2)), + Some(Literal::double(4.4)), + Some(Literal::decimal(2000)), + Some(Literal::date(18629)), + Some(Literal::time(987654321)), + Some(Literal::timestamp(1622635200000000)), + Some(Literal::timestamp_nano(1622635200000000000)), + Some(Literal::string("b".to_string())), + Some(Literal::binary(b"def".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, None, None, None, None, None, None, None, None, None, + ]))), + ]); + } + + #[test] + fn test_single_column_nullable_struct() { + let struct_array = StructArray::new_null( + Fields::from(vec![Field::new("bool_field", DataType::Boolean, true)]), + 3, + ); + let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + ))]); + let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 3]); + } + + #[test] + fn test_empty_struct() { + let struct_array = StructArray::new_null(Fields::empty(), 3); + let iceberg_struct_type = StructType::new(vec![]); + let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 0]); + } +} diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 6fb070527..9b42152fc 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1401,6 +1401,16 @@ impl Literal { Self::Primitive(PrimitiveLiteral::Long(value)) } + /// Creates a timestamp from unix epoch in nanoseconds. + pub fn timestamp_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + + /// Creates a timestamp with timezone from unix epoch in nanoseconds. + pub fn timestamptz_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + /// Creates a timestamp from [`DateTime`]. pub fn timestamp_from_datetime(dt: DateTime) -> Self { Self::timestamp(dt.with_timezone(&Utc).timestamp_micros()) From cc09bffe506c87292c659abcdf519731016d1290 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 4 Dec 2024 15:39:51 +0800 Subject: [PATCH 2/7] use visitor pattern --- crates/iceberg/src/arrow/value.rs | 714 ++++++++++++++---------------- 1 file changed, 327 insertions(+), 387 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 95712018b..004882608 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -16,10 +16,9 @@ // under the License. use arrow_array::{ - Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float16Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, - StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, - TimestampNanosecondArray, + Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, NullArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; use arrow_schema::{DataType, TimeUnit}; use itertools::Itertools; @@ -27,194 +26,362 @@ use itertools::Itertools; use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; use crate::{Error, ErrorKind, Result}; -trait ToIcebergLiteralArray { - fn to_primitive_literal_array( +trait ArrowArrayVistor { + type T; + fn null( &self, - _arrow_type: &DataType, - _iceberg_type: &PrimitiveType, - ) -> Result>>; - fn to_struct_literal_array( + array: &NullArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn boolean( &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>>; -} - -impl ToIcebergLiteralArray for BooleanArray { - fn to_primitive_literal_array( + array: &BooleanArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn int16( &self, - _arrow_type: &DataType, + array: &Int16Array, + arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { - match iceberg_type { - PrimitiveType::Boolean => Ok(self.iter().map(|v| v.map(Literal::bool)).collect()), - _ => Err(Error::new( + ) -> Result>; + fn int32( + &self, + array: &Int32Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn int64( + &self, + array: &Int64Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn float( + &self, + array: &Float32Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn double( + &self, + array: &Float64Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn decimal( + &self, + array: &Decimal128Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn date( + &self, + array: &Date32Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn time( + &self, + array: &Time64MicrosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn string( + &self, + array: &StringArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn large_string( + &self, + array: &LargeStringArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn binary( + &self, + array: &BinaryArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn large_binary( + &self, + array: &LargeBinaryArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn combine_struct( + &self, + array: &StructArray, + columns: Vec>, + ) -> Result>; + fn r#struct( + &self, + array: &StructArray, + arrow_type: &DataType, + iceberg_type: &StructType, + ) -> Result> { + let DataType::Struct(arrow_struct_fields) = arrow_type else { + return Err(Error::new( ErrorKind::DataInvalid, - format!( - "The type of arrow boolean array is not compatitable with iceberg type {}", - iceberg_type - ), - )), + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for ((array, arrow_type), iceberg_field) in array + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(iceberg_type.fields().iter()) + { + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.null(array, arrow_type, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.boolean(array, arrow_type, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int16(array, arrow_type, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int32(array, arrow_type, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int64(array, arrow_type, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.float(array, arrow_type, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.double(array, arrow_type, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.decimal(array, arrow_type, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.date(array, arrow_type, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.time(array, arrow_type, primitive_type)?); + } + ( + DataType::Timestamp(TimeUnit::Microsecond, _), + Type::Primitive(primitive_type), + ) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.timestamp(array, arrow_type, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.timestamp_nano(array, arrow_type, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.string(array, arrow_type, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.large_string(array, arrow_type, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.binary(array, arrow_type, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.large_binary(array, arrow_type, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.r#struct(array, arrow_type, struct_type)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } } + + self.combine_struct(array, columns) } +} + +struct LiteralArrayVisitor; + +impl ArrowArrayVistor for LiteralArrayVisitor { + type T = Option; - fn to_struct_literal_array( + fn null( &self, + array: &NullArray, _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() + _iceberg_type: &PrimitiveType, + ) -> Result> { + Ok(vec![None; array.len()]) } -} -impl ToIcebergLiteralArray for Int16Array { - fn to_primitive_literal_array( + fn boolean( &self, + array: &BooleanArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Int => Ok(self.iter().map(|v| v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int16 array is not compatitable with iceberg type {}", + "The type of arrow boolean array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Int32Array { - fn to_primitive_literal_array( + fn int16( &self, + array: &Int16Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Int => Ok(self.iter().map(|v| v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int32 array is not compatitable with iceberg type {}", + "The type of arrow int16 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Int64Array { - fn to_primitive_literal_array( + fn int32( &self, + array: &Int32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int64 array is not compatitable with iceberg type {}", + "The type of arrow int32 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float16Array { - fn to_primitive_literal_array( + fn int64( &self, + array: &Int64Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Float => Ok(self - .iter() - .map(|v| v.map(|v| Literal::float(v.to_f32()))) - .collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow float16 array is not compatitable with iceberg type {}", + "The type of arrow int64 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float32Array { - fn to_primitive_literal_array( + fn float( &self, + array: &Float32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Float => Ok(self.iter().map(|v| v.map(Literal::float)).collect()), + PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow float32 array is not compatitable with iceberg type {}", + "The type of arrow float16 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float64Array { - fn to_primitive_literal_array( + fn double( &self, + array: &Float64Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Double => Ok(self.iter().map(|v| v.map(Literal::double)).collect()), + PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -225,21 +392,12 @@ impl ToIcebergLiteralArray for Float64Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Decimal128Array { - fn to_primitive_literal_array( + fn decimal( &self, + array: &Decimal128Array, arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { let DataType::Decimal128(arrow_precision, arrow_scale) = arrow_type else { unreachable!() }; @@ -254,7 +412,7 @@ impl ToIcebergLiteralArray for Decimal128Array { ), )); } - Ok(self.iter().map(|v| v.map(Literal::decimal)).collect()) + Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) } _ => Err(Error::new( ErrorKind::DataInvalid, @@ -266,23 +424,14 @@ impl ToIcebergLiteralArray for Decimal128Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Date32Array { - fn to_primitive_literal_array( + fn date( &self, + array: &Date32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Date => Ok(self.iter().map(|v| v.map(Literal::date)).collect()), + PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -293,23 +442,14 @@ impl ToIcebergLiteralArray for Date32Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Time64MicrosecondArray { - fn to_primitive_literal_array( + fn time( &self, + array: &Time64MicrosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Time => Ok(self + PrimitiveType::Time => Ok(array .iter() .map(|v| v.map(Literal::time)) .collect()), @@ -323,27 +463,18 @@ impl ToIcebergLiteralArray for Time64MicrosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for TimestampMicrosecondArray { - fn to_primitive_literal_array( + fn timestamp( &self, + array: &TimestampMicrosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Timestamp => Ok(self + PrimitiveType::Timestamp => Ok(array .iter() .map(|v| v.map(Literal::timestamp)) .collect()), - PrimitiveType::Timestamptz => Ok(self + PrimitiveType::Timestamptz => Ok(array .iter() .map(|v| v.map(Literal::timestamptz)) .collect()), @@ -357,27 +488,18 @@ impl ToIcebergLiteralArray for TimestampMicrosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for TimestampNanosecondArray { - fn to_primitive_literal_array( + fn timestamp_nano( &self, + array: &TimestampNanosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::TimestampNs => Ok(self + PrimitiveType::TimestampNs => Ok(array .iter() .map(|v| v.map(Literal::timestamp_nano)) .collect()), - PrimitiveType::TimestamptzNs => Ok(self + PrimitiveType::TimestamptzNs => Ok(array .iter() .map(|v| v.map(Literal::timestamptz_nano)) .collect()), @@ -391,23 +513,14 @@ impl ToIcebergLiteralArray for TimestampNanosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for StringArray { - fn to_primitive_literal_array( + fn string( &self, + array: &StringArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -418,23 +531,14 @@ impl ToIcebergLiteralArray for StringArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for LargeStringArray { - fn to_primitive_literal_array( + fn large_string( &self, + array: &LargeStringArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -445,23 +549,14 @@ impl ToIcebergLiteralArray for LargeStringArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for BinaryArray { - fn to_primitive_literal_array( + fn binary( &self, + array: &BinaryArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Binary => Ok(self + PrimitiveType::Binary => Ok(array .iter() .map(|v| v.map(|v| Literal::binary(v.to_vec()))) .collect()), @@ -475,187 +570,32 @@ impl ToIcebergLiteralArray for BinaryArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for LargeBinaryArray { - fn to_primitive_literal_array( + fn large_binary( &self, + array: &LargeBinaryArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Binary => Ok(self + PrimitiveType::Binary => Ok(array .iter() .map(|v| v.map(|v| Literal::binary(v.to_vec()))) .collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow large binary array is not compatitable with iceberg type {}", + "The type of arrow binary array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for StructArray { - fn to_primitive_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &PrimitiveType, - ) -> Result>> { - unreachable!() - } - - fn to_struct_literal_array( + fn combine_struct( &self, - arrow_type: &DataType, - iceberg_type: &StructType, - ) -> Result>> { - let DataType::Struct(arrow_struct_fields) = arrow_type else { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not a struct type", - )); - }; - - if self.columns().len() != iceberg_type.fields().len() - || arrow_struct_fields.len() != iceberg_type.fields().len() - { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not compatitable with iceberg struct type", - )); - } - - let mut columns = Vec::with_capacity(self.columns().len()); - - for ((array, arrow_type), iceberg_field) in self - .columns() - .iter() - .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) - .zip_eq(iceberg_type.fields().iter()) - { - if array.is_nullable() == iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "The nullable field of arrow struct array is not compatitable with iceberg type", - )); - } - match (arrow_type, iceberg_field.field_type.as_ref()) { - (DataType::Null, _) => { - if iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "column in arrow array should not be optional", - )); - } - columns.push(vec![None; array.len()]); - } - (DataType::Boolean, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int16, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Float32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Float64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Date32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - ( - DataType::Timestamp(TimeUnit::Microsecond, _), - Type::Primitive(primitive_type), - ) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Utf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Binary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::LargeBinary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Struct(_), Type::Struct(struct_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_struct_literal_array(arrow_type, struct_type)?); - } - (arrow_type, iceberg_field_type) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Unsupported convert arrow type {} to iceberg type: {}", - arrow_type, iceberg_field_type - ), - )) - } - } - } - + array: &StructArray, + columns: Vec>, + ) -> Result> { let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); let mut struct_literals = Vec::with_capacity(struct_literal_len); let mut columns_iters = columns @@ -664,7 +604,7 @@ impl ToIcebergLiteralArray for StructArray { .collect::>(); for row_idx in 0..struct_literal_len { - if self.is_null(row_idx) { + if array.is_null(row_idx) { struct_literals.push(None); continue; } @@ -680,11 +620,11 @@ impl ToIcebergLiteralArray for StructArray { } /// Convert arrow struct array to iceberg struct value array. -pub fn arrow_struct_to_iceberg_struct( +pub fn arrow_struct_to_literal( struct_array: &StructArray, ty: StructType, ) -> Result>> { - struct_array.to_struct_literal_array(struct_array.data_type(), &ty) + LiteralArrayVisitor.r#struct(struct_array, struct_array.data_type(), &ty) } #[cfg(test)] @@ -870,7 +810,7 @@ mod test { )), ]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![ Some(Literal::Struct(Struct::from_iter(vec![ @@ -920,7 +860,7 @@ mod test { "bool_field", Type::Primitive(PrimitiveType::Boolean), ))]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 3]); } @@ -928,7 +868,7 @@ mod test { fn test_empty_struct() { let struct_array = StructArray::new_null(Fields::empty(), 3); let iceberg_struct_type = StructType::new(vec![]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 0]); } } From ecce70ace3438fb15c8c0adccdfb51830e20d9f0 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 11 Dec 2024 12:21:52 +0800 Subject: [PATCH 3/7] refine visitor design --- crates/iceberg/src/arrow/value.rs | 686 ++++++++++++++++++------------ 1 file changed, 411 insertions(+), 275 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 004882608..c1e075e78 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -22,272 +22,72 @@ use arrow_array::{ }; use arrow_schema::{DataType, TimeUnit}; use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; use crate::{Error, ErrorKind, Result}; +/// A post order arrow array visitor. trait ArrowArrayVistor { type T; - fn null( - &self, - array: &NullArray, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn boolean( - &self, - array: &BooleanArray, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn int16( - &self, - array: &Int16Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn int32( - &self, - array: &Int32Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn int64( - &self, - array: &Int64Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn float( - &self, - array: &Float32Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn double( - &self, - array: &Float64Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; + fn null(&self, array: &NullArray, iceberg_type: &PrimitiveType) -> Result>; + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result>; + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result>; + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result>; + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result>; + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result>; + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result>; fn decimal( &self, array: &Decimal128Array, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn date( - &self, - array: &Date32Array, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result>; fn time( &self, array: &Time64MicrosecondArray, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; fn timestamp( &self, array: &TimestampMicrosecondArray, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; fn timestamp_nano( &self, array: &TimestampNanosecondArray, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn string( - &self, - array: &StringArray, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result>; fn large_string( &self, array: &LargeStringArray, - arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn binary( - &self, - array: &BinaryArray, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result>; fn large_binary( &self, array: &LargeBinaryArray, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result>; - fn combine_struct( - &self, - array: &StructArray, - columns: Vec>, - ) -> Result>; fn r#struct( &self, array: &StructArray, - arrow_type: &DataType, iceberg_type: &StructType, - ) -> Result> { - let DataType::Struct(arrow_struct_fields) = arrow_type else { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not a struct type", - )); - }; - - if array.columns().len() != iceberg_type.fields().len() - || arrow_struct_fields.len() != iceberg_type.fields().len() - { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not compatitable with iceberg struct type", - )); - } - - let mut columns = Vec::with_capacity(array.columns().len()); - - for ((array, arrow_type), iceberg_field) in array - .columns() - .iter() - .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) - .zip_eq(iceberg_type.fields().iter()) - { - if array.is_nullable() == iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "The nullable field of arrow struct array is not compatitable with iceberg type", - )); - } - match (arrow_type, iceberg_field.field_type.as_ref()) { - (DataType::Null, Type::Primitive(primitive_type)) => { - if iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "column in arrow array should not be optional", - )); - } - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.null(array, arrow_type, primitive_type)?); - } - (DataType::Boolean, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.boolean(array, arrow_type, primitive_type)?); - } - (DataType::Int16, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.int16(array, arrow_type, primitive_type)?); - } - (DataType::Int32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.int32(array, arrow_type, primitive_type)?); - } - (DataType::Int64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.int64(array, arrow_type, primitive_type)?); - } - (DataType::Float32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.float(array, arrow_type, primitive_type)?); - } - (DataType::Float64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.double(array, arrow_type, primitive_type)?); - } - (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.decimal(array, arrow_type, primitive_type)?); - } - (DataType::Date32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.date(array, arrow_type, primitive_type)?); - } - (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(self.time(array, arrow_type, primitive_type)?); - } - ( - DataType::Timestamp(TimeUnit::Microsecond, _), - Type::Primitive(primitive_type), - ) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(self.timestamp(array, arrow_type, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(self.timestamp_nano(array, arrow_type, primitive_type)?); - } - (DataType::Utf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.string(array, arrow_type, primitive_type)?); - } - (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.large_string(array, arrow_type, primitive_type)?); - } - (DataType::Binary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.binary(array, arrow_type, primitive_type)?); - } - (DataType::LargeBinary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.large_binary(array, arrow_type, primitive_type)?); - } - (DataType::Struct(_), Type::Struct(struct_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(self.r#struct(array, arrow_type, struct_type)?); - } - (arrow_type, iceberg_field_type) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Unsupported convert arrow type {} to iceberg type: {}", - arrow_type, iceberg_field_type - ), - )) - } - } - } - - self.combine_struct(array, columns) - } + childs: Vec>, + ) -> Result>; } -struct LiteralArrayVisitor; +struct ArrowArrayConvert; -impl ArrowArrayVistor for LiteralArrayVisitor { +impl ArrowArrayVistor for ArrowArrayConvert { type T = Option; - fn null( - &self, - array: &NullArray, - _arrow_type: &DataType, - _iceberg_type: &PrimitiveType, - ) -> Result> { + fn null(&self, array: &NullArray, _iceberg_type: &PrimitiveType) -> Result> { Ok(vec![None; array.len()]) } - fn boolean( - &self, - array: &BooleanArray, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), _ => Err(Error::new( @@ -300,12 +100,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn int16( - &self, - array: &Int16Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), @@ -319,12 +114,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn int32( - &self, - array: &Int32Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), @@ -338,12 +128,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn int64( - &self, - array: &Int64Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( @@ -356,12 +141,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn float( - &self, - array: &Float32Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), _ => Err(Error::new( @@ -374,12 +154,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn double( - &self, - array: &Float64Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), _ => Err(Error::new( @@ -395,10 +170,9 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn decimal( &self, array: &Decimal128Array, - arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { - let DataType::Decimal128(arrow_precision, arrow_scale) = arrow_type else { + let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() else { unreachable!() }; match iceberg_type { @@ -424,12 +198,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn date( - &self, - array: &Date32Array, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), _ => Err(Error::new( @@ -445,7 +214,6 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn time( &self, array: &Time64MicrosecondArray, - _arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { match iceberg_type { @@ -466,7 +234,6 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn timestamp( &self, array: &TimestampMicrosecondArray, - _arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { match iceberg_type { @@ -491,7 +258,6 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn timestamp_nano( &self, array: &TimestampNanosecondArray, - _arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { match iceberg_type { @@ -513,12 +279,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn string( - &self, - array: &StringArray, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), _ => Err(Error::new( @@ -534,7 +295,6 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn large_string( &self, array: &LargeStringArray, - _arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { match iceberg_type { @@ -549,12 +309,7 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn binary( - &self, - array: &BinaryArray, - _arrow_type: &DataType, - iceberg_type: &PrimitiveType, - ) -> Result> { + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result> { match iceberg_type { PrimitiveType::Binary => Ok(array .iter() @@ -573,7 +328,6 @@ impl ArrowArrayVistor for LiteralArrayVisitor { fn large_binary( &self, array: &LargeBinaryArray, - _arrow_type: &DataType, iceberg_type: &PrimitiveType, ) -> Result> { match iceberg_type { @@ -591,9 +345,10 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } - fn combine_struct( + fn r#struct( &self, array: &StructArray, + _iceberg_type: &StructType, columns: Vec>, ) -> Result> { let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); @@ -619,16 +374,306 @@ impl ArrowArrayVistor for LiteralArrayVisitor { } } +fn visit_arrow_struct_array( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for ((array, arrow_type), iceberg_field) in array + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(iceberg_type.fields().iter()) + { + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + +fn visit_arrow_struct_array_from_field_id( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() < iceberg_type.fields().len() + || arrow_struct_fields.len() < iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for iceberg_field in iceberg_type.fields() { + let Some((idx, field)) = arrow_struct_fields.iter().enumerate().find(|(_idx, f)| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|id| id.parse::().ok().map(|id: i32| id == iceberg_field.id)) + .unwrap_or(false) + }) else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The field {} in iceberg struct type is not found in arrow struct type", + iceberg_field.name + ), + )); + }; + let array = array.column(idx); + let arrow_type = field.data_type(); + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + /// Convert arrow struct array to iceberg struct value array. +/// This function will assume the schema of arrow struct array is the same as iceberg struct type. pub fn arrow_struct_to_literal( struct_array: &StructArray, ty: StructType, ) -> Result>> { - LiteralArrayVisitor.r#struct(struct_array, struct_array.data_type(), &ty) + visit_arrow_struct_array(struct_array, &ty, &ArrowArrayConvert) +} + +/// Convert arrow struct array to iceberg struct value array. +/// This function will use field id to find the corresponding field in arrow struct array. +pub fn arrow_struct_to_literal_from_field_id( + struct_array: &StructArray, + ty: StructType, +) -> Result>> { + visit_arrow_struct_array_from_field_id(struct_array, &ty, &ArrowArrayConvert) } #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Arc; use arrow_array::{ @@ -871,4 +916,95 @@ mod test { let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 0]); } + + #[test] + fn test_arrow_struct_to_iceberg_struct_from_field_id() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let struct_array = StructArray::from(vec![ + ( + Arc::new( + Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())], + )), + ), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int16_field", DataType::Int16, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())], + )), + ), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())], + )), + ), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())], + )), + ), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("float32_field", DataType::Float32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]), + ), + ), + Arc::new(float32_array) as ArrayRef, + ), + ]); + let struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "int32_field", + Type::Primitive(PrimitiveType::Int), + )), + ]); + let result = arrow_struct_to_literal_from_field_id(&struct_array, struct_type).unwrap(); + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(1)), + Some(Literal::bool(true)), + Some(Literal::long(5)), + Some(Literal::int(3)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(2)), + Some(Literal::bool(false)), + Some(Literal::long(6)), + Some(Literal::int(4)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, + ]))), + ]); + } } From 85050d933d76f253aa6492abd4571c418e53ad99 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 11 Dec 2024 12:23:41 +0800 Subject: [PATCH 4/7] add todo comment --- crates/iceberg/src/arrow/value.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index c1e075e78..563cbb2a2 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -28,6 +28,8 @@ use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; use crate::{Error, ErrorKind, Result}; /// A post order arrow array visitor. +/// # TODO +/// - Add support for ListArray, MapArray trait ArrowArrayVistor { type T; fn null(&self, array: &NullArray, iceberg_type: &PrimitiveType) -> Result>; @@ -508,6 +510,8 @@ fn visit_arrow_struct_array( visitor.r#struct(array, iceberg_type, columns) } +// # TODO +// Add support for fullfill the missing field in arrow struct array fn visit_arrow_struct_array_from_field_id( array: &StructArray, iceberg_type: &StructType, From 8c46815bdd13308e3b769889657d686a904382b7 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 11 Dec 2024 12:24:49 +0800 Subject: [PATCH 5/7] fix typos --- crates/iceberg/src/arrow/value.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 563cbb2a2..8fdc6428e 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -76,7 +76,7 @@ trait ArrowArrayVistor { &self, array: &StructArray, iceberg_type: &StructType, - childs: Vec>, + columns: Vec>, ) -> Result>; } @@ -511,7 +511,7 @@ fn visit_arrow_struct_array( } // # TODO -// Add support for fullfill the missing field in arrow struct array +// Add support for fulfill the missing field in arrow struct array fn visit_arrow_struct_array_from_field_id( array: &StructArray, iceberg_type: &StructType, From 1d19c78ef7baf05adce1da2e16562b26197ca5a9 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 11 Dec 2024 14:23:22 +0800 Subject: [PATCH 6/7] use field nullable instead of array nullable --- crates/iceberg/src/arrow/value.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 8fdc6428e..e86509927 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -399,13 +399,14 @@ fn visit_arrow_struct_array( let mut columns = Vec::with_capacity(array.columns().len()); - for ((array, arrow_type), iceberg_field) in array + for ((array, arrow_field), iceberg_field) in array .columns() .iter() - .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(arrow_struct_fields.iter()) .zip_eq(iceberg_type.fields().iter()) { - if array.is_nullable() == iceberg_field.required { + let arrow_type = arrow_field.data_type(); + if arrow_field.is_nullable() == iceberg_field.required { return Err(Error::new( ErrorKind::DataInvalid, "The nullable field of arrow struct array is not compatitable with iceberg type", @@ -552,7 +553,7 @@ fn visit_arrow_struct_array_from_field_id( }; let array = array.column(idx); let arrow_type = field.data_type(); - if array.is_nullable() == iceberg_field.required { + if field.is_nullable() == iceberg_field.required { return Err(Error::new( ErrorKind::DataInvalid, "The nullable field of arrow struct array is not compatitable with iceberg type", From b85a9adad1d733d9924c73b1524ac6df1409533f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 16 Dec 2024 17:03:35 +0800 Subject: [PATCH 7/7] init SchemaWithPartnerVisitor design --- crates/iceberg/src/arrow/schema.rs | 2 +- crates/iceberg/src/arrow/value.rs | 1051 ++++++++++++---------------- crates/iceberg/src/spec/schema.rs | 207 ++++++ 3 files changed, 666 insertions(+), 594 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 4de9335d9..b9e36a3b4 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -221,7 +221,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result { const ARROW_FIELD_DOC_KEY: &str = "doc"; -fn get_field_id(field: &Field) -> Result { +pub(super) fn get_field_id(field: &Field) -> Result { if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { return value.parse::().map_err(|e| { Error::new( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index e86509927..3df12e88c 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -16,355 +16,72 @@ // under the License. use arrow_array::{ - Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, NullArray, StringArray, - StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, + Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, + FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, + LargeListArray, LargeStringArray, ListArray, MapArray, NullArray, StringArray, StructArray, + Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_schema::{DataType, TimeUnit}; -use itertools::Itertools; -use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use arrow_schema::DataType; +use uuid::Uuid; -use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; +use super::get_field_id; +use crate::spec::{ + visit_struct_with_partner, ListPartnerIterator, Literal, Map, MapPartnerIterator, + PartnerAccessor, PrimitiveType, SchemaWithPartnerVisitor, Struct, StructType, +}; use crate::{Error, ErrorKind, Result}; -/// A post order arrow array visitor. -/// # TODO -/// - Add support for ListArray, MapArray -trait ArrowArrayVistor { - type T; - fn null(&self, array: &NullArray, iceberg_type: &PrimitiveType) -> Result>; - fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result>; - fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result>; - fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result>; - fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result>; - fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result>; - fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result>; - fn decimal( - &self, - array: &Decimal128Array, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result>; - fn time( - &self, - array: &Time64MicrosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn timestamp( - &self, - array: &TimestampMicrosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn timestamp_nano( - &self, - array: &TimestampNanosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result>; - fn large_string( - &self, - array: &LargeStringArray, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result>; - fn large_binary( - &self, - array: &LargeBinaryArray, - iceberg_type: &PrimitiveType, - ) -> Result>; - fn r#struct( - &self, - array: &StructArray, - iceberg_type: &StructType, - columns: Vec>, - ) -> Result>; -} - -struct ArrowArrayConvert; - -impl ArrowArrayVistor for ArrowArrayConvert { - type T = Option; +struct ArrowArrayConverter; - fn null(&self, array: &NullArray, _iceberg_type: &PrimitiveType) -> Result> { - Ok(vec![None; array.len()]) - } - - fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow boolean array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow int16 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow int32 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } +impl SchemaWithPartnerVisitor for ArrowArrayConverter { + type T = Vec>; - fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow int64 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow float16 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow float64 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn decimal( - &self, - array: &Decimal128Array, - iceberg_type: &PrimitiveType, - ) -> Result> { - let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() else { - unreachable!() - }; - match iceberg_type { - PrimitiveType::Decimal { precision, scale } => { - if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The precision or scale ({},{}) of arrow decimal128 array is not compatitable with iceberg decimal type ({},{})", - arrow_precision, arrow_scale, precision, scale - ), - )); - } - Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) - } - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow decimal128 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } + fn schema( + &mut self, + _schema: &crate::spec::Schema, + _partner: &ArrayRef, + value: Vec>, + ) -> Result>> { + Ok(value) } - fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow date32 array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn time( - &self, - array: &Time64MicrosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result> { - match iceberg_type { - PrimitiveType::Time => Ok(array - .iter() - .map(|v| v.map(Literal::time)) - .collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow time64 microsecond array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn timestamp( - &self, - array: &TimestampMicrosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result> { - match iceberg_type { - PrimitiveType::Timestamp => Ok(array - .iter() - .map(|v| v.map(Literal::timestamp)) - .collect()), - PrimitiveType::Timestamptz => Ok(array - .iter() - .map(|v| v.map(Literal::timestamptz)) - .collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow timestamp microsecond array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn timestamp_nano( - &self, - array: &TimestampNanosecondArray, - iceberg_type: &PrimitiveType, - ) -> Result> { - match iceberg_type { - PrimitiveType::TimestampNs => Ok(array - .iter() - .map(|v| v.map(Literal::timestamp_nano)) - .collect()), - PrimitiveType::TimestamptzNs => Ok(array - .iter() - .map(|v| v.map(Literal::timestamptz_nano)) - .collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow timestamp nanosecond array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow string array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn large_string( - &self, - array: &LargeStringArray, - iceberg_type: &PrimitiveType, - ) -> Result> { - match iceberg_type { - PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "The type of arrow large string array is not compatitable with iceberg type {}", - iceberg_type - ), - )), - } - } - - fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result> { - match iceberg_type { - PrimitiveType::Binary => Ok(array - .iter() - .map(|v| v.map(|v| Literal::binary(v.to_vec()))) - .collect()), - _ => Err(Error::new( + fn field( + &mut self, + field: &crate::spec::NestedFieldRef, + _partner: &ArrayRef, + value: Vec>, + ) -> Result>> { + // Make there is no null value if the field is required + if field.required && value.iter().any(Option::is_none) { + return Err(Error::new( ErrorKind::DataInvalid, - format!( - "The type of arrow binary array is not compatitable with iceberg type {}", - iceberg_type - ), - )), + "The field is required but has null value", + )); } + Ok(value) } - fn large_binary( - &self, - array: &LargeBinaryArray, - iceberg_type: &PrimitiveType, - ) -> Result> { - match iceberg_type { - PrimitiveType::Binary => Ok(array - .iter() - .map(|v| v.map(|v| Literal::binary(v.to_vec()))) - .collect()), - _ => Err(Error::new( + fn r#struct( + &mut self, + _struct: &StructType, + _partner: &ArrayRef, + results: Vec>>, + ) -> Result>> { + let row_len = results.first().map(|column| column.len()).unwrap_or(0); + if results.iter().any(|column| column.len() != row_len) { + return Err(Error::new( ErrorKind::DataInvalid, - format!( - "The type of arrow binary array is not compatitable with iceberg type {}", - iceberg_type - ), - )), + "The struct columns have different row length", + )); } - } - fn r#struct( - &self, - array: &StructArray, - _iceberg_type: &StructType, - columns: Vec>, - ) -> Result> { - let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); - let mut struct_literals = Vec::with_capacity(struct_literal_len); - let mut columns_iters = columns + let mut struct_literals = Vec::with_capacity(row_len); + let mut columns_iters = results .into_iter() .map(|column| column.into_iter()) .collect::>(); - for row_idx in 0..struct_literal_len { - if array.is_null(row_idx) { - struct_literals.push(None); - continue; - } + for _ in 0..row_len { let mut literals = Vec::with_capacity(columns_iters.len()); for column_iter in columns_iters.iter_mut() { literals.push(column_iter.next().unwrap()); @@ -374,306 +91,453 @@ impl ArrowArrayVistor for ArrowArrayConvert { Ok(struct_literals) } -} - -fn visit_arrow_struct_array( - array: &StructArray, - iceberg_type: &StructType, - visitor: &V, -) -> Result> { - let DataType::Struct(arrow_struct_fields) = array.data_type() else { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not a struct type", - )); - }; - if array.columns().len() != iceberg_type.fields().len() - || arrow_struct_fields.len() != iceberg_type.fields().len() - { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not compatitable with iceberg struct type", - )); + fn list( + &mut self, + list: &crate::spec::ListType, + _partner: &ArrayRef, + results: Vec>>, + ) -> Result>> { + if list.element_field.required { + if results.iter().any(|row| row.iter().any(Option::is_none)) { + return Err(Error::new( + ErrorKind::DataInvalid, + "The list should not have null value", + )); + } + } + Ok(results + .into_iter() + .map(|row| Some(Literal::List(row))) + .collect()) } - let mut columns = Vec::with_capacity(array.columns().len()); - - for ((array, arrow_field), iceberg_field) in array - .columns() - .iter() - .zip_eq(arrow_struct_fields.iter()) - .zip_eq(iceberg_type.fields().iter()) - { - let arrow_type = arrow_field.data_type(); - if arrow_field.is_nullable() == iceberg_field.required { + fn map( + &mut self, + map: &crate::spec::MapType, + _partner: &ArrayRef, + key_values: Vec>>, + values: Vec>>, + ) -> Result>> { + // Make sure key_value and value have the same row length + if key_values.len() != values.len() { return Err(Error::new( ErrorKind::DataInvalid, - "The nullable field of arrow struct array is not compatitable with iceberg type", + "The key value and value of map should have the same row length", )); } - match (arrow_type, iceberg_field.field_type.as_ref()) { - (DataType::Null, Type::Primitive(primitive_type)) => { - if iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "column in arrow array should not be optional", - )); - } - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.null(array, primitive_type)?); - } - (DataType::Boolean, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.boolean(array, primitive_type)?); - } - (DataType::Int16, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int16(array, primitive_type)?); - } - (DataType::Int32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int32(array, primitive_type)?); + + let mut result = Vec::with_capacity(key_values.len()); + for (key, value) in key_values.into_iter().zip(values.into_iter()) { + // Make sure key_value and value have the same length + if key.len() != value.len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "The key value and value of map should have the same length", + )); } - (DataType::Int64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int64(array, primitive_type)?); + // Make sure no null value in key_value + if key.iter().any(Option::is_none) { + return Err(Error::new( + ErrorKind::DataInvalid, + "The key value of map should not have null value", + )); } - (DataType::Float32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.float(array, primitive_type)?); + + // Make sure no null value in value if value field is required + if map.value_field.required && value.iter().any(Option::is_none) { + return Err(Error::new( + ErrorKind::DataInvalid, + "The value of map should not have null value", + )); } - (DataType::Float64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.double(array, primitive_type)?); + + let mut map = Map::new(); + for (k, v) in key.into_iter().zip(value.into_iter()) { + map.insert(k.unwrap(), v.clone()); } - (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.decimal(array, primitive_type)?); + result.push(Some(Literal::Map(map))); + } + + Ok(result) + } + + fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result>> { + if let Some(_) = partner.as_any().downcast_ref::() { + return Ok(vec![None; partner.len()]); + } + match p { + PrimitiveType::Boolean => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array") + })?; + Ok(array.iter().map(|v| v.map(Literal::bool)).collect()) + } + PrimitiveType::Int => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::int)).collect()) + } + PrimitiveType::Long => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::long)).collect()) + } + PrimitiveType::Float => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::float)).collect()) + } + PrimitiveType::Double => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::double)).collect()) } - (DataType::Date32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.date(array, primitive_type)?); + PrimitiveType::Decimal { precision, scale } => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The partner is not a decimal128 array", + ) + })?; + if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() { + if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The precision or scale ({},{}) of arrow decimal128 array is not compatitable with iceberg decimal type ({},{})", + arrow_precision, arrow_scale, precision, scale + ), + )); + } + } + Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) } - (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { - let array = array + PrimitiveType::Date => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::date)).collect()) + } + PrimitiveType::Time => { + let array = partner .as_any() .downcast_ref::() - .unwrap(); - columns.push(visitor.time(array, primitive_type)?); + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array") + })?; + Ok(array.iter().map(|v| v.map(Literal::time)).collect()) } - (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { - let array = array + PrimitiveType::Timestamp => { + let array = partner .as_any() .downcast_ref::() - .unwrap(); - columns.push(visitor.timestamp(array, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { - let array = array + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The partner is not a timestamp array", + ) + })?; + Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect()) + } + PrimitiveType::Timestamptz => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The partner is not a timestamptz array", + ) + })?; + Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect()) + } + PrimitiveType::TimestampNs => { + let array = partner .as_any() .downcast_ref::() - .unwrap(); - columns.push(visitor.timestamp_nano(array, primitive_type)?); - } - (DataType::Utf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.string(array, primitive_type)?); - } - (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.large_string(array, primitive_type)?); - } - (DataType::Binary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.binary(array, primitive_type)?); - } - (DataType::LargeBinary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.large_binary(array, primitive_type)?); + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The partner is not a timestamp_ns array", + ) + })?; + Ok(array + .iter() + .map(|v| v.map(Literal::timestamp_nano)) + .collect()) + } + PrimitiveType::TimestamptzNs => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The partner is not a timestamptz_ns array", + ) + })?; + Ok(array + .iter() + .map(|v| v.map(Literal::timestamptz_nano)) + .collect()) + } + PrimitiveType::String => { + if let Some(array) = partner.as_any().downcast_ref::() { + Ok(array.iter().map(|v| v.map(Literal::string)).collect()) + } else if let Some(array) = partner.as_any().downcast_ref::() { + Ok(array.iter().map(|v| v.map(Literal::string)).collect()) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partner is not a string array", + )); + } } - (DataType::Struct(_), Type::Struct(struct_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + PrimitiveType::Uuid => { + if let Some(array) = partner.as_any().downcast_ref::() { + if array.value_length() != 16 { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partner is not a uuid array", + )); + } + Ok(array + .iter() + .map(|v| { + v.map(|v| { + Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err( + |_| { + Error::new( + ErrorKind::DataInvalid, + "Failed to convert binary to uuid", + ) + }, + )?))) + }) + .transpose() + }) + .collect::>>()?) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partner is not a uuid array", + )); + } } - (arrow_type, iceberg_field_type) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Unsupported convert arrow type {} to iceberg type: {}", - arrow_type, iceberg_field_type - ), - )) + PrimitiveType::Fixed(len) => { + let array = partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array") + })?; + if array.value_length() != *len as i32 { + return Err(Error::new( + ErrorKind::DataInvalid, + "The length of fixed size binary array is not compatitable with iceberg fixed type", + )); + } + Ok(array + .iter() + .map(|v| v.map(|v| Literal::fixed(v.iter().cloned()))) + .collect()) + } + PrimitiveType::Binary => { + if let Some(array) = partner.as_any().downcast_ref::() { + Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()) + } else if let Some(array) = partner.as_any().downcast_ref::() { + Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partner is not a binary array", + )); + } } } } - visitor.r#struct(array, iceberg_type, columns) + fn visit_type_before( + &mut self, + _ty: &crate::spec::Type, + partner: &ArrayRef, + ) -> Result>>> { + if let Some(_) = partner.as_any().downcast_ref::() { + return Ok(Some(vec![None; partner.len()])); + } + Ok(None) + } } -// # TODO -// Add support for fulfill the missing field in arrow struct array -fn visit_arrow_struct_array_from_field_id( - array: &StructArray, - iceberg_type: &StructType, - visitor: &V, -) -> Result> { - let DataType::Struct(arrow_struct_fields) = array.data_type() else { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not a struct type", - )); - }; +struct ArrowArrayAccessor; + +impl PartnerAccessor for ArrowArrayAccessor { + type L = ArrowArrayListIterator; + type M = ArrowArrayMapIterator; - if array.columns().len() < iceberg_type.fields().len() - || arrow_struct_fields.len() < iceberg_type.fields().len() - { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not compatitable with iceberg struct type", - )); + fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + if !matches!(schema_partner.data_type(), DataType::Struct(_)) { + return Err(Error::new( + ErrorKind::DataInvalid, + "The schema partner is not a struct type", + )); + } + Ok(schema_partner) } - let mut columns = Vec::with_capacity(array.columns().len()); + fn field_partner<'a>( + &self, + struct_partner: &'a ArrayRef, + field_id: i32, + _field_name: &str, + ) -> Result<&'a ArrayRef> { + let struct_array = struct_partner + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "The struct partner is not a struct array", + ) + })?; + let field_pos = struct_array + .fields() + .iter() + .position(|field| { + get_field_id(field) + .map(|id| id == field_id) + .unwrap_or(false) + }) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Field id {} not found in struct array", field_id), + ) + })?; + Ok(struct_array.column(field_pos)) + } - for iceberg_field in iceberg_type.fields() { - let Some((idx, field)) = arrow_struct_fields.iter().enumerate().find(|(_idx, f)| { - f.metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .and_then(|id| id.parse::().ok().map(|id: i32| id == iceberg_field.id)) - .unwrap_or(false) - }) else { + fn list_element_partner<'a>( + &self, + list_partner: &'a ArrayRef, + ) -> Result { + if !matches!( + list_partner.data_type(), + DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _) + ) { return Err(Error::new( ErrorKind::DataInvalid, - format!( - "The field {} in iceberg struct type is not found in arrow struct type", - iceberg_field.name - ), + "The list partner is not a list type", )); - }; - let array = array.column(idx); - let arrow_type = field.data_type(); - if field.is_nullable() == iceberg_field.required { + } + Ok(ArrowArrayListIterator { + array: list_partner.clone(), + index: 0, + }) + } + + fn map_element_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result { + if !matches!(map_partner.data_type(), DataType::Map(_, _)) { return Err(Error::new( ErrorKind::DataInvalid, - "The nullable field of arrow struct array is not compatitable with iceberg type", + "The map partner is not a map type", )); } - match (arrow_type, iceberg_field.field_type.as_ref()) { - (DataType::Null, Type::Primitive(primitive_type)) => { - if iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "column in arrow array should not be optional", - )); - } - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.null(array, primitive_type)?); - } - (DataType::Boolean, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.boolean(array, primitive_type)?); - } - (DataType::Int16, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int16(array, primitive_type)?); - } - (DataType::Int32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int32(array, primitive_type)?); - } - (DataType::Int64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.int64(array, primitive_type)?); - } - (DataType::Float32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.float(array, primitive_type)?); - } - (DataType::Float64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.double(array, primitive_type)?); - } - (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.decimal(array, primitive_type)?); - } - (DataType::Date32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.date(array, primitive_type)?); - } - (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(visitor.time(array, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(visitor.timestamp(array, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(visitor.timestamp_nano(array, primitive_type)?); - } - (DataType::Utf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.string(array, primitive_type)?); - } - (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.large_string(array, primitive_type)?); - } - (DataType::Binary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.binary(array, primitive_type)?); - } - (DataType::LargeBinary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visitor.large_binary(array, primitive_type)?); - } - (DataType::Struct(_), Type::Struct(struct_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); - } - (arrow_type, iceberg_field_type) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Unsupported convert arrow type {} to iceberg type: {}", - arrow_type, iceberg_field_type - ), - )) - } + Ok(ArrowArrayMapIterator { + array: map_partner.clone(), + index: 0, + }) + } +} + +struct ArrowArrayListIterator { + array: ArrayRef, + index: usize, +} + +impl ListPartnerIterator for ArrowArrayListIterator { + fn next(&mut self) -> Option { + if self.index >= self.array.len() { + return None; + } + if let Some(array) = self.array.as_any().downcast_ref::() { + let result = Some(array.value(self.index)); + self.index += 1; + result + } else if let Some(array) = self.array.as_any().downcast_ref::() { + let result = Some(array.value(self.index)); + self.index += 1; + result + } else if let Some(array) = self.array.as_any().downcast_ref::() { + let result = Some(array.value(self.index)); + self.index += 1; + result + } else { + None } } +} - visitor.r#struct(array, iceberg_type, columns) +struct ArrowArrayMapIterator { + array: ArrayRef, + index: usize, } -/// Convert arrow struct array to iceberg struct value array. -/// This function will assume the schema of arrow struct array is the same as iceberg struct type. -pub fn arrow_struct_to_literal( - struct_array: &StructArray, - ty: StructType, -) -> Result>> { - visit_arrow_struct_array(struct_array, &ty, &ArrowArrayConvert) +impl MapPartnerIterator for ArrowArrayMapIterator { + fn next(&mut self) -> Option<(ArrayRef, ArrayRef)> { + if let Some(array) = self.array.as_any().downcast_ref::() { + let entry = array.value(self.index); + Some((entry.column(0).clone(), entry.column(1).clone())) + } else { + None + } + } } /// Convert arrow struct array to iceberg struct value array. -/// This function will use field id to find the corresponding field in arrow struct array. -pub fn arrow_struct_to_literal_from_field_id( - struct_array: &StructArray, - ty: StructType, +/// This function will assume the schema of arrow struct array is the same as iceberg struct type. +pub fn arrow_struct_to_literal( + struct_array: &ArrayRef, + ty: &StructType, ) -> Result>> { - visit_arrow_struct_array_from_field_id(struct_array, &ty, &ArrowArrayConvert) + visit_struct_with_partner( + ty, + struct_array, + &mut ArrowArrayConverter, + &ArrowArrayAccessor, + ) } #[cfg(test)] @@ -687,6 +551,7 @@ mod test { Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; use arrow_schema::{DataType, Field, Fields, TimeUnit}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use super::*; use crate::spec::{Literal, NestedField, PrimitiveType, StructType, Type}; @@ -718,7 +583,7 @@ mod test { let binary_array = BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]); - let struct_array = StructArray::from(vec![ + let struct_array = Arc::new(StructArray::from(vec![ ( Arc::new(Field::new("bool_field", DataType::Boolean, true)), Arc::new(bool_array) as ArrayRef, @@ -787,7 +652,7 @@ mod test { Arc::new(Field::new("binary_field", DataType::Binary, true)), Arc::new(binary_array) as ArrayRef, ), - ]); + ])) as ArrayRef; let iceberg_struct_type = StructType::new(vec![ Arc::new(NestedField::optional( @@ -860,7 +725,7 @@ mod test { )), ]); - let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![ Some(Literal::Struct(Struct::from_iter(vec![ @@ -901,24 +766,24 @@ mod test { #[test] fn test_single_column_nullable_struct() { - let struct_array = StructArray::new_null( + let struct_array = Arc::new(StructArray::new_null( Fields::from(vec![Field::new("bool_field", DataType::Boolean, true)]), 3, - ); + )) as ArrayRef; let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( 0, "bool_field", Type::Primitive(PrimitiveType::Boolean), ))]); - let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 3]); } #[test] fn test_empty_struct() { - let struct_array = StructArray::new_null(Fields::empty(), 3); + let struct_array = Arc::new(StructArray::new_null(Fields::empty(), 3)) as ArrayRef; let iceberg_struct_type = StructType::new(vec![]); - let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 0]); } @@ -929,7 +794,7 @@ mod test { let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); - let struct_array = StructArray::from(vec![ + let struct_array = Arc::new(StructArray::from(vec![ ( Arc::new( Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from( @@ -970,7 +835,7 @@ mod test { ), Arc::new(float32_array) as ArrayRef, ), - ]); + ])) as ArrayRef; let struct_type = StructType::new(vec![ Arc::new(NestedField::optional( 1, @@ -993,7 +858,7 @@ mod test { Type::Primitive(PrimitiveType::Int), )), ]); - let result = arrow_struct_to_literal_from_field_id(&struct_array, struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap(); assert_eq!(result, vec![ Some(Literal::Struct(Struct::from_iter(vec![ Some(Literal::int(1)), diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 649b6b2c4..688bbe65d 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -1132,6 +1132,213 @@ impl ReassignFieldIds { } } +/// A post order schema visitor with partner. +/// +/// For order of methods called, please refer to [`visit_schema_with_partner`]. +pub trait SchemaWithPartnerVisitor

{ + /// Return type of this visitor. + type T; + + /// Called before struct field. + fn before_struct_field(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called after struct field. + fn after_struct_field(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called before list field. + fn before_list_element(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called after list field. + fn after_list_element(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called before map key field. + fn before_map_key(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called after map key field. + fn after_map_key(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called before map value field. + fn before_map_value(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + /// Called after map value field. + fn after_map_value(&mut self, _field: &NestedFieldRef, _partner: &P) -> Result<()> { + Ok(()) + } + + /// Called before every type, if this function return `Some`, the following visiting will be skipped. + /// This function used to implement early return. + fn visit_type_before(&mut self, _ty: &Type, _partner: &P) -> Result> { + return Ok(None); + } + + /// Called after schema's type visited. + fn schema(&mut self, schema: &Schema, partner: &P, value: Self::T) -> Result; + /// Called after struct's field type visited. + fn field(&mut self, field: &NestedFieldRef, partner: &P, value: Self::T) -> Result; + /// Called after struct's fields visited. + fn r#struct( + &mut self, + r#struct: &StructType, + partner: &P, + results: Vec, + ) -> Result; + /// Called after list fields visited. + fn list(&mut self, list: &ListType, partner: &P, value: Vec) -> Result; + /// Called after map's key and value fields visited. + fn map( + &mut self, + map: &MapType, + partner: &P, + key_value: Vec, + value: Vec, + ) -> Result; + /// Called when see a primitive type. + fn primitive(&mut self, p: &PrimitiveType, partner: &P) -> Result; +} + +/// Accessor used to get child partner from parent partner. +pub trait PartnerAccessor

{ + /// List partner iterator. + type L: ListPartnerIterator

; + /// Map partner iterator. + type M: MapPartnerIterator

; + + /// Get the struct partner from schema partner. + fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; + /// Get the field partner from struct partner. + fn field_partner<'a>(&self, struct_partner: &'a P, field_id: i32, field: &str) + -> Result<&'a P>; + /// Get the list element partner from list partner. + fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result; + /// Get the map key partner from map partner. + fn map_element_partner<'a>(&self, map_partner: &'a P) -> Result; +} + +/// Iterator for list partner. +pub trait ListPartnerIterator

{ + /// Get the next partner. + fn next(&mut self) -> Option

; +} + +/// Iterator for map partner. +pub trait MapPartnerIterator

{ + /// Get the next partner. + fn next(&mut self) -> Option<(P, P)>; +} + +/// Visiting a type in post order. +pub fn visit_type_with_partner, A: PartnerAccessor

>( + r#type: &Type, + partner: &P, + visitor: &mut V, + accessor: &A, +) -> Result { + if let Some(res) = visitor.visit_type_before(r#type, partner)? { + return Ok(res); + } + match r#type { + Type::Primitive(p) => visitor.primitive(p, partner), + Type::List(list) => { + let mut results = Vec::new(); + let mut list_element_partner_iter = accessor.list_element_partner(partner)?; + if let Some(list_element_partner) = list_element_partner_iter.next() { + visitor.before_list_element(&list.element_field, &list_element_partner)?; + let value = visit_type_with_partner( + &list.element_field.field_type, + &list_element_partner, + visitor, + accessor, + )?; + visitor.after_list_element(&list.element_field, &list_element_partner)?; + results.push(value); + } + visitor.list(list, partner, results) + } + Type::Map(map) => { + let mut k_results = Vec::new(); + let mut v_results = Vec::new(); + let mut kv_partner_iter = accessor.map_element_partner(partner)?; + if let Some((k_partner, v_partner)) = kv_partner_iter.next() { + let key_result = { + visitor.before_map_key(&map.key_field, &k_partner)?; + let ret = visit_type_with_partner( + &map.key_field.field_type, + &k_partner, + visitor, + accessor, + )?; + visitor.after_map_key(&map.key_field, &k_partner)?; + ret + }; + + let value_result = { + visitor.before_map_value(&map.value_field, &v_partner)?; + let ret = visit_type_with_partner( + &map.value_field.field_type, + &v_partner, + visitor, + accessor, + )?; + visitor.after_map_value(&map.value_field, &v_partner)?; + ret + }; + + k_results.push(key_result); + v_results.push(value_result); + } + + visitor.map(map, partner, k_results, v_results) + } + Type::Struct(s) => visit_struct_with_partner(s, partner, visitor, accessor), + } +} + +/// Visit struct type in post order. +pub fn visit_struct_with_partner, A: PartnerAccessor

>( + s: &StructType, + partner: &P, + visitor: &mut V, + accessor: &A, +) -> Result { + if let Some(res) = visitor.visit_type_before(&Type::Struct(s.clone()), partner)? { + return Ok(res); + } + let mut results = Vec::with_capacity(s.fields().len()); + for field in s.fields() { + let field_partner = accessor.field_partner(partner, field.id, &field.name)?; + visitor.before_struct_field(field, field_partner)?; + let result = visit_type_with_partner(&field.field_type, field_partner, visitor, accessor)?; + visitor.after_struct_field(field, field_partner)?; + let result = visitor.field(field, field_partner, result)?; + results.push(result); + } + + visitor.r#struct(s, partner, results) +} + +/// Visit schema in post order. +pub fn visit_schema_with_partner, A: PartnerAccessor

>( + schema: &Schema, + partner: &P, + visitor: &mut V, + accessor: &A, +) -> Result { + let result = visit_struct_with_partner( + &schema.r#struct, + accessor.struct_parner(partner)?, + visitor, + accessor, + )?; + visitor.schema(schema, partner, result) +} + pub(super) mod _serde { /// This is a helper module that defines types to help with serialization/deserialization. /// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct