From 61b99ac806b21f7ca97885374298f08dd9c3649c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Mon, 10 Feb 2025 16:42:42 +0800
Subject: [PATCH] Add test for decimal type with fixed length bytes in parquet file

---
 .../ParquetVectorUpdaterFactory.java          |  19 +--
 .../format/parquet/ParquetReadWriteTest.java  | 127 ++++++++++++++++++
 2 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index 125b4c0014be..465d8824d4b5 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -139,14 +139,7 @@ public UpdaterFactory visit(DecimalType decimalType) {
             case BINARY:
                 return new BinaryToDecimalUpdater(c, decimalType);
             case FIXED_LEN_BYTE_ARRAY:
-                int precision = decimalType.getPrecision();
-                if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                    return new IntegerToDecimalUpdater(c, decimalType);
-                } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                    return new LongToDecimalUpdater(c, decimalType);
-                } else {
-                    return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
-                }
+                return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
         }
         throw new RuntimeException(
                 "Unsupported decimal type: " + c.getPrimitiveType().getPrimitiveTypeName());
@@ -745,7 +738,7 @@ public void decodeSingleDictionaryId(
     }
 
     private static class FixedLenByteArrayToDecimalUpdater
-            extends DecimalUpdater<WritableBytesVector> {
+            extends DecimalUpdater<WritableColumnVector> {
         private final int arrayLen;
 
         FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -766,7 +759,7 @@ public void skipValues(int total, VectorizedValuesReader valuesReader) {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             Binary binary = valuesReader.readBinary(arrayLen);
             int precision = paimonType.getPrecision();
 
@@ -776,7 +769,7 @@ public void readValue(
                 ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
             } else {
                 byte[] bytes = binary.getBytesUnsafe();
-                values.putByteArray(offset, bytes, 0, bytes.length);
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
             }
         }
 
@@ -799,7 +792,7 @@ private long heapBinaryToLong(Binary binary) {
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             Binary binary = dictionary.decodeToBinary(dictionaryIds.getInt(offset));
@@ -810,7 +803,7 @@ public void decodeSingleDictionaryId(
                 ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
             } else {
                 byte[] bytes = binary.getBytesUnsafe();
-                values.putByteArray(offset, bytes, 0, bytes.length);
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
             }
         }
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index c1028082a27d..58fc10cc5174 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -83,6 +83,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -99,6 +100,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -156,6 +159,16 @@ public class ParquetReadWriteTest {
                             new TimestampType(6),
                             new VarCharType(VarCharType.MAX_LENGTH)))
                     .build();
+    private static final RowType DECIMAL_TYPE =
+            RowType.builder()
+                    .fields(
+                            new DecimalType(3, 2),
+                            new DecimalType(6, 2),
+                            new DecimalType(9, 2),
+                            new DecimalType(12, 2),
+                            new DecimalType(32, 2))
+                    .build();
+
     private static final RowType NESTED_ARRAY_MAP_TYPE =
             RowType.of(
                     new IntType(),
@@ -478,6 +491,36 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception
         compareNestedRow(rows, results);
     }
 
+    @Test
+    public void testDecimalWithFixedLengthRead() throws Exception {
+        int number = new Random().nextInt(1000) + 100;
+        Path path = createDecimalFile(number, folder, 10);
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(number);
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(DECIMAL_TYPE);
+        reader.forEachRemaining(row -> results.add(internalRowSerializer.copy(row)));
+
+        BigDecimal decimalValue0 = new BigDecimal("123.67");
+        BigDecimal decimalValue1 = new BigDecimal("12345.67");
+        BigDecimal decimalValue2 = new BigDecimal("1234567.67");
+        BigDecimal decimalValue3 = new BigDecimal("123456789123.67");
+        BigDecimal decimalValue4 = new BigDecimal("123456789123456789123456789123.67");
+
+        for (InternalRow internalRow : results) {
+            assertThat(internalRow.getDecimal(0, 3, 2).toBigDecimal()).isEqualTo(decimalValue0);
+            assertThat(internalRow.getDecimal(1, 6, 2).toBigDecimal()).isEqualTo(decimalValue1);
+            assertThat(internalRow.getDecimal(2, 9, 2).toBigDecimal()).isEqualTo(decimalValue2);
+            assertThat(internalRow.getDecimal(3, 12, 2).toBigDecimal()).isEqualTo(decimalValue3);
+            assertThat(internalRow.getDecimal(4, 32, 2).toBigDecimal()).isEqualTo(decimalValue4);
+        }
+    }
+
     @Test
     public void testNestedNullMapKey() {
         List<InternalRow> rows = prepareNestedData(1283, true);
@@ -968,6 +1011,90 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
         return path;
     }
 
+    private Path createDecimalFile(int rowNum, File tmpDir, int rowGroupSize) {
+        Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        conf.setInt("parquet.block.size", rowGroupSize);
+        List<Type> types = new ArrayList<>();
+
+        for (DataField dataField : DECIMAL_TYPE.getFields()) {
+            String name = dataField.name();
+            int fieldId = dataField.id();
+            int precision = ((DecimalType) dataField.type()).getPrecision();
+            int scale = ((DecimalType) dataField.type()).getScale();
+            Type.Repetition repetition =
+                    dataField.type().isNullable()
+                            ? Type.Repetition.OPTIONAL
+                            : Type.Repetition.REQUIRED;
+
+            types.add(
+                    Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                            .length(computeMinBytesForDecimalPrecision(precision))
+                            .named(name)
+                            .withId(fieldId));
+        }
+
+        MessageType schema = new MessageType("paimon_schema", types);
+
+        List<Binary> decimalBytesList = new ArrayList<>();
+
+        BigDecimal decimalValue = new BigDecimal("123.67");
+        int scale = 2;
+        byte[] decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        Binary binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("12345.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("1234567.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123456789123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+            for (int i = 0; i < rowNum; i++) {
+
+                Group row = simpleGroupFactory.newGroup();
+
+                for (int j = 0; j < DECIMAL_TYPE.getFields().size(); j++) {
+                    row.append("f" + j, decimalBytesList.get(j));
+                }
+
+                writer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Create data by parquet origin writer failed.", e);
+        }
+        return path;
+    }
+
     private void createParquetDoubleNestedArray(Group group, int i) {
         Group outside = group.addGroup(0);
         Group inside = outside.addGroup(0);