- fix equality delete writer field id projection (#751)
- fix decimal parsing of Parquet statistics

Co-authored-by: ZENOTME <[email protected]>
ZENOTME authored Dec 4, 2024
1 parent 1798b30 commit 4fba3f4
Showing 3 changed files with 208 additions and 5 deletions.
15 changes: 13 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
@@ -723,10 +723,21 @@ macro_rules! get_parquet_stat_as_datum {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};

Some(Datum::new(
primitive_type.clone(),
- PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
+ PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
))
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::FixedLenByteArray(stats)) => {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};
Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
))
}
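Why the one-character change above matters: Parquet stores decimal min/max statistics as unscaled two's-complement integers in big-endian byte order, so decoding them with `from_le_bytes` reversed the bytes of every bound. Below is a minimal sketch of the decoding in plain Rust; the helper name and the sign-extension for buffers shorter than 16 bytes are our own additions (the match arms above require exactly 16 bytes via `try_into`):

// Decode a big-endian two's-complement unscaled decimal into an i128,
// sign-extending buffers shorter than 16 bytes.
fn decimal_be_to_i128(bytes: &[u8]) -> Option<i128> {
    if bytes.len() > 16 {
        return None; // value does not fit in an i128
    }
    // Fill with 0xFF when the sign bit of the most significant byte is set.
    let fill = if bytes.first().is_some_and(|b| b & 0x80 != 0) {
        0xFF
    } else {
        0x00
    };
    let mut buf = [fill; 16];
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    Some(i128::from_be_bytes(buf))
}

fn main() {
    // [0x01, 0x00] is 256 as big-endian; a little-endian read of the
    // same bytes would have produced 1.
    assert_eq!(decimal_be_to_i128(&[0x01, 0x00]), Some(256));
    // A lone 0xFF byte sign-extends to -1.
    assert_eq!(decimal_be_to_i128(&[0xFF]), Some(-1));
}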
160 changes: 158 additions & 2 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -71,7 +71,7 @@ impl EqualityDeleteWriterConfig {
|field| {
// Only primitive type is allowed to be used for identifier field ids
if field.is_nullable()
- || !field.data_type().is_primitive()
+ || field.data_type().is_nested()
|| matches!(
field.data_type(),
DataType::Float16 | DataType::Float32 | DataType::Float64
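
For context on the fixed check: arrow's `DataType::is_primitive` returns true only for numeric and temporal types, so the old `!is_primitive()` condition wrongly rejected valid identifier-field types such as strings, booleans, and fixed-size binaries (UUIDs). `is_nested` instead excludes exactly the container types the rule is meant to ban. A small sketch of the distinction, relying on arrow-schema's documented semantics:

use arrow_schema::{DataType, Fields};

fn main() {
    // `is_primitive` covers only numeric/temporal types...
    assert!(DataType::Int32.is_primitive());
    // ...so Utf8 and Boolean fail it, even though Iceberg allows them
    // as identifier fields.
    assert!(!DataType::Utf8.is_primitive());
    assert!(!DataType::Boolean.is_primitive());
    // `is_nested` flags only container types, which is the property
    // the identifier-field check actually needs to exclude.
    assert!(!DataType::Utf8.is_nested());
    assert!(DataType::Struct(Fields::empty()).is_nested());
}

This is why the new test below exercises every primitive Iceberg type, including the Boolean, String, Uuid, Fixed, and Binary columns that the old condition would have refused.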
@@ -169,13 +169,14 @@ mod test {
use std::sync::Arc;

use arrow_array::types::Int32Type;
- use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray};
+ use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
use arrow_schema::DataType;
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
+ use uuid::Uuid;

use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use crate::io::{FileIO, FileIOBuilder};
@@ -500,4 +501,159 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_equality_delete_with_primitive_type() -> Result<(), anyhow::Error> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Boolean))
.into(),
NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::required(
3,
"col3",
Type::Primitive(PrimitiveType::Decimal {
precision: 38,
scale: 5,
}),
)
.into(),
NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Date)).into(),
NestedField::required(5, "col5", Type::Primitive(PrimitiveType::Time)).into(),
NestedField::required(6, "col6", Type::Primitive(PrimitiveType::Timestamp))
.into(),
NestedField::required(7, "col7", Type::Primitive(PrimitiveType::Timestamptz))
.into(),
NestedField::required(8, "col8", Type::Primitive(PrimitiveType::TimestampNs))
.into(),
NestedField::required(9, "col9", Type::Primitive(PrimitiveType::TimestamptzNs))
.into(),
NestedField::required(10, "col10", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(11, "col11", Type::Primitive(PrimitiveType::Uuid)).into(),
NestedField::required(12, "col12", Type::Primitive(PrimitiveType::Fixed(10)))
.into(),
NestedField::required(13, "col13", Type::Primitive(PrimitiveType::Binary))
.into(),
])
.build()
.unwrap(),
);
let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap();
let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();

let pb = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(delete_schema),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
.build(config)
.await?;

// prepare data
let col0 = Arc::new(BooleanArray::from(vec![
Some(true),
Some(false),
Some(true),
])) as ArrayRef;
let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(4)])) as ArrayRef;
let col3 = Arc::new(
arrow_array::Decimal128Array::from(vec![Some(1), Some(2), Some(4)])
.with_precision_and_scale(38, 5)
.unwrap(),
) as ArrayRef;
let col4 = Arc::new(arrow_array::Date32Array::from(vec![
Some(0),
Some(1),
Some(3),
])) as ArrayRef;
let col5 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
Some(0),
Some(1),
Some(3),
])) as ArrayRef;
let col6 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
Some(0),
Some(1),
Some(3),
])) as ArrayRef;
let col7 = Arc::new(
arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), Some(3)])
.with_timezone_utc(),
) as ArrayRef;
let col8 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
Some(0),
Some(1),
Some(3),
])) as ArrayRef;
let col9 = Arc::new(
arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), Some(3)])
.with_timezone_utc(),
) as ArrayRef;
let col10 = Arc::new(arrow_array::StringArray::from(vec![
Some("a"),
Some("b"),
Some("d"),
])) as ArrayRef;
let col11 = Arc::new(
arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![
Some(Uuid::from_u128(0).as_bytes().to_vec()),
Some(Uuid::from_u128(1).as_bytes().to_vec()),
Some(Uuid::from_u128(3).as_bytes().to_vec()),
]
.into_iter(),
16,
)
.unwrap(),
) as ArrayRef;
let col12 = Arc::new(
arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![
Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
]
.into_iter(),
10,
)
.unwrap(),
) as ArrayRef;
let col13 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
Some(b"one"),
Some(b""),
Some(b"zzzz"),
])) as ArrayRef;
let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
])
.unwrap();
equality_delete_writer.write(to_write.clone()).await?;
let res = equality_delete_writer.close().await?;
assert_eq!(res.len(), 1);
check_parquet_data_file_with_equality_delete_write(
&file_io,
&res.into_iter().next().unwrap(),
&to_write,
)
.await;

Ok(())
}
}
38 changes: 37 additions & 1 deletion crates/iceberg/src/writer/file_writer/parquet_writer.rs
Expand Up @@ -538,6 +538,17 @@ mod tests {
NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
.into(),
// Decimals with precision > 18 cannot be stored as INT32/INT64 in Parquet,
// so their statistics use the FixedLenByteArray representation; add a
// precision-38 field to cover that code path.
NestedField::optional(
16,
"decimal_38",
Type::Primitive(PrimitiveType::Decimal {
precision: 38,
scale: 5,
}),
)
.into(),
])
.build()
.unwrap()
@@ -1028,9 +1039,14 @@
)
.unwrap(),
) as ArrayRef;
let col16 = Arc::new(
arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
.with_precision_and_scale(38, 5)
.unwrap(),
) as ArrayRef;
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
- col14, col15,
+ col14, col15, col16,
])
.unwrap();

@@ -1092,6 +1108,16 @@
),
(14, Datum::uuid(Uuid::from_u128(0))),
(15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
(
16,
Datum::new(
PrimitiveType::Decimal {
precision: 38,
scale: 5
},
PrimitiveLiteral::Int128(1)
)
),
])
);
assert_eq!(
@@ -1125,6 +1151,16 @@
15,
Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
),
(
16,
Datum::new(
PrimitiveType::Decimal {
precision: 38,
scale: 5
},
PrimitiveLiteral::Int128(100)
)
),
])
);

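As background for the new decimal_38 assertions: the Parquet format only allows DECIMAL to annotate INT32 up to precision 9 and INT64 up to precision 18, so a precision-38 decimal must be stored as a byte-array type whose statistics arrive as big-endian two's-complement buffers, hitting the `Statistics::FixedLenByteArray` arm added in schema.rs. A hedged sketch of that mapping; the helper is illustrative only, not an iceberg-rust API:

// Physical representations the Parquet spec permits for DECIMAL,
// keyed by precision. Illustrative only.
fn decimal_physical_repr(precision: u32) -> &'static str {
    match precision {
        1..=9 => "INT32",
        10..=18 => "INT64",
        _ => "FIXED_LEN_BYTE_ARRAY / BINARY (big-endian two's complement)",
    }
}

fn main() {
    assert_eq!(decimal_physical_repr(9), "INT32");
    // decimal(38, 5) cannot use an integer encoding, so its column
    // statistics must be parsed from big-endian bytes.
    assert_eq!(
        decimal_physical_repr(38),
        "FIXED_LEN_BYTE_ARRAY / BINARY (big-endian two's complement)"
    );
}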
