diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index f59f691f8f19..465d8824d4b5 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -20,6 +20,7 @@
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
 import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
 import org.apache.paimon.data.columnar.writable.WritableByteVector;
 import org.apache.paimon.data.columnar.writable.WritableBytesVector;
@@ -64,7 +65,6 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.concurrent.TimeUnit;
@@ -79,13 +79,8 @@
 /** Updater Factory to get {@link ParquetVectorUpdater}. */
 public class ParquetVectorUpdaterFactory {
 
-    private final LogicalTypeAnnotation logicalTypeAnnotation;
-
-    ParquetVectorUpdaterFactory(LogicalTypeAnnotation logicalTypeAnnotation) {
-        this.logicalTypeAnnotation = logicalTypeAnnotation;
-    }
-
-    public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType paimonType) {
+    public static ParquetVectorUpdater getUpdater(
+            ColumnDescriptor descriptor, DataType paimonType) {
         return paimonType.accept(UpdaterFactoryVisitor.INSTANCE).apply(descriptor);
     }
 
@@ -144,14 +139,7 @@ public UpdaterFactory visit(DecimalType decimalType) {
                 case BINARY:
                     return new BinaryToDecimalUpdater(c, decimalType);
                 case FIXED_LEN_BYTE_ARRAY:
-                    int precision = decimalType.getPrecision();
-                    if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                        return new IntegerToDecimalUpdater(c, decimalType);
-                    } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                        return new LongToDecimalUpdater(c, decimalType);
-                    } else {
-                        return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
-                    }
+                    return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
             }
             throw new RuntimeException(
                     "Unsupported decimal type: " + c.getPrimitiveType().getPrimitiveTypeName());
@@ -614,10 +602,10 @@ public void decodeSingleDictionaryId(
     private abstract static class DecimalUpdater<T extends WritableColumnVector>
             implements ParquetVectorUpdater<T> {
 
-        private final DecimalType sparkType;
+        protected final DecimalType paimonType;
 
-        DecimalUpdater(DecimalType sparkType) {
-            this.sparkType = sparkType;
+        DecimalUpdater(DecimalType paimonType) {
+            this.paimonType = paimonType;
         }
 
         @Override
@@ -627,22 +615,6 @@ public void readValues(
                 readValue(offset + i, values, valuesReader);
             }
         }
-
-        protected void writeDecimal(int offset, WritableColumnVector values, BigDecimal decimal) {
-            BigDecimal scaledDecimal =
-                    decimal.setScale(sparkType.getScale(), RoundingMode.UNNECESSARY);
-            int precision = decimal.precision();
-            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                ((WritableIntVector) values)
-                        .setInt(offset, scaledDecimal.unscaledValue().intValue());
-            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                ((WritableLongVector) values)
-                        .setLong(offset, scaledDecimal.unscaledValue().longValue());
-            } else {
-                byte[] bytes = scaledDecimal.unscaledValue().toByteArray();
-                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
-            }
-        }
     }
 
     private static class IntegerToDecimalUpdater extends DecimalUpdater {
@@ -687,8 +659,8 @@ public void decodeSingleDictionaryId(
     private static class LongToDecimalUpdater extends DecimalUpdater {
         private final int parquetScale;
 
-        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) {
@@ -726,8 +698,8 @@ public void decodeSingleDictionaryId(
     private static class BinaryToDecimalUpdater extends DecimalUpdater {
         private final int parquetScale;
 
-        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
@@ -766,15 +738,17 @@ public void decodeSingleDictionaryId(
     }
 
     private static class FixedLenByteArrayToDecimalUpdater
-            extends DecimalUpdater<WritableBytesVector> {
-        private final int parquetScale;
+            extends DecimalUpdater<WritableColumnVector> {
         private final int arrayLen;
 
-        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-            this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            int parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            checkArgument(
+                    parquetScale == paimonType.getScale(),
+                    "Scale should be match between paimon decimal type and parquet decimal type in file");
             this.arrayLen = descriptor.getPrimitiveType().getTypeLength();
         }
@@ -785,27 +759,52 @@ public void skipValues(int total, VectorizedValuesReader valuesReader) {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
-            BigInteger value = new BigInteger(valuesReader.readBinary(arrayLen).getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            Binary binary = valuesReader.readBinary(arrayLen);
+
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
+        }
+
+        private long heapBinaryToLong(Binary binary) {
+            ByteBuffer buffer = binary.toByteBuffer();
+            byte[] bytes = buffer.array();
+            int start = buffer.arrayOffset() + buffer.position();
+            int end = buffer.arrayOffset() + buffer.limit();
+
+            long unscaled = 0L;
+
+            for (int i = start; i < end; i++) {
+                unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+            }
+
+            int bits = 8 * (end - start);
+            return (unscaled << (64 - bits)) >> (64 - bits);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
-            BigInteger value =
-                    new BigInteger(
-                            dictionary
-                                    .decodeToBinary(dictionaryIds.getInt(offset))
-                                    .getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            Binary binary = dictionary.decodeToBinary(dictionaryIds.getInt(offset));
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
         }
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
index 277cd533c515..166a5ce93567 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
@@ -39,7 +39,6 @@
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -67,9 +66,6 @@ public class VectorizedColumnReader {
     /** Vectorized RLE decoder for repetition levels. */
     private VectorizedRleValuesReader repColumn;
 
-    /** Factory to get type-specific vector updater. */
-    private final ParquetVectorUpdaterFactory updaterFactory;
-
     /**
     * Helper struct to track intermediate states while reading Parquet pages in the column chunk.
     */
@@ -83,7 +79,6 @@ public class VectorizedColumnReader {
     private final PageReader pageReader;
     private final ColumnDescriptor descriptor;
-    private final LogicalTypeAnnotation logicalTypeAnnotation;
     private final ParsedVersion writerVersion;
 
     public VectorizedColumnReader(
@@ -97,8 +92,6 @@ public VectorizedColumnReader(
         this.readState =
                 new ParquetReadState(
                         descriptor, isRequired, pageReadStore.getRowIndexes().orElse(null));
-        this.logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-        this.updaterFactory = new ParquetVectorUpdaterFactory(logicalTypeAnnotation);
 
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
         if (dictionaryPage != null) {
@@ -120,7 +113,7 @@ public VectorizedColumnReader(
     }
 
     private boolean isLazyDecodingSupported(
-            PrimitiveType.PrimitiveTypeName typeName, DataType sparkType) {
+            PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
         return true;
     }
 
@@ -133,7 +126,7 @@ void readBatch(
             WritableIntVector definitionLevels)
             throws IOException {
         WritableIntVector dictionaryIds = null;
-        ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, type);
+        ParquetVectorUpdater updater = ParquetVectorUpdaterFactory.getUpdater(descriptor, type);
         if (dictionary != null) {
             // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index c1028082a27d..58fc10cc5174 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -83,6 +83,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -99,6 +100,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -156,6 +159,16 @@ public class ParquetReadWriteTest {
                                     new TimestampType(6),
                                     new VarCharType(VarCharType.MAX_LENGTH)))
                     .build();
+    private static final RowType DECIMAL_TYPE =
+            RowType.builder()
+                    .fields(
+                            new DecimalType(3, 2),
+                            new DecimalType(6, 2),
+                            new DecimalType(9, 2),
+                            new DecimalType(12, 2),
+                            new DecimalType(32, 2))
+                    .build();
+
     private static final RowType NESTED_ARRAY_MAP_TYPE =
             RowType.of(
                     new IntType(),
@@ -478,6 +491,36 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception
         compareNestedRow(rows, results);
     }
 
+    @Test
+    public void testDecimalWithFixedLengthRead() throws Exception {
+        int number = new Random().nextInt(1000) + 100;
+        Path path = createDecimalFile(number, folder, 10);
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(number);
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(DECIMAL_TYPE);
+        reader.forEachRemaining(row -> results.add(internalRowSerializer.copy(row)));
+
+        BigDecimal decimalValue0 = new BigDecimal("123.67");
+        BigDecimal decimalValue1 = new BigDecimal("12345.67");
+        BigDecimal decimalValue2 = new BigDecimal("1234567.67");
+        BigDecimal decimalValue3 = new BigDecimal("123456789123.67");
+        BigDecimal decimalValue4 = new BigDecimal("123456789123456789123456789123.67");
+
+        for (InternalRow internalRow : results) {
+            assertThat(internalRow.getDecimal(0, 3, 2).toBigDecimal()).isEqualTo(decimalValue0);
+            assertThat(internalRow.getDecimal(1, 6, 2).toBigDecimal()).isEqualTo(decimalValue1);
+            assertThat(internalRow.getDecimal(2, 9, 2).toBigDecimal()).isEqualTo(decimalValue2);
+            assertThat(internalRow.getDecimal(3, 12, 2).toBigDecimal()).isEqualTo(decimalValue3);
+            assertThat(internalRow.getDecimal(4, 32, 2).toBigDecimal()).isEqualTo(decimalValue4);
+        }
+    }
+
     @Test
     public void testNestedNullMapKey() {
         List rows = prepareNestedData(1283, true);
@@ -968,6 +1011,90 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
         return path;
     }
 
+    private Path createDecimalFile(int rowNum, File tmpDir, int rowGroupSize) {
+        Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        conf.setInt("parquet.block.size", rowGroupSize);
+        List<Type> types = new ArrayList<>();
+
+        for (DataField dataField : DECIMAL_TYPE.getFields()) {
+            String name = dataField.name();
+            int fieldId = dataField.id();
+            int precision = ((DecimalType) dataField.type()).getPrecision();
+            int scale = ((DecimalType) dataField.type()).getScale();
+            Type.Repetition repetition =
+                    dataField.type().isNullable()
+                            ? Type.Repetition.OPTIONAL
+                            : Type.Repetition.REQUIRED;
+
+            types.add(
+                    Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                            .length(computeMinBytesForDecimalPrecision(precision))
+                            .named(name)
+                            .withId(fieldId));
+        }
+
+        MessageType schema = new MessageType("paimon_schema", types);
+
+        List<Binary> decimalBytesList = new ArrayList<>();
+
+        BigDecimal decimalValue = new BigDecimal("123.67");
+        int scale = 2;
+        byte[] decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        Binary binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("12345.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("1234567.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123456789123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+            for (int i = 0; i < rowNum; i++) {
+
+                Group row = simpleGroupFactory.newGroup();
+
+                for (int j = 0; j < DECIMAL_TYPE.getFields().size(); j++) {
+                    row.append("f" + j, decimalBytesList.get(j));
+                }
+
+                writer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Create data by parquet origin writer failed.", e);
+        }
+        return path;
+    }
+
     private void createParquetDoubleNestedArray(Group group, int i) {
         Group outside = group.addGroup(0);
         Group inside = outside.addGroup(0);
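Illustration only, not part of the patch: the new heapBinaryToLong reads a big-endian two's-complement FIXED_LEN_BYTE_ARRAY into a signed long by accumulating the bytes and then sign-extending from the array's bit width. A minimal standalone sketch of that arithmetic follows; the class name and the sample byte arrays are made up for demonstration.

// Hypothetical demo class; mirrors the accumulate-then-sign-extend logic added above.
public class FixedLenDecimalSketch {

    // Decode a big-endian two's-complement byte array (at most 8 bytes) into a signed long.
    static long toSignedLong(byte[] bytes) {
        long unscaled = 0L;
        for (byte b : bytes) {
            unscaled = (unscaled << 8) | (b & 0xff); // accumulate bytes, most significant first
        }
        int bits = 8 * bytes.length;
        return (unscaled << (64 - bits)) >> (64 - bits); // arithmetic shift restores the sign bit
    }

    public static void main(String[] args) {
        // 123.67 with scale 2 has unscaled value 12367 = 0x304F; a 3-byte fixed-length
        // array stores it as {0x00, 0x30, 0x4F}.
        System.out.println(toSignedLong(new byte[] {0x00, 0x30, 0x4F})); // prints 12367
        // A negative value: {0xFF, 0xFF} sign-extends to -1.
        System.out.println(toSignedLong(new byte[] {(byte) 0xFF, (byte) 0xFF})); // prints -1
    }
}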