diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 95712018b..004882608 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -16,10 +16,9 @@ // under the License. use arrow_array::{ - Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float16Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, - StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, - TimestampNanosecondArray, + Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, NullArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; use arrow_schema::{DataType, TimeUnit}; use itertools::Itertools; @@ -27,194 +26,362 @@ use itertools::Itertools; use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; use crate::{Error, ErrorKind, Result}; -trait ToIcebergLiteralArray { - fn to_primitive_literal_array( +trait ArrowArrayVistor { + type T; + fn null( &self, - _arrow_type: &DataType, - _iceberg_type: &PrimitiveType, - ) -> Result>>; - fn to_struct_literal_array( + array: &NullArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn boolean( &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>>; -} - -impl ToIcebergLiteralArray for BooleanArray { - fn to_primitive_literal_array( + array: &BooleanArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn int16( &self, - _arrow_type: &DataType, + array: &Int16Array, + arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { - match iceberg_type { - PrimitiveType::Boolean => Ok(self.iter().map(|v| v.map(Literal::bool)).collect()), - _ => Err(Error::new( + ) -> Result>; + fn int32( + &self, + array: &Int32Array, + arrow_type: 
&DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn int64( + &self, + array: &Int64Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn float( + &self, + array: &Float32Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn double( + &self, + array: &Float64Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn decimal( + &self, + array: &Decimal128Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn date( + &self, + array: &Date32Array, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn time( + &self, + array: &Time64MicrosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn string( + &self, + array: &StringArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn large_string( + &self, + array: &LargeStringArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn binary( + &self, + array: &BinaryArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn large_binary( + &self, + array: &LargeBinaryArray, + arrow_type: &DataType, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn combine_struct( + &self, + array: &StructArray, + columns: Vec>, + ) -> Result>; + fn r#struct( + &self, + array: &StructArray, + arrow_type: &DataType, + iceberg_type: &StructType, + ) -> Result> { + let DataType::Struct(arrow_struct_fields) = arrow_type else { + return Err(Error::new( ErrorKind::DataInvalid, - format!( - "The type of arrow boolean array is not compatitable with iceberg type {}", - iceberg_type - ), - )), + "The type of arrow 
struct array is not a struct type", + )); + }; + + if array.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for ((array, arrow_type), iceberg_field) in array + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(iceberg_type.fields().iter()) + { + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.null(array, arrow_type, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.boolean(array, arrow_type, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int16(array, arrow_type, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int32(array, arrow_type, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.int64(array, arrow_type, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + 
columns.push(self.float(array, arrow_type, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.double(array, arrow_type, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.decimal(array, arrow_type, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.date(array, arrow_type, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.time(array, arrow_type, primitive_type)?); + } + ( + DataType::Timestamp(TimeUnit::Microsecond, _), + Type::Primitive(primitive_type), + ) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.timestamp(array, arrow_type, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(self.timestamp_nano(array, arrow_type, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.string(array, arrow_type, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.large_string(array, arrow_type, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.binary(array, arrow_type, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + 
columns.push(self.large_binary(array, arrow_type, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(self.r#struct(array, arrow_type, struct_type)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } } + + self.combine_struct(array, columns) } +} + +struct LiteralArrayVisitor; + +impl ArrowArrayVistor for LiteralArrayVisitor { + type T = Option; - fn to_struct_literal_array( + fn null( &self, + array: &NullArray, _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() + _iceberg_type: &PrimitiveType, + ) -> Result> { + Ok(vec![None; array.len()]) } -} -impl ToIcebergLiteralArray for Int16Array { - fn to_primitive_literal_array( + fn boolean( &self, + array: &BooleanArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Int => Ok(self.iter().map(|v| v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int16 array is not compatitable with iceberg type {}", + "The type of arrow boolean array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Int32Array { - fn to_primitive_literal_array( + fn int16( &self, + array: &Int16Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Int => Ok(self.iter().map(|v| 
v.map(Literal::int)).collect()), - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int32 array is not compatitable with iceberg type {}", + "The type of arrow int16 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Int64Array { - fn to_primitive_literal_array( + fn int32( &self, + array: &Int32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Long => Ok(self.iter().map(|v| v.map(Literal::long)).collect()), + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow int64 array is not compatitable with iceberg type {}", + "The type of arrow int32 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float16Array { - fn to_primitive_literal_array( + fn int64( &self, + array: &Int64Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Float => Ok(self - .iter() - .map(|v| v.map(|v| Literal::float(v.to_f32()))) - .collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow float16 
array is not compatitable with iceberg type {}", + "The type of arrow int64 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float32Array { - fn to_primitive_literal_array( + fn float( &self, + array: &Float32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Float => Ok(self.iter().map(|v| v.map(Literal::float)).collect()), + PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow float32 array is not compatitable with iceberg type {}", + "The type of arrow float32 array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Float64Array { - fn to_primitive_literal_array( + fn double( &self, + array: &Float64Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Double => Ok(self.iter().map(|v| v.map(Literal::double)).collect()), + PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -225,21 +392,12 @@ impl ToIcebergLiteralArray for Float64Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Decimal128Array { - fn to_primitive_literal_array( + fn decimal( &self, + array: &Decimal128Array, arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { let DataType::Decimal128(arrow_precision, 
arrow_scale) = arrow_type else { unreachable!() }; @@ -254,7 +412,7 @@ impl ToIcebergLiteralArray for Decimal128Array { ), )); } - Ok(self.iter().map(|v| v.map(Literal::decimal)).collect()) + Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) } _ => Err(Error::new( ErrorKind::DataInvalid, @@ -266,23 +424,14 @@ impl ToIcebergLiteralArray for Decimal128Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Date32Array { - fn to_primitive_literal_array( + fn date( &self, + array: &Date32Array, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Date => Ok(self.iter().map(|v| v.map(Literal::date)).collect()), + PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -293,23 +442,14 @@ impl ToIcebergLiteralArray for Date32Array { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for Time64MicrosecondArray { - fn to_primitive_literal_array( + fn time( &self, + array: &Time64MicrosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Time => Ok(self + PrimitiveType::Time => Ok(array .iter() .map(|v| v.map(Literal::time)) .collect()), @@ -323,27 +463,18 @@ impl ToIcebergLiteralArray for Time64MicrosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for TimestampMicrosecondArray { - fn to_primitive_literal_array( + fn timestamp( &self, + array: &TimestampMicrosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> 
Result> { match iceberg_type { - PrimitiveType::Timestamp => Ok(self + PrimitiveType::Timestamp => Ok(array .iter() .map(|v| v.map(Literal::timestamp)) .collect()), - PrimitiveType::Timestamptz => Ok(self + PrimitiveType::Timestamptz => Ok(array .iter() .map(|v| v.map(Literal::timestamptz)) .collect()), @@ -357,27 +488,18 @@ impl ToIcebergLiteralArray for TimestampMicrosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for TimestampNanosecondArray { - fn to_primitive_literal_array( + fn timestamp_nano( &self, + array: &TimestampNanosecondArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::TimestampNs => Ok(self + PrimitiveType::TimestampNs => Ok(array .iter() .map(|v| v.map(Literal::timestamp_nano)) .collect()), - PrimitiveType::TimestamptzNs => Ok(self + PrimitiveType::TimestamptzNs => Ok(array .iter() .map(|v| v.map(Literal::timestamptz_nano)) .collect()), @@ -391,23 +513,14 @@ impl ToIcebergLiteralArray for TimestampNanosecondArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for StringArray { - fn to_primitive_literal_array( + fn string( &self, + array: &StringArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -418,23 +531,14 @@ impl ToIcebergLiteralArray for StringArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl 
ToIcebergLiteralArray for LargeStringArray { - fn to_primitive_literal_array( + fn large_string( &self, + array: &LargeStringArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::String => Ok(self.iter().map(|v| v.map(Literal::string)).collect()), + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( @@ -445,23 +549,14 @@ impl ToIcebergLiteralArray for LargeStringArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for BinaryArray { - fn to_primitive_literal_array( + fn binary( &self, + array: &BinaryArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Binary => Ok(self + PrimitiveType::Binary => Ok(array .iter() .map(|v| v.map(|v| Literal::binary(v.to_vec()))) .collect()), @@ -475,187 +570,32 @@ impl ToIcebergLiteralArray for BinaryArray { } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for LargeBinaryArray { - fn to_primitive_literal_array( + fn large_binary( &self, + array: &LargeBinaryArray, _arrow_type: &DataType, iceberg_type: &PrimitiveType, - ) -> Result>> { + ) -> Result> { match iceberg_type { - PrimitiveType::Binary => Ok(self + PrimitiveType::Binary => Ok(array .iter() .map(|v| v.map(|v| Literal::binary(v.to_vec()))) .collect()), _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "The type of arrow large binary array is not compatitable with iceberg type {}", + "The type of arrow large binary array is not compatitable with iceberg type {}", iceberg_type ), )), } } - fn to_struct_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &StructType, - ) -> 
Result>> { - unreachable!() - } -} - -impl ToIcebergLiteralArray for StructArray { - fn to_primitive_literal_array( - &self, - _arrow_type: &DataType, - _iceberg_type: &PrimitiveType, - ) -> Result>> { - unreachable!() - } - - fn to_struct_literal_array( + fn combine_struct( &self, - arrow_type: &DataType, - iceberg_type: &StructType, - ) -> Result>> { - let DataType::Struct(arrow_struct_fields) = arrow_type else { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not a struct type", - )); - }; - - if self.columns().len() != iceberg_type.fields().len() - || arrow_struct_fields.len() != iceberg_type.fields().len() - { - return Err(Error::new( - ErrorKind::DataInvalid, - "The type of arrow struct array is not compatitable with iceberg struct type", - )); - } - - let mut columns = Vec::with_capacity(self.columns().len()); - - for ((array, arrow_type), iceberg_field) in self - .columns() - .iter() - .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) - .zip_eq(iceberg_type.fields().iter()) - { - if array.is_nullable() == iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "The nullable field of arrow struct array is not compatitable with iceberg type", - )); - } - match (arrow_type, iceberg_field.field_type.as_ref()) { - (DataType::Null, _) => { - if iceberg_field.required { - return Err(Error::new( - ErrorKind::DataInvalid, - "column in arrow array should not be optional", - )); - } - columns.push(vec![None; array.len()]); - } - (DataType::Boolean, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int16, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int32, Type::Primitive(primitive_type)) => { - let array = 
array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Int64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Float32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Float64, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Date32, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - ( - DataType::Timestamp(TimeUnit::Microsecond, _), - Type::Primitive(primitive_type), - ) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Utf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - 
columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Binary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::LargeBinary, Type::Primitive(primitive_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_primitive_literal_array(arrow_type, primitive_type)?); - } - (DataType::Struct(_), Type::Struct(struct_type)) => { - let array = array.as_any().downcast_ref::().unwrap(); - columns.push(array.to_struct_literal_array(arrow_type, struct_type)?); - } - (arrow_type, iceberg_field_type) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Unsupported convert arrow type {} to iceberg type: {}", - arrow_type, iceberg_field_type - ), - )) - } - } - } - + array: &StructArray, + columns: Vec>, + ) -> Result> { let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); let mut struct_literals = Vec::with_capacity(struct_literal_len); let mut columns_iters = columns @@ -664,7 +604,7 @@ impl ToIcebergLiteralArray for StructArray { .collect::>(); for row_idx in 0..struct_literal_len { - if self.is_null(row_idx) { + if array.is_null(row_idx) { struct_literals.push(None); continue; } @@ -680,11 +620,11 @@ impl ToIcebergLiteralArray for StructArray { } /// Convert arrow struct array to iceberg struct value array. 
-pub fn arrow_struct_to_iceberg_struct( +pub fn arrow_struct_to_literal( struct_array: &StructArray, ty: StructType, ) -> Result>> { - struct_array.to_struct_literal_array(struct_array.data_type(), &ty) + LiteralArrayVisitor.r#struct(struct_array, struct_array.data_type(), &ty) } #[cfg(test)] @@ -870,7 +810,7 @@ mod test { )), ]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![ Some(Literal::Struct(Struct::from_iter(vec![ @@ -920,7 +860,7 @@ mod test { "bool_field", Type::Primitive(PrimitiveType::Boolean), ))]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 3]); } @@ -928,7 +868,7 @@ mod test { fn test_empty_struct() { let struct_array = StructArray::new_null(Fields::empty(), 3); let iceberg_struct_type = StructType::new(vec![]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 0]); } }