diff --git a/Cargo.lock b/Cargo.lock index 4e60c3d96..a0d6d3132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3099,6 +3099,7 @@ dependencies = [ "iceberg_test_utils", "parquet", "tokio", + "uuid", ] [[package]] diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b4e15821f..6fcd59297 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, PrimitiveType, Schema}; +use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -273,6 +273,28 @@ impl ArrowReader { Ok((iceberg_field_ids, field_id_map)) } + /// Insert the leaf field ids into `field_ids`, used for projection. + /// For nested types, it recursively inserts the leaf field ids. + fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) { + match field.field_type.as_ref() { + Type::Primitive(_) => { + field_ids.push(field.id); + } + Type::Struct(struct_type) => { + for nested_field in struct_type.fields() { + Self::include_leaf_field_id(nested_field, field_ids); + } + } + Type::List(list_type) => { + Self::include_leaf_field_id(&list_type.element_field, field_ids); + } + Type::Map(map_type) => { + Self::include_leaf_field_id(&map_type.key_field, field_ids); + Self::include_leaf_field_id(&map_type.value_field, field_ids); + } + } + } + fn get_arrow_projection_mask( field_ids: &[i32], iceberg_schema_of_task: &Schema, @@ -297,11 +319,21 @@ impl ArrowReader { scale: requested_scale, }), ) if requested_precision >= file_precision && file_scale == requested_scale => true, + // Uuid will be stored as Fixed(16) in the parquet file, so the read-back type will be Fixed(16). + (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true, _ => false, } } - if field_ids.is_empty() { + let mut leaf_field_ids = vec![]; + for field_id in field_ids { + let field = iceberg_schema_of_task.field_by_id(*field_id); + if let Some(field) = field { + Self::include_leaf_field_id(field, &mut leaf_field_ids); + } + } + + if leaf_field_ids.is_empty() { Ok(ProjectionMask::all()) } else { // Build the map between field id and column index in Parquet schema. @@ -318,7 +350,7 @@ impl ArrowReader { .and_then(|field_id| i32::from_str(field_id).ok()) .map_or(false, |field_id| { projected_fields.insert((*f).clone(), field_id); - field_ids.contains(&field_id) + leaf_field_ids.contains(&field_id) }) }), arrow_schema.metadata().clone(), @@ -351,19 +383,26 @@ impl ArrowReader { true }); - if column_map.len() != field_ids.len() { + if column_map.len() != leaf_field_ids.len() { + let missing_fields = leaf_field_ids + .iter() + .filter(|field_id| !column_map.contains_key(field_id)) + .collect::<Vec<_>>(); return Err(Error::new( ErrorKind::DataInvalid, format!( "Parquet schema {} and Iceberg schema {} do not match.", iceberg_schema, iceberg_schema_of_task ), - )); + ) + .with_context("column_map", format! {"{:?}", column_map}) + .with_context("field_ids", format! {"{:?}", leaf_field_ids}) + .with_context("missing_fields", format!
{"{:?}", missing_fields})); } let mut indices = vec![]; - for field_id in field_ids { - if let Some(col_idx) = column_map.get(field_id) { + for field_id in leaf_field_ids { + if let Some(col_idx) = column_map.get(&field_id) { indices.push(*col_idx); } else { return Err(Error::new( diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 142426f75..41afd8ea4 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -43,7 +43,9 @@ use crate::spec::{ use crate::{Error, ErrorKind}; /// When iceberg map type convert to Arrow map type, the default map field name is "key_value". -pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value"; +pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value"; +/// UTC time zone for Arrow timestamp type. +pub const UTC_TIME_ZONE: &str = "+00:00"; /// A post order arrow schema visitor. /// @@ -598,14 +600,14 @@ impl SchemaVisitor for ToArrowSchemaConverter { )), crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type( // Timestampz always stored as UTC - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())), )), crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type( DataType::Timestamp(TimeUnit::Nanosecond, None), )), crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type( // Store timestamptz_ns as UTC - DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())), )), crate::spec::PrimitiveType::String => { Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8)) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 6de541958..7e05da59a 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -248,7 +248,7 @@ impl<'a> TableScanBuilder<'a> { ) })?; - let field = schema + schema .as_struct() .field_by_id(field_id) .ok_or_else(|| { @@ -261,16 +261,6 @@ impl<'a> TableScanBuilder<'a> { ) })?; - if !field.field_type.is_primitive() { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {} is not a primitive type. Schema: {}", - column_name, schema - ), - )); - } - field_ids.push(field_id); } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 8b0205295..c806d16ea 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -36,9 +36,11 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; use crate::spec::PrimitiveLiteral; /// Field name for list type. -pub(crate) const LIST_FILED_NAME: &str = "element"; -pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; -pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value"; +pub const LIST_FIELD_NAME: &str = "element"; +/// Field name for map type's key. +pub const MAP_KEY_FIELD_NAME: &str = "key"; +/// Field name for map type's value. +pub const MAP_VALUE_FIELD_NAME: &str = "value"; pub(crate) const MAX_DECIMAL_BYTES: u32 = 24; pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; @@ -633,9 +635,9 @@ impl NestedField { /// Construct list type's element field. 
pub fn list_element(id: i32, field_type: Type, required: bool) -> Self { if required { - Self::required(id, LIST_FILED_NAME, field_type) + Self::required(id, LIST_FIELD_NAME, field_type) } else { - Self::optional(id, LIST_FILED_NAME, field_type) + Self::optional(id, LIST_FIELD_NAME, field_type) } } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 709c4cdae..f290441aa 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -30,7 +30,7 @@ use super::NestedField; use crate::error::Result; use crate::expr::accessor::StructAccessor; use crate::spec::datatypes::{ - ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, + ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FIELD_NAME, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, }; use crate::{ensure_data_valid, Error, ErrorKind}; @@ -774,7 +774,7 @@ impl SchemaVisitor for IndexByName { } fn list(&mut self, list: &ListType, _value: Self::T) -> Result<Self::T> { - self.add_field(LIST_FILED_NAME, list.element_field.id) + self.add_field(LIST_FIELD_NAME, list.element_field.id) } fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> { diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index a047d7580..4d98e9a35 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -33,3 +33,4 @@ iceberg-catalog-rest = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } +uuid = { workspace = true } diff --git a/crates/integration_tests/tests/scan_all_type.rs b/crates/integration_tests/tests/scan_all_type.rs new file mode 100644 index 000000000..804d554c8 --- /dev/null +++ b/crates/integration_tests/tests/scan_all_type.rs @@ -0,0 +1,369 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration test for scanning a table that contains all supported data types.
+ +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StringBuilder}; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, + Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, MapArray, RecordBatch, + StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, +}; +use arrow_schema::{DataType, Field, Fields}; +use futures::TryStreamExt; +use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE}; +use iceberg::spec::{ + ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, LIST_FIELD_NAME, + MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, +}; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_integration_tests::set_test_fixture; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::file::properties::WriterProperties; +use uuid::Uuid; + +#[tokio::test] +async fn test_scan_all_type() { + let fixture = set_test_fixture("test_scan_all_type").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + // test all types + NestedField::required(1, "int", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "long", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(3, "float", Type::Primitive(PrimitiveType::Float)).into(), + NestedField::required(4, "double", Type::Primitive(PrimitiveType::Double)).into(), + NestedField::required( + 5, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 20, + scale: 5, + }), + ) + .into(), + NestedField::required(6, "string", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(7, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::required(8, "binary", Type::Primitive(PrimitiveType::Binary)).into(), + NestedField::required(9, "date", Type::Primitive(PrimitiveType::Date)).into(), + NestedField::required(10, "time", Type::Primitive(PrimitiveType::Time)).into(), + NestedField::required(11, "timestamp", Type::Primitive(PrimitiveType::Timestamp)) + .into(), + NestedField::required(12, "fixed", Type::Primitive(PrimitiveType::Fixed(10))).into(), + NestedField::required(13, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(), + NestedField::required( + 14, + "timestamptz", + Type::Primitive(PrimitiveType::Timestamptz), + ) + .into(), + NestedField::required( + 15, + "struct", + Type::Struct(StructType::new(vec![ + NestedField::required(18, "int", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(19, "string", Type::Primitive(PrimitiveType::String)) + .into(), + ])), + ) + .into(), + NestedField::required( + 16, + "list", + Type::List(ListType::new( + NestedField::list_element(20, Type::Primitive(PrimitiveType::Int),
true).into(), + )), + ) + .into(), + NestedField::required( + 17, + "map", + Type::Map(MapType::new( + NestedField::map_key_element(21, Type::Primitive(PrimitiveType::Int)).into(), + NestedField::map_value_element( + 22, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc<arrow_schema::Schema> = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + + // Prepare data + let col1 = Int32Array::from(vec![1, 2, 3, 4, 5]); + let col2 = Int64Array::from(vec![1, 2, 3, 4, 5]); + let col3 = Float32Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5]); + let col4 = Float64Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5]); + let col5 = Decimal128Array::from(vec![ + Some(1.into()), + Some(2.into()), + Some(3.into()), + Some(4.into()), + Some(5.into()), + ]) + .with_data_type(DataType::Decimal128(20, 5)); + let col6 = StringArray::from(vec!["a", "b", "c", "d", "e"]); + let col7 = BooleanArray::from(vec![true, false, true, false, true]); + let col8 = LargeBinaryArray::from_opt_vec(vec![ + Some(b"a"), + Some(b"b"), + Some(b"c"), + Some(b"d"), + Some(b"e"), + ]); + let col9 = Date32Array::from(vec![1, 2, 3, 4, 5]); + let col10 = Time64MicrosecondArray::from(vec![1, 2, 3, 4, 5]); + let col11 = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5]); + let col12 = FixedSizeBinaryArray::try_from_iter( + vec![ + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + ] + .into_iter(), + ) + .unwrap(); + assert!(col12.data_type() == &DataType::FixedSizeBinary(10)); + let col13 = FixedSizeBinaryArray::try_from_iter( + vec![ + Uuid::new_v4().as_bytes().to_vec(), + Uuid::new_v4().as_bytes().to_vec(), + Uuid::new_v4().as_bytes().to_vec(), + Uuid::new_v4().as_bytes().to_vec(), + Uuid::new_v4().as_bytes().to_vec(), + ] + .into_iter(), + ) + .unwrap(); + assert!(col13.data_type() == &DataType::FixedSizeBinary(16)); + let col14 = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone(UTC_TIME_ZONE); + let col15 = StructArray::from(vec![ + ( + Arc::new( + Field::new("int", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 18.to_string(), + )])), + ), + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef, + ), + ( + Arc::new( + Field::new("string", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 19.to_string(), + )])), + ), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as ArrayRef, + ), + ]); + let col16
= { + let mut builder = ListBuilder::new(Int32Builder::new()).with_field(Arc::new( + Field::new(LIST_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 20.to_string(), + )])), + )); + builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]); + builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]); + builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]); + builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]); + builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]); + builder.finish() + }; + let col17 = { + let string_builder = StringBuilder::new(); + let int_builder = Int32Builder::with_capacity(4); + let mut builder = MapBuilder::new(None, int_builder, string_builder); + builder.keys().append_value(1); + builder.values().append_value("a"); + builder.append(true).unwrap(); + builder.keys().append_value(2); + builder.values().append_value("b"); + builder.append(true).unwrap(); + builder.keys().append_value(3); + builder.values().append_value("c"); + builder.append(true).unwrap(); + builder.keys().append_value(4); + builder.values().append_value("d"); + builder.append(true).unwrap(); + builder.keys().append_value(5); + builder.values().append_value("e"); + builder.append(true).unwrap(); + let array = builder.finish(); + let (_field, offsets, entries, nulls, ordered) = array.into_parts(); + let new_struct_fields = Fields::from(vec![ + Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), 21.to_string()), + ])), + Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), 22.to_string()), + ])), + ]); + let entries = { + let (_, arrays, nulls) = entries.into_parts(); + StructArray::new(new_struct_fields.clone(), arrays, nulls) + }; + let field = Arc::new(Field::new( + DEFAULT_MAP_FIELD_NAME, + DataType::Struct(new_struct_fields), + false, + )); + MapArray::new(field, offsets, entries, nulls, ordered) + }; + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + Arc::new(col4) as ArrayRef, + Arc::new(col5) as ArrayRef, + Arc::new(col6) as ArrayRef, + Arc::new(col7) as ArrayRef, + Arc::new(col8) as ArrayRef, + Arc::new(col9) as ArrayRef, + Arc::new(col10) as ArrayRef, + Arc::new(col11) as ArrayRef, + Arc::new(col12) as ArrayRef, + Arc::new(col13) as ArrayRef, + Arc::new(col14) as ArrayRef, + Arc::new(col15) as ArrayRef, + Arc::new(col16) as ArrayRef, + Arc::new(col17) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // commit result + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select(vec![ + "int", + "long", + "float", + "double", + "decimal", + "string", + "boolean", + "binary", + "date", + "time", + "timestamp", + "fixed", + "uuid", + "timestamptz", + "struct", + "list", + "map", + ]) + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + 
assert_eq!(batches[0], batch); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); +}
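
For readers who want to experiment with the new nested-type projection outside the reader, here is a minimal standalone sketch of the same recursion that `ArrowReader::include_leaf_field_id` performs. The free function `leaf_field_ids` and the `main` harness are illustrative only (not part of this change); they are written against the public `iceberg::spec` types used in this diff, and the example schema mirrors the "struct" column (field ids 15/18/19) from the integration test above.

use iceberg::spec::{NestedField, PrimitiveType, StructType, Type};

// Illustrative copy of the recursion in ArrowReader::include_leaf_field_id:
// collect the ids of all primitive (leaf) fields reachable under `field`.
fn leaf_field_ids(field: &NestedField, out: &mut Vec<i32>) {
    match field.field_type.as_ref() {
        Type::Primitive(_) => out.push(field.id),
        Type::Struct(struct_type) => {
            for nested_field in struct_type.fields() {
                leaf_field_ids(nested_field, out);
            }
        }
        Type::List(list_type) => leaf_field_ids(&list_type.element_field, out),
        Type::Map(map_type) => {
            leaf_field_ids(&map_type.key_field, out);
            leaf_field_ids(&map_type.value_field, out);
        }
    }
}

fn main() {
    // Selecting the top-level "struct" column (id 15) should project both
    // of its primitive leaves (ids 18 and 19).
    let column = NestedField::required(
        15,
        "struct",
        Type::Struct(StructType::new(vec![
            NestedField::required(18, "int", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(19, "string", Type::Primitive(PrimitiveType::String)).into(),
        ])),
    );
    let mut ids = vec![];
    leaf_field_ids(&column, &mut ids);
    assert_eq!(ids, vec![18, 19]);
}

Because the projection mask is now built from leaf field ids, selecting a top-level nested column projects every primitive column nested under it, which is why the primitive-only check in scan.rs could be removed.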