Skip to content

Commit 1c632b8

Browse files
ZENOTMEZENOTME
and
ZENOTME
authored
fix: parse var len of decimal for parquet statistic (#837)
Co-authored-by: ZENOTME <[email protected]>
1 parent 974d9de commit 1c632b8

File tree

2 files changed

+253
-2
lines changed

2 files changed

+253
-2
lines changed

crates/iceberg/src/arrow/schema.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow_array::{
2929
};
3030
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
3131
use bitvec::macros::internal::funty::Fundamental;
32+
use num_bigint::BigInt;
3233
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
3334
use parquet::file::statistics::Statistics;
3435
use rust_decimal::prelude::ToPrimitive;
@@ -739,9 +740,15 @@ macro_rules! get_parquet_stat_as_datum {
739740
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
740741
return Ok(None);
741742
};
743+
let unscaled_value = BigInt::from_signed_bytes_be(bytes);
742744
Some(Datum::new(
743745
primitive_type.clone(),
744-
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
746+
PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
747+
Error::new(
748+
ErrorKind::DataInvalid,
749+
format!("Can't convert bytes to i128: {:?}", bytes),
750+
)
751+
})?),
745752
))
746753
}
747754
(

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 245 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,15 +478,18 @@ mod tests {
478478
use anyhow::Result;
479479
use arrow_array::types::Int64Type;
480480
use arrow_array::{
481-
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray,
481+
Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray,
482+
RecordBatch, StructArray,
482483
};
483484
use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef};
484485
use arrow_select::concat::concat_batches;
485486
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
487+
use rust_decimal::Decimal;
486488
use tempfile::TempDir;
487489
use uuid::Uuid;
488490

489491
use super::*;
492+
use crate::arrow::schema_to_arrow_schema;
490493
use crate::io::FileIOBuilder;
491494
use crate::spec::{PrimitiveLiteral, Struct, *};
492495
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
@@ -1169,4 +1172,245 @@ mod tests {
11691172

11701173
Ok(())
11711174
}
1175+
1176+
#[tokio::test]
1177+
async fn test_decimal_bound() -> Result<()> {
1178+
let temp_dir = TempDir::new().unwrap();
1179+
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1180+
let loccation_gen =
1181+
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
1182+
let file_name_gen =
1183+
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1184+
1185+
// test 1.1 and 2.2
1186+
let schema = Arc::new(
1187+
Schema::builder()
1188+
.with_fields(vec![NestedField::optional(
1189+
0,
1190+
"decimal",
1191+
Type::Primitive(PrimitiveType::Decimal {
1192+
precision: 28,
1193+
scale: 10,
1194+
}),
1195+
)
1196+
.into()])
1197+
.build()
1198+
.unwrap(),
1199+
);
1200+
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1201+
let mut pw = ParquetWriterBuilder::new(
1202+
WriterProperties::builder().build(),
1203+
schema.clone(),
1204+
file_io.clone(),
1205+
loccation_gen.clone(),
1206+
file_name_gen.clone(),
1207+
)
1208+
.build()
1209+
.await?;
1210+
let col0 = Arc::new(
1211+
Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1212+
.with_data_type(DataType::Decimal128(28, 10)),
1213+
) as ArrayRef;
1214+
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1215+
pw.write(&to_write).await?;
1216+
let res = pw.close().await?;
1217+
assert_eq!(res.len(), 1);
1218+
let data_file = res
1219+
.into_iter()
1220+
.next()
1221+
.unwrap()
1222+
.content(crate::spec::DataContentType::Data)
1223+
.partition(Struct::empty())
1224+
.build()
1225+
.unwrap();
1226+
assert_eq!(
1227+
data_file.upper_bounds().get(&0),
1228+
Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1229+
.as_ref()
1230+
);
1231+
assert_eq!(
1232+
data_file.lower_bounds().get(&0),
1233+
Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1234+
.as_ref()
1235+
);
1236+
1237+
// test -1.1 and -2.2
1238+
let schema = Arc::new(
1239+
Schema::builder()
1240+
.with_fields(vec![NestedField::optional(
1241+
0,
1242+
"decimal",
1243+
Type::Primitive(PrimitiveType::Decimal {
1244+
precision: 28,
1245+
scale: 10,
1246+
}),
1247+
)
1248+
.into()])
1249+
.build()
1250+
.unwrap(),
1251+
);
1252+
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1253+
let mut pw = ParquetWriterBuilder::new(
1254+
WriterProperties::builder().build(),
1255+
schema.clone(),
1256+
file_io.clone(),
1257+
loccation_gen.clone(),
1258+
file_name_gen.clone(),
1259+
)
1260+
.build()
1261+
.await?;
1262+
let col0 = Arc::new(
1263+
Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1264+
.with_data_type(DataType::Decimal128(28, 10)),
1265+
) as ArrayRef;
1266+
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1267+
pw.write(&to_write).await?;
1268+
let res = pw.close().await?;
1269+
assert_eq!(res.len(), 1);
1270+
let data_file = res
1271+
.into_iter()
1272+
.next()
1273+
.unwrap()
1274+
.content(crate::spec::DataContentType::Data)
1275+
.partition(Struct::empty())
1276+
.build()
1277+
.unwrap();
1278+
assert_eq!(
1279+
data_file.upper_bounds().get(&0),
1280+
Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1281+
.as_ref()
1282+
);
1283+
assert_eq!(
1284+
data_file.lower_bounds().get(&0),
1285+
Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1286+
.as_ref()
1287+
);
1288+
1289+
// test max and min of rust_decimal
1290+
let decimal_max = Decimal::MAX;
1291+
let decimal_min = Decimal::MIN;
1292+
assert_eq!(decimal_max.scale(), decimal_min.scale());
1293+
let schema = Arc::new(
1294+
Schema::builder()
1295+
.with_fields(vec![NestedField::optional(
1296+
0,
1297+
"decimal",
1298+
Type::Primitive(PrimitiveType::Decimal {
1299+
precision: 38,
1300+
scale: decimal_max.scale(),
1301+
}),
1302+
)
1303+
.into()])
1304+
.build()
1305+
.unwrap(),
1306+
);
1307+
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1308+
let mut pw = ParquetWriterBuilder::new(
1309+
WriterProperties::builder().build(),
1310+
schema,
1311+
file_io.clone(),
1312+
loccation_gen,
1313+
file_name_gen,
1314+
)
1315+
.build()
1316+
.await?;
1317+
let col0 = Arc::new(
1318+
Decimal128Array::from(vec![
1319+
Some(decimal_max.mantissa()),
1320+
Some(decimal_min.mantissa()),
1321+
])
1322+
.with_data_type(DataType::Decimal128(38, 0)),
1323+
) as ArrayRef;
1324+
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1325+
pw.write(&to_write).await?;
1326+
let res = pw.close().await?;
1327+
assert_eq!(res.len(), 1);
1328+
let data_file = res
1329+
.into_iter()
1330+
.next()
1331+
.unwrap()
1332+
.content(crate::spec::DataContentType::Data)
1333+
.partition(Struct::empty())
1334+
.build()
1335+
.unwrap();
1336+
assert_eq!(
1337+
data_file.upper_bounds().get(&0),
1338+
Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1339+
);
1340+
assert_eq!(
1341+
data_file.lower_bounds().get(&0),
1342+
Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1343+
);
1344+
1345+
// test max and min for scale 38
1346+
// # TODO
1347+
// Readd this case after resolve https://github.com/apache/iceberg-rust/issues/669
1348+
// let schema = Arc::new(
1349+
// Schema::builder()
1350+
// .with_fields(vec![NestedField::optional(
1351+
// 0,
1352+
// "decimal",
1353+
// Type::Primitive(PrimitiveType::Decimal {
1354+
// precision: 38,
1355+
// scale: 0,
1356+
// }),
1357+
// )
1358+
// .into()])
1359+
// .build()
1360+
// .unwrap(),
1361+
// );
1362+
// let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1363+
// let mut pw = ParquetWriterBuilder::new(
1364+
// WriterProperties::builder().build(),
1365+
// schema,
1366+
// file_io.clone(),
1367+
// loccation_gen,
1368+
// file_name_gen,
1369+
// )
1370+
// .build()
1371+
// .await?;
1372+
// let col0 = Arc::new(
1373+
// Decimal128Array::from(vec![
1374+
// Some(99999999999999999999999999999999999999_i128),
1375+
// Some(-99999999999999999999999999999999999999_i128),
1376+
// ])
1377+
// .with_data_type(DataType::Decimal128(38, 0)),
1378+
// ) as ArrayRef;
1379+
// let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1380+
// pw.write(&to_write).await?;
1381+
// let res = pw.close().await?;
1382+
// assert_eq!(res.len(), 1);
1383+
// let data_file = res
1384+
// .into_iter()
1385+
// .next()
1386+
// .unwrap()
1387+
// .content(crate::spec::DataContentType::Data)
1388+
// .partition(Struct::empty())
1389+
// .build()
1390+
// .unwrap();
1391+
// assert_eq!(
1392+
// data_file.upper_bounds().get(&0),
1393+
// Some(Datum::new(
1394+
// PrimitiveType::Decimal {
1395+
// precision: 38,
1396+
// scale: 0
1397+
// },
1398+
// PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1399+
// ))
1400+
// .as_ref()
1401+
// );
1402+
// assert_eq!(
1403+
// data_file.lower_bounds().get(&0),
1404+
// Some(Datum::new(
1405+
// PrimitiveType::Decimal {
1406+
// precision: 38,
1407+
// scale: 0
1408+
// },
1409+
// PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1410+
// ))
1411+
// .as_ref()
1412+
// );
1413+
1414+
Ok(())
1415+
}
11721416
}

0 commit comments

Comments
 (0)