Add test for decimal type with fixed length bytes in parquet file
leaves12138 committed Feb 10, 2025
1 parent 6f06559 commit 61b99ac
Showing 2 changed files with 133 additions and 13 deletions.
@@ -139,14 +139,7 @@ public UpdaterFactory visit(DecimalType decimalType) {
             case BINARY:
                 return new BinaryToDecimalUpdater(c, decimalType);
             case FIXED_LEN_BYTE_ARRAY:
-                int precision = decimalType.getPrecision();
-                if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                    return new IntegerToDecimalUpdater(c, decimalType);
-                } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                    return new LongToDecimalUpdater(c, decimalType);
-                } else {
-                    return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
-                }
+                return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
         }
         throw new RuntimeException(
                 "Unsupported decimal type: " + c.getPrimitiveType().getPrimitiveTypeName());
@@ -745,7 +738,7 @@ public void decodeSingleDictionaryId(
     }

     private static class FixedLenByteArrayToDecimalUpdater
-            extends DecimalUpdater<WritableBytesVector> {
+            extends DecimalUpdater<WritableColumnVector> {
         private final int arrayLen;

         FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -766,7 +759,7 @@ public void skipValues(int total, VectorizedValuesReader valuesReader) {

         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             Binary binary = valuesReader.readBinary(arrayLen);

             int precision = paimonType.getPrecision();
@@ -776,7 +769,7 @@ public void readValue(
                 ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
             } else {
                 byte[] bytes = binary.getBytesUnsafe();
-                values.putByteArray(offset, bytes, 0, bytes.length);
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
             }
         }

@@ -799,7 +792,7 @@ private long heapBinaryToLong(Binary binary) {
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             Binary binary = dictionary.decodeToBinary(dictionaryIds.getInt(offset));
@@ -810,7 +803,7 @@ public void decodeSingleDictionaryId(
                 ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
             } else {
                 byte[] bytes = binary.getBytesUnsafe();
-                values.putByteArray(offset, bytes, 0, bytes.length);
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
             }
         }
     }
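Note: for precisions wider than a long, the updater copies the raw bytes into a WritableBytesVector unchanged; the logical value can then be recovered with plain java.math, since the bytes are exactly the big-endian two's-complement unscaled value. A sketch (standard java.math, no Paimon API):

```java
// Sketch: rebuild the decimal from the stored unscaled bytes and the
// declared scale of the column.
static java.math.BigDecimal bytesToDecimal(byte[] bytes, int scale) {
    return new java.math.BigDecimal(new java.math.BigInteger(bytes), scale);
}
```

The second changed file is the test class, ParquetReadWriteTest.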
@@ -83,6 +83,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -99,6 +100,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -156,6 +159,16 @@ public class ParquetReadWriteTest {
                                     new TimestampType(6), new VarCharType(VarCharType.MAX_LENGTH)))
                     .build();

+    private static final RowType DECIMAL_TYPE =
+            RowType.builder()
+                    .fields(
+                            new DecimalType(3, 2),
+                            new DecimalType(6, 2),
+                            new DecimalType(9, 2),
+                            new DecimalType(12, 2),
+                            new DecimalType(32, 2))
+                    .build();
+
     private static final RowType NESTED_ARRAY_MAP_TYPE =
             RowType.of(
                     new IntType(),
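Note: the five precisions in DECIMAL_TYPE look chosen to cover all three storage paths of the updater: 3, 6, and 9 should take the int path, 12 the long path, and 32 the raw-bytes path — assuming Paimon follows the common Parquet convention for these thresholds. A sketch of that convention (the method names match the ParquetSchemaConverter calls removed in the first file; the exact bounds are an assumption):

```java
// Assumed thresholds, per the common convention: an unscaled value of up to
// 9 decimal digits fits in 32 bits, up to 18 digits in 64 bits.
static boolean is32BitDecimal(int precision) {
    return precision <= 9;
}

static boolean is64BitDecimal(int precision) {
    return precision <= 18;
}
```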
@@ -478,6 +491,36 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception
         compareNestedRow(rows, results);
     }

+    @Test
+    public void testDecimalWithFixedLengthRead() throws Exception {
+        int number = new Random().nextInt(1000) + 100;
+        Path path = createDecimalFile(number, folder, 10);
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(number);
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(DECIMAL_TYPE);
+        reader.forEachRemaining(row -> results.add(internalRowSerializer.copy(row)));
+
+        BigDecimal decimalValue0 = new BigDecimal("123.67");
+        BigDecimal decimalValue1 = new BigDecimal("12345.67");
+        BigDecimal decimalValue2 = new BigDecimal("1234567.67");
+        BigDecimal decimalValue3 = new BigDecimal("123456789123.67");
+        BigDecimal decimalValue4 = new BigDecimal("123456789123456789123456789123.67");
+
+        for (InternalRow internalRow : results) {
+            assertThat(internalRow.getDecimal(0, 3, 2).toBigDecimal()).isEqualTo(decimalValue0);
+            assertThat(internalRow.getDecimal(1, 6, 2).toBigDecimal()).isEqualTo(decimalValue1);
+            assertThat(internalRow.getDecimal(2, 9, 2).toBigDecimal()).isEqualTo(decimalValue2);
+            assertThat(internalRow.getDecimal(3, 12, 2).toBigDecimal()).isEqualTo(decimalValue3);
+            assertThat(internalRow.getDecimal(4, 32, 2).toBigDecimal()).isEqualTo(decimalValue4);
+        }
+    }
+
     @Test
     public void testNestedNullMapKey() {
         List<InternalRow> rows = prepareNestedData(1283, true);
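Note: the assertions round-trip exactly because createDecimalFile (below) writes each BigDecimal's unscaled bytes unmodified. The statically imported computeMinBytesForDecimalPrecision sizes each FIXED_LEN_BYTE_ARRAY so the largest unscaled value of the precision fits in two's complement; a sketch of the usual computation (a reconstruction, not necessarily Paimon's exact code):

```java
// Sketch: smallest byte count whose signed range covers 10^precision - 1.
static int computeMinBytesForDecimalPrecision(int precision) {
    int numBytes = 1;
    while (Math.pow(2.0, 8.0 * numBytes - 1) < Math.pow(10.0, precision)) {
        numBytes += 1;
    }
    return numBytes;
}
```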
@@ -968,6 +1011,90 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
         return path;
     }

+    private Path createDecimalFile(int rowNum, File tmpDir, int rowGroupSize) {
+        Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        conf.setInt("parquet.block.size", rowGroupSize);
+        List<Type> types = new ArrayList<>();
+
+        for (DataField dataField : DECIMAL_TYPE.getFields()) {
+            String name = dataField.name();
+            int fieldId = dataField.id();
+            int precision = ((DecimalType) dataField.type()).getPrecision();
+            int scale = ((DecimalType) dataField.type()).getScale();
+            Type.Repetition repetition =
+                    dataField.type().isNullable()
+                            ? Type.Repetition.OPTIONAL
+                            : Type.Repetition.REQUIRED;
+
+            types.add(
+                    Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                            .length(computeMinBytesForDecimalPrecision(precision))
+                            .named(name)
+                            .withId(fieldId));
+        }
+
+        MessageType schema = new MessageType("paimon_schema", types);
+
+        List<Binary> decimalBytesList = new ArrayList<>();
+
+        BigDecimal decimalValue = new BigDecimal("123.67");
+        int scale = 2;
+        byte[] decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        Binary binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("12345.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("1234567.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        decimalValue = new BigDecimal("123456789123456789123456789123.67");
+        decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+        binaryValue = Binary.fromByteArray(decimalBytes);
+        decimalBytesList.add(binaryValue);
+
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+            for (int i = 0; i < rowNum; i++) {
+
+                Group row = simpleGroupFactory.newGroup();
+
+                for (int j = 0; j < DECIMAL_TYPE.getFields().size(); j++) {
+                    row.append("f" + j, decimalBytesList.get(j));
+                }
+
+                writer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Create data by parquet origin writer failed.", e);
+        }
+        return path;
+    }
+
     private void createParquetDoubleNestedArray(Group group, int i) {
         Group outside = group.addGroup(0);
         Group inside = outside.addGroup(0);
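Note: the five repeated value blocks in createDecimalFile could be collapsed into a loop; a behavior-equivalent sketch:

```java
// Sketch: builds the same five Binary values as the repeated blocks above.
for (String s :
        new String[] {
            "123.67", "12345.67", "1234567.67",
            "123456789123.67", "123456789123456789123456789123.67"
        }) {
    decimalBytesList.add(
            Binary.fromByteArray(
                    new BigDecimal(s)
                            .setScale(2, RoundingMode.HALF_UP)
                            .unscaledValue()
                            .toByteArray()));
}
```

These values appear chosen so that the minimal two's-complement encoding from toByteArray() is exactly the fixed length computed for each precision, which is what a FIXED_LEN_BYTE_ARRAY column requires.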
