From 4b3ea4ba7b6bcae27efc2012548e5d1fc5cf9aaa Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 22 Jan 2025 10:07:29 -0800 Subject: [PATCH] feat: DH-18399: Add ParquetColumnResolver (#6558) --- .../util/annotations/InternalUseOnly.java | 7 +- .../iceberg/junit5/SqliteCatalogBase.java | 54 +-- extensions/parquet/base/build.gradle | 13 +- .../parquet/base/ColumnChunkReaderImpl.java | 5 +- .../parquet/base/ColumnWriterImpl.java | 9 +- .../parquet/base/RowGroupWriterImpl.java | 24 +- .../parquet/impl/ColumnDescriptorUtil.java | 24 + .../parquet/impl/ParquetSchemaUtil.java | 190 ++++++++ .../deephaven/parquet/impl/package-info.java | 4 + .../parquet/base/TestParquetTimeUtils.java | 31 +- .../parquet/impl/ParquetSchemaUtilTest.java | 163 +++++++ .../parquet/table/ParquetInstructions.java | 52 ++- .../parquet/table/ParquetSchemaReader.java | 5 +- .../table/location/ParquetColumnResolver.java | 46 ++ .../location/ParquetColumnResolverMap.java | 58 +++ .../ParquetFieldIdColumnResolverFactory.java | 157 +++++++ .../table/location/ParquetTableLocation.java | 30 +- .../location/ParquetTableLocationKey.java | 28 +- .../table/ParquetInstructionsTest.java | 56 +++ .../table/ParquetTableReadWriteTest.java | 60 +-- .../parquet/table/TestParquetTools.java | 413 +++++++++++------- ...rquetFieldIdColumnResolverFactoryTest.java | 192 ++++++++ 22 files changed, 1354 insertions(+), 267 deletions(-) create mode 100644 extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ColumnDescriptorUtil.java create mode 100644 extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ParquetSchemaUtil.java create mode 100644 extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/package-info.java create mode 100644 extensions/parquet/base/src/test/java/io/deephaven/parquet/impl/ParquetSchemaUtilTest.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java create mode 100644 extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java diff --git a/Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java b/Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java index 029b30267d3..f7966473bf5 100644 --- a/Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java +++ b/Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java @@ -9,10 +9,11 @@ import java.lang.annotation.Target; /** - * Indicates that a particular method is for internal use only and should not be used by client code. It is subject to - * change/removal at any time. + * Indicates that a particular {@link ElementType#METHOD method}, {@link ElementType#CONSTRUCTOR constructor}, + * {@link ElementType#TYPE type}, or {@link ElementType#PACKAGE package} is for internal use only and should not be used + * by client code. It is subject to change/removal at any time. */ -@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE}) +@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE, ElementType.PACKAGE}) @Inherited @Documented public @interface InternalUseOnly { diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 11962bf71ec..3bb708fb715 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -34,8 +34,9 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -53,6 +54,11 @@ import java.util.List; import java.util.stream.Collectors; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Types.buildMessage; +import static org.apache.parquet.schema.Types.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; @@ -416,8 +422,12 @@ void testColumnRenameWhileWriting() throws URISyntaxException { { final List parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier); assertThat(parquetFiles).hasSize(1); - verifyFieldIdsFromParquetFile(parquetFiles.get(0), originalDefinition.getColumnNames(), - nameToFieldIdFromSchema); + final MessageType expectedSchema = buildMessage() + .addFields( + optional(INT32).id(1).as(intType(32, true)).named("intCol"), + optional(DOUBLE).id(2).named("doubleCol")) + .named("root"); + verifySchema(parquetFiles.get(0), expectedSchema); } final Table moreData = TableTools.emptyTable(5) @@ -442,10 +452,18 @@ void testColumnRenameWhileWriting() throws URISyntaxException { final List parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier); assertThat(parquetFiles).hasSize(2); - verifyFieldIdsFromParquetFile(parquetFiles.get(0), moreData.getDefinition().getColumnNames(), - newNameToFieldId); - verifyFieldIdsFromParquetFile(parquetFiles.get(1), originalDefinition.getColumnNames(), - nameToFieldIdFromSchema); + final MessageType expectedSchema0 = buildMessage() + .addFields( + optional(INT32).id(1).as(intType(32, true)).named("newIntCol"), + optional(DOUBLE).id(2).named("newDoubleCol")) + .named("root"); + final MessageType expectedSchema1 = buildMessage() + .addFields( + optional(INT32).id(1).as(intType(32, true)).named("intCol"), + optional(DOUBLE).id(2).named("doubleCol")) + .named("root"); + verifySchema(parquetFiles.get(0), expectedSchema0); + verifySchema(parquetFiles.get(1), expectedSchema1); } // TODO: This is failing because we don't map columns based on the column ID when reading. Uncomment this @@ -455,31 +473,13 @@ void testColumnRenameWhileWriting() throws URISyntaxException { // moreData.renameColumns("intCol = newIntCol", "doubleCol = newDoubleCol")), fromIceberg); } - /** - * Verify that the schema of the parquet file read from the provided path has the provided column and corresponding - * field IDs. - */ - private void verifyFieldIdsFromParquetFile( - final String path, - final List columnNames, - final Map nameToFieldId) throws URISyntaxException { + private void verifySchema(String path, MessageType expectedSchema) throws URISyntaxException { final ParquetMetadata metadata = new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.builder() .setSpecialInstructions(dataInstructions()) .build()) .getMetadata(); - final List columnsMetadata = metadata.getFileMetaData().getSchema().getColumns(); - - final int numColumns = columnNames.size(); - for (int colIdx = 0; colIdx < numColumns; colIdx++) { - final String columnName = columnNames.get(colIdx); - final String columnNameFromParquetFile = columnsMetadata.get(colIdx).getPath()[0]; - assertThat(columnName).isEqualTo(columnNameFromParquetFile); - - final int expectedFieldId = nameToFieldId.get(columnName); - final int fieldIdFromParquetFile = columnsMetadata.get(colIdx).getPrimitiveType().getId().intValue(); - assertThat(fieldIdFromParquetFile).isEqualTo(expectedFieldId); - } + assertThat(metadata.getFileMetaData().getSchema()).isEqualTo(expectedSchema); } /** diff --git a/extensions/parquet/base/build.gradle b/extensions/parquet/base/build.gradle index 76bcfd2f7ed..9aff6b1022f 100644 --- a/extensions/parquet/base/build.gradle +++ b/extensions/parquet/base/build.gradle @@ -22,5 +22,16 @@ dependencies { implementation libs.guava compileOnly libs.jetbrains.annotations - testImplementation libs.junit4 + + testImplementation libs.assertj + + testImplementation platform(libs.junit.bom) + testImplementation libs.junit.jupiter + testRuntimeOnly libs.junit.jupiter.engine + testRuntimeOnly libs.junit.platform.launcher +} + +tasks.withType(Test).configureEach { + useJUnitPlatform { + } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 47126bc595a..3c51f99e47d 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -4,6 +4,7 @@ package io.deephaven.parquet.base; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.parquet.impl.ParquetSchemaUtil; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.parquet.compress.CompressorAdapter; @@ -68,8 +69,8 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader { this.columnName = columnName; this.channelsProvider = channelsProvider; this.columnChunk = columnChunk; - this.path = type - .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0])); + this.path = + ParquetSchemaUtil.columnDescriptor(type, columnChunk.meta_data.getPath_in_schema()).orElseThrow(); if (columnChunk.getMeta_data().isSetCodec()) { decompressor = DeephavenCompressorAdapterFactory.getInstance() .getByName(columnChunk.getMeta_data().getCodec().name()); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index 53e8f47103c..34334df51b2 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -32,6 +32,7 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; @@ -76,11 +77,11 @@ final class ColumnWriterImpl implements ColumnWriter { final CompressorAdapter compressorAdapter, final int targetPageSize, final ByteBufferAllocator allocator) { - this.countingOutput = countingOutput; - this.column = column; - this.compressorAdapter = compressorAdapter; + this.countingOutput = Objects.requireNonNull(countingOutput); + this.column = Objects.requireNonNull(column); + this.compressorAdapter = Objects.requireNonNull(compressorAdapter); this.targetPageSize = targetPageSize; - this.allocator = allocator; + this.allocator = Objects.requireNonNull(allocator); dlEncoder = column.getMaxDefinitionLevel() == 0 ? null : new RunLengthBitPackingHybridEncoder( getWidthFromMaxInt(column.getMaxDefinitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java index 6d387228866..07e80a62505 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java @@ -5,6 +5,7 @@ import com.google.common.io.CountingOutputStream; import io.deephaven.parquet.compress.CompressorAdapter; +import io.deephaven.parquet.impl.ParquetSchemaUtil; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -16,10 +17,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Objects; final class RowGroupWriterImpl implements RowGroupWriter { private final CountingOutputStream countingOutput; - private final MessageType type; + private final MessageType schema; private final int targetPageSize; private final ByteBufferAllocator allocator; private ColumnWriterImpl activeWriter; @@ -28,33 +30,33 @@ final class RowGroupWriterImpl implements RowGroupWriter { private final CompressorAdapter compressorAdapter; RowGroupWriterImpl(CountingOutputStream countingOutput, - MessageType type, + MessageType schema, int targetPageSize, ByteBufferAllocator allocator, CompressorAdapter compressorAdapter) { - this(countingOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter); + this(countingOutput, schema, targetPageSize, allocator, new BlockMetaData(), compressorAdapter); } private RowGroupWriterImpl(CountingOutputStream countingOutput, - MessageType type, + MessageType schema, int targetPageSize, ByteBufferAllocator allocator, BlockMetaData blockMetaData, CompressorAdapter compressorAdapter) { - this.countingOutput = countingOutput; - this.type = type; + this.countingOutput = Objects.requireNonNull(countingOutput); + this.schema = Objects.requireNonNull(schema); this.targetPageSize = targetPageSize; - this.allocator = allocator; - this.blockMetaData = blockMetaData; - this.compressorAdapter = compressorAdapter; + this.allocator = Objects.requireNonNull(allocator); + this.blockMetaData = Objects.requireNonNull(blockMetaData); + this.compressorAdapter = Objects.requireNonNull(compressorAdapter); } String[] getPrimitivePath(String columnName) { String[] result = {columnName}; Type rollingType; - while (!(rollingType = type.getType(result)).isPrimitive()) { + while (!(rollingType = schema.getType(result)).isPrimitive()) { GroupType groupType = rollingType.asGroupType(); if (groupType.getFieldCount() != 1) { throw new UnsupportedOperationException("Encountered struct at:" + Arrays.toString(result)); @@ -74,7 +76,7 @@ public ColumnWriter addColumn(String columnName) { } activeWriter = new ColumnWriterImpl(this, countingOutput, - type.getColumnDescription(getPrimitivePath(columnName)), + ParquetSchemaUtil.columnDescriptor(schema, getPrimitivePath(columnName)).orElseThrow(), compressorAdapter, targetPageSize, allocator); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ColumnDescriptorUtil.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ColumnDescriptorUtil.java new file mode 100644 index 00000000000..0eb346c3e12 --- /dev/null +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ColumnDescriptorUtil.java @@ -0,0 +1,24 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.impl; + +import org.apache.parquet.column.ColumnDescriptor; +import org.jetbrains.annotations.NotNull; + +import javax.annotation.Nullable; + +public final class ColumnDescriptorUtil { + /** + * A more thorough check of {@link ColumnDescriptor} equality. In addition to + * {@link ColumnDescriptor#equals(Object)} which only checks the {@link ColumnDescriptor#getPath()}, this also + * checks for the equality of {@link ColumnDescriptor#getPrimitiveType()}, + * {@link ColumnDescriptor#getMaxRepetitionLevel()}, and {@link ColumnDescriptor#getMaxDefinitionLevel()}. + */ + public static boolean equals(@NotNull ColumnDescriptor x, @Nullable ColumnDescriptor y) { + return x == y || (x.equals(y) + && x.getPrimitiveType().equals(y.getPrimitiveType()) + && x.getMaxRepetitionLevel() == y.getMaxRepetitionLevel() + && x.getMaxDefinitionLevel() == y.getMaxDefinitionLevel()); + } +} diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ParquetSchemaUtil.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ParquetSchemaUtil.java new file mode 100644 index 00000000000..5b24b9db8f5 --- /dev/null +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ParquetSchemaUtil.java @@ -0,0 +1,190 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.impl; + +import io.deephaven.base.verify.Assert; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Predicate; + +/** + * Various improved ways of traversing {@link MessageType}. + */ +public final class ParquetSchemaUtil { + + public interface Visitor { + + /** + * Accept a Parquet column. + * + *

+ * This represents the constituents parts of a {@link ColumnDescriptor} in an easier to consume fashion. In + * particular, it is useful when the consumer wants to iterate the Typed-path from MessageType root to leaf + * without needing to resort to extraneous allocation of {@link MessageType#getType(String...)} or state + * management needed via {@link GroupType#getType(String)}. The arguments of this method can be made into a + * {@link ColumnDescriptor} using {@link ParquetSchemaUtil#makeColumnDescriptor(Collection, PrimitiveType)}. + * + * @param typePath the fully typed path + * @param primitiveType the leaf primitiveType, guaranteed to be the last element of path + */ + void accept(Collection typePath, PrimitiveType primitiveType); + } + + /** + * A more efficient implementation of {@link MessageType#getColumns()}. + */ + public static List columns(MessageType schema) { + final List out = new ArrayList<>(); + walkColumnDescriptors(schema, out::add); + return out; + } + + /** + * A more efficient implementation of {@link MessageType#getPaths()}. + */ + public static List paths(MessageType schema) { + final List out = new ArrayList<>(); + walk(schema, (typePath, primitiveType) -> out.add(makePath(typePath))); + return out; + } + + /** + * An alternative interface for traversing the column descriptors of a Parquet {@code schema}. + */ + public static void walkColumnDescriptors(MessageType schema, Consumer consumer) { + walk(schema, new ColumnDescriptorVisitor(consumer)); + } + + /** + * An alternative interface for traversing the leaf fields of a Parquet {@code schema}. + */ + public static void walk(MessageType schema, Visitor visitor) { + walk(schema, visitor, new ArrayDeque<>()); + } + + /** + * A more efficient implementation of {@link MessageType#getColumnDescription(String[])}. + */ + public static Optional columnDescriptor(MessageType schema, String[] path) { + if (path.length == 0) { + return Optional.empty(); + } + int repeatedCount = 0; + int notRequiredCount = 0; + GroupType current = schema; + for (int i = 0; i < path.length - 1; ++i) { + if (!current.containsField(path[i])) { + return Optional.empty(); + } + final Type field = current.getFields().get(current.getFieldIndex(path[i])); + if (field == null || field.isPrimitive()) { + return Optional.empty(); + } + current = field.asGroupType(); + if (isRepeated(current)) { + ++repeatedCount; + } + if (!isRequired(current)) { + ++notRequiredCount; + } + } + final PrimitiveType primitiveType; + { + if (!current.containsField(path[path.length - 1])) { + return Optional.empty(); + } + final Type field = current.getFields().get(current.getFieldIndex(path[path.length - 1])); + if (field == null || !field.isPrimitive()) { + return Optional.empty(); + } + primitiveType = field.asPrimitiveType(); + if (isRepeated(primitiveType)) { + ++repeatedCount; + } + if (!isRequired(primitiveType)) { + ++notRequiredCount; + } + } + return Optional.of(new ColumnDescriptor(path, primitiveType, repeatedCount, notRequiredCount)); + } + + /** + * A more efficient implementation of {@link MessageType#getColumnDescription(String[])}. + */ + public static Optional columnDescriptor(MessageType schema, List path) { + return columnDescriptor(schema, path.toArray(new String[0])); + } + + public static ColumnDescriptor makeColumnDescriptor(Collection typePath, PrimitiveType primitiveType) { + final String[] path = makePath(typePath); + final int maxRep = (int) typePath.stream().filter(ParquetSchemaUtil::isRepeated).count(); + final int maxDef = (int) typePath.stream().filter(Predicate.not(ParquetSchemaUtil::isRequired)).count(); + return new ColumnDescriptor(path, primitiveType, maxRep, maxDef); + } + + /** + * Checks if {@code schema} contains {@code descriptor} based on + * {@link ColumnDescriptorUtil#equals(ColumnDescriptor, ColumnDescriptor)}. + */ + public static boolean contains(MessageType schema, ColumnDescriptor descriptor) { + return columnDescriptor(schema, descriptor.getPath()) + .filter(cd -> ColumnDescriptorUtil.equals(descriptor, cd)) + .isPresent(); + } + + private static String[] makePath(Collection typePath) { + return typePath.stream().map(Type::getName).toArray(String[]::new); + } + + private static void walk(Type type, Visitor visitor, Deque stack) { + if (type.isPrimitive()) { + visitor.accept(stack, type.asPrimitiveType()); + return; + } + walk(type.asGroupType(), visitor, stack); + } + + private static void walk(GroupType type, Visitor visitor, Deque stack) { + for (final Type field : type.getFields()) { + Assert.eqTrue(stack.offerLast(field), "stack.offerLast(field)"); + walk(field, visitor, stack); + Assert.eq(stack.pollLast(), "stack.pollLast()", field, "field"); + } + } + + private static boolean isRepeated(Type x) { + return x.isRepetition(Repetition.REPEATED); + } + + private static boolean isRequired(Type x) { + return x.isRepetition(Repetition.REQUIRED); + } + + private static class ColumnDescriptorVisitor implements Visitor { + + private final Consumer consumer; + + public ColumnDescriptorVisitor(Consumer consumer) { + this.consumer = Objects.requireNonNull(consumer); + } + + @Override + public void accept(Collection typePath, PrimitiveType primitiveType) { + consumer.accept(makeColumnDescriptor(typePath, primitiveType)); + } + } +} diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/package-info.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/package-info.java new file mode 100644 index 00000000000..d28eb95bb22 --- /dev/null +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/package-info.java @@ -0,0 +1,4 @@ +@InternalUseOnly +package io.deephaven.parquet.impl; + +import io.deephaven.util.annotations.InternalUseOnly; diff --git a/extensions/parquet/base/src/test/java/io/deephaven/parquet/base/TestParquetTimeUtils.java b/extensions/parquet/base/src/test/java/io/deephaven/parquet/base/TestParquetTimeUtils.java index f32f9975cfe..cf2c395c96d 100644 --- a/extensions/parquet/base/src/test/java/io/deephaven/parquet/base/TestParquetTimeUtils.java +++ b/extensions/parquet/base/src/test/java/io/deephaven/parquet/base/TestParquetTimeUtils.java @@ -5,52 +5,53 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; -import junit.framework.TestCase; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -public class TestParquetTimeUtils { +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +class TestParquetTimeUtils { @Test - public void testEpochNanosUTC() { + void testEpochNanosUTC() { final long nanos = 123456789123456789L; final Instant dt2 = Instant.ofEpochSecond(0, nanos); final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); - TestCase.assertEquals(nanos, ParquetTimeUtils.epochNanosUTC(ldt)); - TestCase.assertEquals(QueryConstants.NULL_LONG, ParquetTimeUtils.epochNanosUTC(null)); + assertThat(ParquetTimeUtils.epochNanosUTC(ldt)).isEqualTo(nanos); + assertThat(ParquetTimeUtils.epochNanosUTC(null)).isEqualTo(QueryConstants.NULL_LONG); } @Test - public void testEpochNanosTo() { + void testEpochNanosTo() { final long nanos = 123456789123456789L; final Instant dt2 = Instant.ofEpochSecond(0, nanos); final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); - TestCase.assertEquals(ldt, ParquetTimeUtils.epochNanosToLocalDateTimeUTC(nanos)); - TestCase.assertNull(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(QueryConstants.NULL_LONG)); + assertThat(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(nanos)).isEqualTo(ldt); + assertThat(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull(); } @Test - public void testEpochMicrosTo() { + void testEpochMicrosTo() { long nanos = 123456789123456789L; final long micros = DateTimeUtils.nanosToMicros(nanos); nanos = DateTimeUtils.microsToNanos(micros); final Instant dt2 = Instant.ofEpochSecond(0, nanos); final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); - TestCase.assertEquals(ldt, ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(micros)); - TestCase.assertNull(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(QueryConstants.NULL_LONG)); + assertThat(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(micros)).isEqualTo(ldt); + assertThat(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull(); } @Test - public void testEpochMillisTo() { + void testEpochMillisTo() { long nanos = 123456789123456789L; final long millis = DateTimeUtils.nanosToMillis(nanos); nanos = DateTimeUtils.millisToNanos(millis); final Instant dt2 = Instant.ofEpochSecond(0, nanos); final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); - TestCase.assertEquals(ldt, ParquetTimeUtils.epochMillisToLocalDateTimeUTC(millis)); - TestCase.assertNull(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(QueryConstants.NULL_LONG)); + assertThat(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(millis)).isEqualTo(ldt); + assertThat(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull(); } } diff --git a/extensions/parquet/base/src/test/java/io/deephaven/parquet/impl/ParquetSchemaUtilTest.java b/extensions/parquet/base/src/test/java/io/deephaven/parquet/impl/ParquetSchemaUtilTest.java new file mode 100644 index 00000000000..de6cfaccce3 --- /dev/null +++ b/extensions/parquet/base/src/test/java/io/deephaven/parquet/impl/ParquetSchemaUtilTest.java @@ -0,0 +1,163 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.impl; + +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Test; + +import java.util.Comparator; +import java.util.List; +import java.util.function.BiPredicate; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.optional; +import static org.apache.parquet.schema.Types.repeated; +import static org.apache.parquet.schema.Types.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +class ParquetSchemaUtilTest { + + private static final MessageType SCHEMA; + + private static final ColumnDescriptor[] NON_EXISTENT_COLUMNS = { + new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, repeated(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).id(42).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required2"}, required(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required2"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 1, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 0, 1), + new ColumnDescriptor(new String[] {"Required"}, required(INT64).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {"Required"}, required(INT32).as(intType(16)).named("Required"), 0, + 0), + new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0), + new ColumnDescriptor(new String[] {}, repeated(INT32).named("Required"), 0, 0) + }; + + private static final String[][] NON_EXISTENT_LEAF_PATHS = { + {}, + {""}, + {"root"}, + {"required"}, + {"repeated"}, + {"optional"}, + {"REQUIRED"}, + {"REPEATED"}, + {"OPTIONAL"}, + {"RequiredGroup"}, + {"RepeatedGroup"}, + {"OptionalGroup"}, + {"RequiredGroup2"}, + {"RepeatedGroup2"}, + {"OptionalGroup2"}, + {"RequiredGroup", ""}, + {"RequiredGroup", "REQUIRED"}, + {"RequiredGroup2", "REQUIRED"}, + {"RequiredGroup2", "RequiredGroup", "REQUIRED"}, + {"RequiredGroup2", "REQUIREDGROUP", "Required"}, + {"REQUIREDGROUP2", "RequiredGroup", "Required"}, + {"foo"}, + {"foo", "bar"}, + {"foo", "bar", "baz"}, + {"foo", "bar", "baz", "zip"}, + }; + + static { + final PrimitiveType required = required(INT32).named("Required"); + final PrimitiveType repeated = repeated(INT32).named("Repeated"); + final PrimitiveType optional = optional(INT32).named("Optional"); + final GroupType l1 = Types.requiredList().element(required).named("L1"); + final GroupType l2 = Types.optionalList().element(required).named("L2"); + final GroupType requiredGroup = Types.requiredGroup() + .addFields(required, repeated, optional, l1, l2) + .named("RequiredGroup"); + final GroupType repeatedGroup = Types.repeatedGroup() + .addFields(required, repeated, optional, l1, l2) + .named("RepeatedGroup"); + final GroupType optionalGroup = Types.optionalGroup() + .addFields(required, repeated, optional, l1, l2) + .named("OptionalGroup"); + final GroupType requiredGroup2 = Types.requiredGroup() + .addFields(required, repeated, optional, l1, l2, requiredGroup, repeatedGroup, optionalGroup) + .named("RequiredGroup2"); + final GroupType repeatedGroup2 = Types.repeatedGroup() + .addFields(required, repeated, optional, l1, l2, requiredGroup, repeatedGroup, optionalGroup) + .named("RepeatedGroup2"); + final GroupType optionalGroup2 = Types.optionalGroup() + .addFields(required, repeated, optional, l1, l2, requiredGroup, repeatedGroup, optionalGroup) + .named("OptionalGroup2"); + SCHEMA = Types.buildMessage() + .addFields(required, repeated, optional, l1, l2, requiredGroup, repeatedGroup, optionalGroup, + requiredGroup2, + repeatedGroup2, optionalGroup2) + .named("root"); + } + + @Test + void columnsEmpty() { + final MessageType schema = Types.buildMessage().named("root"); + final List columns = ParquetSchemaUtil.columns(schema); + assertThat(columns) + .usingElementComparator(equalityMethod(ColumnDescriptorUtil::equals)) + .isEqualTo(schema.getColumns()); + } + + @Test + void columns() { + final List columns = ParquetSchemaUtil.columns(SCHEMA); + assertThat(columns) + .usingElementComparator(equalityMethod(ColumnDescriptorUtil::equals)) + .isEqualTo(SCHEMA.getColumns()); + } + + @Test + void columnDescriptor() { + for (ColumnDescriptor expected : ParquetSchemaUtil.columns(SCHEMA)) { + assertThat(ParquetSchemaUtil.columnDescriptor(SCHEMA, expected.getPath())) + .usingValueComparator(equalityMethod(ColumnDescriptorUtil::equals)) + .hasValue(expected); + // verify Parquet library has same behavior + assertThat(SCHEMA.getColumnDescription(expected.getPath())) + .usingComparator(equalityMethod(ColumnDescriptorUtil::equals)) + .isEqualTo(expected); + } + for (String[] nonExistentPath : NON_EXISTENT_LEAF_PATHS) { + assertThat(ParquetSchemaUtil.columnDescriptor(SCHEMA, nonExistentPath)).isEmpty(); + // verify Parquet library has similar behavior + try { + SCHEMA.getColumnDescription(nonExistentPath); + failBecauseExceptionWasNotThrown(Throwable.class); + } catch (InvalidRecordException | ClassCastException e) { + // good enough + } + } + } + + @Test + void contains() { + for (ColumnDescriptor column : ParquetSchemaUtil.columns(SCHEMA)) { + assertThat(ParquetSchemaUtil.contains(SCHEMA, column)).isTrue(); + } + for (ColumnDescriptor column : NON_EXISTENT_COLUMNS) { + assertThat(ParquetSchemaUtil.contains(SCHEMA, column)).isFalse(); + } + } + + /** + * This is not a valid comparator; it may only be used with assertJ for equality purposes and not comparison + * purposes. See Support specialized equality methods + */ + private static Comparator equalityMethod(BiPredicate predicate) { + // noinspection ComparatorMethodParameterNotUsed + return (o1, o2) -> predicate.test(o1, o2) ? 0 : -1; + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index d8fa3ff9aa7..e8493fc2241 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -12,8 +12,10 @@ import io.deephaven.hash.KeyedObjectKey; import io.deephaven.hash.KeyedObjectKey.Basic; import io.deephaven.parquet.base.ParquetUtils; +import io.deephaven.parquet.table.location.ParquetColumnResolver; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.FileMetaData; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -167,6 +169,8 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par public abstract Optional>> getIndexColumns(); + public abstract Optional getColumnResolverFactory(); + /** * Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition * set as the provided {@link TableDefinition}. @@ -316,6 +320,11 @@ public Optional>> getIndexColumns() { return Optional.empty(); } + @Override + public Optional getColumnResolverFactory() { + return Optional.empty(); + } + @Override public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) { return withTableDefinitionAndLayout(useDefinition, null); @@ -333,7 +342,7 @@ public ParquetInstructions withTableDefinitionAndLayout( return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), - useLayout, useDefinition, null, null); + useLayout, useDefinition, null, null, null); } @Override @@ -341,7 +350,7 @@ ParquetInstructions withIndexColumns(final Collection> indexColumns return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), - null, null, indexColumns, null); + null, null, indexColumns, null, null); } @Override @@ -458,6 +467,7 @@ private static final class ReadOnly extends ParquetInstructions { private final TableDefinition tableDefinition; private final Collection> indexColumns; private final OnWriteCompleted onWriteCompleted; + private final ParquetColumnResolver.Factory columnResolver; private ReadOnly( final KeyedObjectHashMap columnNameToInstructions, @@ -474,7 +484,8 @@ private ReadOnly( final ParquetFileLayout fileLayout, final TableDefinition tableDefinition, final Collection> indexColumns, - final OnWriteCompleted onWriteCompleted) { + final OnWriteCompleted onWriteCompleted, + final ParquetColumnResolver.Factory columnResolver) { this.columnNameToInstructions = columnNameToInstructions; this.parquetColumnNameToInstructions = parquetColumnNameToColumnName; this.compressionCodecName = compressionCodecName; @@ -493,6 +504,12 @@ private ReadOnly( .map(List::copyOf) .collect(Collectors.toUnmodifiableList()); this.onWriteCompleted = onWriteCompleted; + this.columnResolver = columnResolver; + if (columnResolver != null) { + if (tableDefinition == null) { + throw new IllegalArgumentException("When setting columnResolver, tableDefinition must be provided"); + } + } } private T getOrDefault(final String columnName, final T defaultValue, @@ -617,6 +634,11 @@ public Optional>> getIndexColumns() { return Optional.ofNullable(indexColumns); } + @Override + public Optional getColumnResolverFactory() { + return Optional.ofNullable(columnResolver); + } + @Override public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) { return withTableDefinitionAndLayout(useDefinition, fileLayout); @@ -635,7 +657,7 @@ public ParquetInstructions withTableDefinitionAndLayout( getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), useLayout, useDefinition, - indexColumns, onWriteCompleted); + indexColumns, onWriteCompleted, columnResolver); } @Override @@ -644,7 +666,7 @@ ParquetInstructions withIndexColumns(final Collection> useIndexColu getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout, - tableDefinition, useIndexColumns, onWriteCompleted); + tableDefinition, useIndexColumns, onWriteCompleted, columnResolver); } @Override @@ -709,6 +731,7 @@ public static class Builder { private TableDefinition tableDefinition; private Collection> indexColumns; private OnWriteCompleted onWriteCompleted; + private ParquetColumnResolver.Factory columnResolverFactory; /** * For each additional field added, make sure to update the copy constructor builder @@ -737,6 +760,7 @@ public Builder(final ParquetInstructions parquetInstructions) { tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null); indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null); onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null); + columnResolverFactory = readOnlyParquetInstructions.getColumnResolverFactory().orElse(null); } public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) { @@ -974,6 +998,22 @@ public Builder setOnWriteCompleted(final OnWriteCompleted onWriteCompleted) { return this; } + /** + * Sets the column resolver factory to allow higher-level managers (such as Iceberg) to use advanced column + * resolution logic based on the table key and table location key. When set, + * {@link #setTableDefinition(TableDefinition)} must also be set. As such, the factory is not used for + * inference purposes. + * + *

+ * This is not typically set by end-users. + * + * @param columnResolverFactory the column resolver factory + */ + public Builder setColumnResolverFactory(ParquetColumnResolver.Factory columnResolverFactory) { + this.columnResolverFactory = columnResolverFactory; + return this; + } + public ParquetInstructions build() { final KeyedObjectHashMap columnNameToInstructionsOut = columnNameToInstructions; columnNameToInstructions = null; @@ -983,7 +1023,7 @@ public ParquetInstructions build() { return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName, maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing, specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout, - tableDefinition, indexColumns, onWriteCompleted); + tableDefinition, indexColumns, onWriteCompleted, columnResolverFactory); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java index 658d37bc20a..811a480a186 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java @@ -8,12 +8,12 @@ import io.deephaven.base.ClassUtil; import io.deephaven.base.Pair; import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.parquet.impl.ParquetSchemaUtil; import io.deephaven.stringset.StringSet; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.parquet.table.metadata.CodecInfo; import io.deephaven.parquet.table.metadata.ColumnTypeInfo; import io.deephaven.parquet.table.metadata.TableInfo; -import io.deephaven.parquet.base.ParquetFileReader; import io.deephaven.util.SimpleTypeMap; import io.deephaven.vector.ByteVector; import io.deephaven.vector.CharVector; @@ -23,7 +23,6 @@ import io.deephaven.vector.LongVector; import io.deephaven.vector.ObjectVector; import io.deephaven.vector.ShortVector; -import org.apache.parquet.format.converter.ParquetMetadataConverter; import io.deephaven.util.codec.SimpleByteArrayCodec; import io.deephaven.util.codec.UTF8StringAsByteArrayCodec; import org.apache.commons.lang3.mutable.MutableObject; @@ -142,7 +141,7 @@ public static ParquetInstructions readParquetSchema( }; final ParquetMessageDefinition colDef = new ParquetMessageDefinition(); final Map parquetColumnNameToFirstPath = new HashMap<>(); - for (final ColumnDescriptor column : schema.getColumns()) { + for (final ColumnDescriptor column : ParquetSchemaUtil.columns(schema)) { if (column.getMaxRepetitionLevel() > 1) { // TODO (https://github.com/deephaven/deephaven-core/issues/871): Support this throw new UnsupportedOperationException("Unsupported maximum repetition level " diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java new file mode 100644 index 00000000000..446b118e4a7 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolver.java @@ -0,0 +1,46 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.location; + +import io.deephaven.engine.table.impl.locations.TableKey; +import io.deephaven.parquet.table.ParquetInstructions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.MessageType; + +import java.util.List; +import java.util.Optional; + +/** + * A resolver from Deephaven column names to Parquet paths. + */ +public interface ParquetColumnResolver { + + /** + * A factory for creating Parquet column resolvers. This may be useful in situations where the mapping from a + * Deephaven column name to a Parquet column is not derived directly from the Deephaven column name. + * + * @see ParquetInstructions.Builder#setColumnResolverFactory(Factory) + */ + interface Factory { + + /** + * Create a Parquet column resolver. + * + * @param tableKey the table key + * @param tableLocationKey the Parquet TLK + * @return the Parquet column resolver + */ + ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey); + } + + /** + * The path to the leaf field in the Parquet schema corresponding to the Deephaven {@code columnName}. + * + * @param columnName the column name + * @return the path to the leaf field in the Parquet schema + * @see ColumnDescriptor#getPath() + * @see MessageType#getColumnDescription(String[]) + */ + Optional> of(String columnName); +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java new file mode 100644 index 00000000000..3c92740cc26 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnResolverMap.java @@ -0,0 +1,58 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.location; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.util.annotations.InternalUseOnly; +import org.immutables.value.Value; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * A simple {@link ParquetColumnResolver} implementation from a {@link Map}. + */ +@Value.Immutable +@BuildableStyle +public abstract class ParquetColumnResolverMap implements ParquetColumnResolver { + + public static Builder builder() { + return ImmutableParquetColumnResolverMap.builder(); + } + + abstract Map> mapUnsafe(); + + /** + * A map from Deephaven column name to Parquet path. + */ + public final Map> map() { + return mapUnsafe(); + } + + /** + * {@inheritDoc} + * + *

+ * Equivalent to {@code Optional.ofNullable(map().get(columnName))}. + */ + @Override + public Optional> of(String columnName) { + return Optional.ofNullable(map().get(columnName)); + } + + public interface Builder { + + // Ideally, not part of the public interface. + // See https://github.com/immutables/immutables/issues/1534 + @InternalUseOnly + Builder putMapUnsafe(String key, List value); + + default Builder putMap(String key, List value) { + return putMapUnsafe(key, List.copyOf(value)); + } + + ParquetColumnResolverMap build(); + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java new file mode 100644 index 00000000000..0b03216e3f0 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactory.java @@ -0,0 +1,157 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.location; + +import io.deephaven.engine.table.impl.locations.TableKey; +import io.deephaven.parquet.impl.ParquetSchemaUtil; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This {@link ParquetColumnResolver.Factory} resolves Parquet columns via {@link Type#getId() field ids}. The field ids + * are considered for resolution no matter what level in the schema they exist. For example, the following schema has + * field ids at different levels: + * + *

+ * message root {
+ *   required int32 X = 42;
+ *   required group Y (LIST) = 43 {
+ *     repeated group list {
+ *       required int32 element;
+ *     }
+ *   }
+ *   required group Z (LIST) {
+ *     repeated group list {
+ *       required int32 element = 44;
+ *     }
+ *   }
+ * }
+ * 
+ * + * In this example, {@code 42} would be resolvable to {@code [X]}, {@code 43} would be resolvable to + * {@code [Y, list, element]}, and {@code 44} would be resolvable to {@code [Z, list, element]}. + * + *

+ * If a schema has ambiguous field ids (according to this implementation's definition), the resolution will fail if the + * user requests those field ids. For example: + * + *

+ * message root {
+ *   required int32 X = 42;
+ *   required group Y (LIST) = 43 {
+ *     repeated group list {
+ *       required int32 element;
+ *     }
+ *   }
+ *   required group Z (LIST) {
+ *     repeated group list {
+ *       required int32 element = 42;
+ *     }
+ *   }
+ * }
+ * 
+ * + * In this example, if {@code 42} was requested, resolution would fail because it is ambiguous between paths {@code [X]} + * and {@code [Z, list, element]}. If {@code 43} was requested, resolution would succeed. + */ +public final class ParquetFieldIdColumnResolverFactory implements ParquetColumnResolver.Factory { + + /** + * Creates a field id column resolver factory. + * + * @param columnNameToFieldId a map from Deephaven column names to field ids + * @return the column resolver provider + */ + public static ParquetFieldIdColumnResolverFactory of(Map columnNameToFieldId) { + return new ParquetFieldIdColumnResolverFactory(columnNameToFieldId + .entrySet() + .stream() + .collect(Collectors.groupingBy( + Map.Entry::getValue, + Collectors.mapping(Map.Entry::getKey, Collectors.toSet())))); + } + + private final Map> fieldIdsToDhColumnNames; + + private ParquetFieldIdColumnResolverFactory(Map> fieldIdsToDhColumnNames) { + this.fieldIdsToDhColumnNames = Objects.requireNonNull(fieldIdsToDhColumnNames); + } + + /** + * Resolves the requested field ids for {@code schema}. + * + * @param schema the schema + * @return the resolver map + */ + public ParquetColumnResolverMap of(MessageType schema) { + final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor(); + ParquetSchemaUtil.walk(schema, visitor); + return visitor.toResolver(); + } + + /** + * Equivalent to {@code of(tableLocationKey.getFileReader().getSchema())}. + * + * @param tableKey the table key + * @param tableLocationKey the Parquet TLK + * @return the resolver map + * @see #of(MessageType) + */ + @Override + public ParquetColumnResolverMap of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) { + return of(tableLocationKey.getSchema()); + } + + private class FieldIdMappingVisitor implements ParquetSchemaUtil.Visitor { + private final Map> nameToPath = new HashMap<>(); + + public ParquetColumnResolverMap toResolver() { + ParquetColumnResolverMap.Builder builder = ParquetColumnResolverMap.builder(); + for (Map.Entry> e : nameToPath.entrySet()) { + builder.putMap(e.getKey(), e.getValue()); + } + return builder.build(); + } + + @Override + public void accept(Collection typePath, PrimitiveType primitiveType) { + // There are different resolution strategies that could all be reasonable. We could consider using only the + // field id closest to the leaf. This version, however, takes the most general approach and considers field + // ids wherever they appear; ultimately, only being resolvable if the field id mapping is unambiguous. + List path = null; + for (Type type : typePath) { + final Type.ID id = type.getId(); + if (id == null) { + continue; + } + final int fieldId = id.intValue(); + final Set set = fieldIdsToDhColumnNames.get(fieldId); + if (set == null) { + continue; + } + if (path == null) { + path = typePath.stream().map(Type::getName).collect(Collectors.toUnmodifiableList()); + } + for (String columnName : set) { + final List existing = nameToPath.putIfAbsent(columnName, path); + if (existing != null && !existing.equals(path)) { + throw new IllegalArgumentException(String.format( + "Parquet columns can't be unambigously mapped. %s -> %d has multiple paths [%s], [%s]", + columnName, fieldId, String.join(", ", existing), + String.join(", ", path))); + } + } + } + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 7c8f47636e5..3b11018f7bb 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -25,6 +25,7 @@ import io.deephaven.parquet.base.ColumnChunkReader; import io.deephaven.parquet.base.ParquetFileReader; import io.deephaven.parquet.base.RowGroupReader; +import io.deephaven.parquet.impl.ParquetSchemaUtil; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.ParquetSchemaReader; import io.deephaven.parquet.table.ParquetTools; @@ -34,7 +35,6 @@ import io.deephaven.parquet.table.metadata.SortColumnInfo; import io.deephaven.parquet.table.metadata.TableInfo; import io.deephaven.util.channel.SeekableChannelsProvider; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.RowGroup; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.jetbrains.annotations.NotNull; @@ -60,6 +60,8 @@ public class ParquetTableLocation extends AbstractTableLocation { private final ParquetFileReader parquetFileReader; private final int[] rowGroupIndices; + private final ParquetColumnResolver resolver; + private final RowGroup[] rowGroups; private final RegionedPageStore.Parameters regionParameters; private final Map parquetColumnNameToPath; @@ -88,7 +90,9 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, parquetMetadata = tableLocationKey.getMetadata(); rowGroupIndices = tableLocationKey.getRowGroupIndices(); } - + resolver = readInstructions.getColumnResolverFactory() + .map(factory -> factory.of(tableKey, tableLocationKey)) + .orElse(null); final int rowGroupCount = rowGroupIndices.length; rowGroups = IntStream.of(rowGroupIndices) .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi)) @@ -99,8 +103,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount); parquetColumnNameToPath = new HashMap<>(); - for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) { - final String[] path = column.getPath(); + for (String[] path : ParquetSchemaUtil.paths(parquetFileReader.getSchema())) { if (path.length > 1) { parquetColumnNameToPath.put(path[0], path); } @@ -182,16 +185,27 @@ public List getSortedColumns() { @NotNull protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName); - final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName); - final List nameList = - columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); + final List columnPath = getColumnPath(columnName, parquetColumnName); final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders()) - .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); + .map(rgr -> rgr.getColumnChunk(columnName, columnPath)) + .toArray(ColumnChunkReader[]::new); final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); return new ParquetColumnLocation<>(this, columnName, parquetColumnName, exists ? columnChunkReaders : null); } + private List getColumnPath(@NotNull String columnName, String parquetColumnNameOrDefault) { + if (resolver != null) { + // empty list will result in exists=false + return resolver.of(columnName).orElse(List.of()); + } + final String[] columnPath = parquetColumnNameToPath.get(parquetColumnNameOrDefault); + // noinspection Java9CollectionFactory + return columnPath == null + ? Collections.singletonList(parquetColumnNameOrDefault) + : Collections.unmodifiableList(Arrays.asList(columnPath)); + } + private RowSet computeIndex() { final RowSetBuilderSequential sequentialBuilder = RowSetFactory.builderSequential(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java index e56054fb859..1f6c07acf78 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java @@ -11,7 +11,9 @@ import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.commons.io.FilenameUtils; import org.apache.parquet.format.RowGroup; +import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -136,8 +138,8 @@ public synchronized void setFileReader(final ParquetFileReader fileReader) { } /** - * Get a previously-{@link #setMetadata(ParquetMetadata) set} or on-demand created {@link ParquetMetadata} for this - * location key's {@code file}. + * Get a previously-{@link #setMetadata(ParquetMetadata) set} or the {@link ParquetMetadata} for this location key's + * {@code file}. * * @return A {@link ParquetMetadata} for this location key's {@code file}. */ @@ -159,6 +161,28 @@ public synchronized void setMetadata(final ParquetMetadata metadata) { this.metadata = metadata; } + /** + * Equivalent to {@code getMetadata().getFileMetaData()}. + * + * @return the file metadata + * @see #getMetadata() + * @see ParquetMetadata#getFileMetaData() + */ + public FileMetaData getFileMetadata() { + return getMetadata().getFileMetaData(); + } + + /** + * Equivalent to {@code getFileMetadata().getSchema()}. + * + * @return the file metadata + * @see #getFileMetadata() + * @see FileMetaData#getSchema() + */ + public MessageType getSchema() { + return getFileMetadata().getSchema(); + } + /** * Get previously-{@link #setRowGroupIndices(int[]) set} or on-demand created {@link RowGroup} indices for this * location key's current {@link ParquetFileReader}. diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java index c1a6a4864b8..9188563c368 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java @@ -3,6 +3,11 @@ // package io.deephaven.parquet.table; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableKey; +import io.deephaven.parquet.table.location.ParquetColumnResolver; +import io.deephaven.parquet.table.location.ParquetTableLocationKey; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -10,6 +15,27 @@ public class ParquetInstructionsTest { + @Test + public void empty() { + assertThat(ParquetInstructions.EMPTY.getSpecialInstructions()).isNull(); + assertThat(ParquetInstructions.EMPTY.getCompressionCodecName()) + .isEqualTo(ParquetInstructions.DEFAULT_COMPRESSION_CODEC_NAME); + assertThat(ParquetInstructions.EMPTY.getMaximumDictionaryKeys()) + .isEqualTo(ParquetInstructions.DEFAULT_MAXIMUM_DICTIONARY_KEYS); + assertThat(ParquetInstructions.EMPTY.getMaximumDictionarySize()) + .isEqualTo(ParquetInstructions.DEFAULT_MAXIMUM_DICTIONARY_SIZE); + assertThat(ParquetInstructions.EMPTY.isLegacyParquet()).isFalse(); + assertThat(ParquetInstructions.EMPTY.getTargetPageSize()) + .isEqualTo(ParquetInstructions.DEFAULT_TARGET_PAGE_SIZE); + assertThat(ParquetInstructions.EMPTY.isRefreshing()).isFalse(); + assertThat(ParquetInstructions.EMPTY.generateMetadataFiles()).isFalse(); + assertThat(ParquetInstructions.EMPTY.getFileLayout()).isEmpty(); + assertThat(ParquetInstructions.EMPTY.getTableDefinition()).isEmpty(); + assertThat(ParquetInstructions.EMPTY.getIndexColumns()).isEmpty(); + assertThat(ParquetInstructions.EMPTY.getColumnResolverFactory()).isEmpty(); + assertThat(ParquetInstructions.EMPTY.baseNameForPartitionedParquetData()).isEqualTo("{uuid}"); + } + @Test public void setFieldId() { final ParquetInstructions instructions = ParquetInstructions.builder() @@ -119,4 +145,34 @@ public void addColumnNameMappingBadName() { assertThat(e).hasMessageContaining("Invalid column name"); } } + + @Test + public void columnResolver() { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setTableDefinition(TableDefinition.of(ColumnDefinition.ofInt("Foo"))) + .setColumnResolverFactory(ColumnResolverTestImpl.INSTANCE) + .build(); + assertThat(instructions.getColumnResolverFactory()).hasValue(ColumnResolverTestImpl.INSTANCE); + } + + @Test + public void columnResolverNoTableDefinition() { + try { + ParquetInstructions.builder() + .setColumnResolverFactory(ColumnResolverTestImpl.INSTANCE) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("When setting columnResolver, tableDefinition must be provided"); + } + } + + private enum ColumnResolverTestImpl implements ParquetColumnResolver.Factory { + INSTANCE; + + @Override + public ParquetColumnResolver of(TableKey tableKey, ParquetTableLocationKey tableLocationKey) { + throw new UnsupportedOperationException(); + } + } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 530be1c5d6b..fc6000f4d3b 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -71,14 +71,14 @@ import org.apache.commons.lang3.mutable.MutableDouble; import org.apache.commons.lang3.mutable.MutableFloat; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; import org.junit.*; import org.junit.experimental.categories.Category; @@ -131,6 +131,12 @@ import static io.deephaven.parquet.table.ParquetTools.writeTable; import static io.deephaven.parquet.table.ParquetTools.writeTables; import static io.deephaven.util.QueryConstants.*; +import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.optional; import static org.junit.Assert.*; @Category(OutOfBandTest.class) @@ -1969,15 +1975,12 @@ public void decimalLogicalTypeTest() { final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY) .getMetadata(); - final List columnsMetadata = metadata.getFileMetaData().getSchema().getColumns(); - assertEquals("DECIMAL(7,5)", - columnsMetadata.get(0).getPrimitiveType().getLogicalTypeAnnotation().toString()); - assertEquals(PrimitiveType.PrimitiveTypeName.INT32, - columnsMetadata.get(0).getPrimitiveType().getPrimitiveTypeName()); - assertEquals("DECIMAL(12,8)", - columnsMetadata.get(1).getPrimitiveType().getLogicalTypeAnnotation().toString()); - assertEquals(PrimitiveType.PrimitiveTypeName.INT64, - columnsMetadata.get(1).getPrimitiveType().getPrimitiveTypeName()); + final MessageType expectedSchema = Types.buildMessage() + .addFields( + optional(INT32).as(decimalType(5, 7)).named("DecimalIntCol"), + optional(INT64).as(decimalType(8, 12)).named("DecimalLongCol")) + .named("root"); + assertEquals(expectedSchema, metadata.getFileMetaData().getSchema()); assertTableEquals(expected, fromDisk); } @@ -1989,15 +1992,12 @@ public void decimalLogicalTypeTest() { final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY) .getMetadata(); - final List columnsMetadata = metadata.getFileMetaData().getSchema().getColumns(); - assertEquals("DECIMAL(7,5)", - columnsMetadata.get(0).getPrimitiveType().getLogicalTypeAnnotation().toString()); - assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, - columnsMetadata.get(0).getPrimitiveType().getPrimitiveTypeName()); - assertEquals("DECIMAL(12,8)", - columnsMetadata.get(1).getPrimitiveType().getLogicalTypeAnnotation().toString()); - assertEquals(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, - columnsMetadata.get(1).getPrimitiveType().getPrimitiveTypeName()); + final MessageType expectedSchema = Types.buildMessage() + .addFields( + optional(FIXED_LEN_BYTE_ARRAY).length(4).as(decimalType(5, 7)).named("DecimalIntCol"), + optional(FIXED_LEN_BYTE_ARRAY).length(6).as(decimalType(8, 12)).named("DecimalLongCol")) + .named("schema"); // note: this file must have been written down with a different schema name + assertEquals(expectedSchema, metadata.getFileMetaData().getSchema()); assertTableEquals(expected, fromDisk); } } @@ -2578,10 +2578,14 @@ public void testReadUintParquetData() { final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); - final List columnsMetadata = metadata.getFileMetaData().getSchema().getColumns(); - assertTrue(columnsMetadata.get(0).toString().contains("int32 uint8Col (INTEGER(8,false))")); - assertTrue(columnsMetadata.get(1).toString().contains("int32 uint16Col (INTEGER(16,false))")); - assertTrue(columnsMetadata.get(2).toString().contains("int32 uint32Col (INTEGER(32,false))")); + + final MessageType expectedSchema = Types.buildMessage() + .addFields( + optional(INT32).as(intType(8, false)).named("uint8Col"), + optional(INT32).as(intType(16, false)).named("uint16Col"), + optional(INT32).as(intType(32, false)).named("uint32Col")) + .named("schema"); + assertEquals(expectedSchema, metadata.getFileMetaData().getSchema()); final Table expected = newTable( charCol("uint8Col", (char) 255, (char) 2, (char) 0, NULL_CHAR), @@ -3704,25 +3708,25 @@ public void readWriteDateTimeTest() { new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); final ColumnChunkMetaData dateColMetadata = metadata.getBlocks().get(0).getColumns().get(0); assertTrue(dateColMetadata.toString().contains("someDateColumn")); - assertEquals(PrimitiveType.PrimitiveTypeName.INT32, dateColMetadata.getPrimitiveType().getPrimitiveTypeName()); + assertEquals(INT32, dateColMetadata.getPrimitiveType().getPrimitiveTypeName()); assertEquals(LogicalTypeAnnotation.dateType(), dateColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); final ColumnChunkMetaData timeColMetadata = metadata.getBlocks().get(0).getColumns().get(1); assertTrue(timeColMetadata.toString().contains("someTimeColumn")); - assertEquals(PrimitiveType.PrimitiveTypeName.INT64, timeColMetadata.getPrimitiveType().getPrimitiveTypeName()); + assertEquals(INT64, timeColMetadata.getPrimitiveType().getPrimitiveTypeName()); assertEquals(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS), timeColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); final ColumnChunkMetaData localDateTimeColMetadata = metadata.getBlocks().get(0).getColumns().get(2); assertTrue(localDateTimeColMetadata.toString().contains("someLocalDateTimeColumn")); - assertEquals(PrimitiveType.PrimitiveTypeName.INT64, + assertEquals(INT64, localDateTimeColMetadata.getPrimitiveType().getPrimitiveTypeName()); assertEquals(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS), localDateTimeColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); final ColumnChunkMetaData instantColMetadata = metadata.getBlocks().get(0).getColumns().get(3); assertTrue(instantColMetadata.toString().contains("someInstantColumn")); - assertEquals(PrimitiveType.PrimitiveTypeName.INT64, + assertEquals(INT64, instantColMetadata.getPrimitiveType().getPrimitiveTypeName()); assertEquals(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS), instantColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index ddb1894d25a..71632046f39 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -20,6 +20,8 @@ import io.deephaven.engine.util.TableTools; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; +import io.deephaven.parquet.table.location.ParquetColumnResolver; +import io.deephaven.parquet.table.location.ParquetFieldIdColumnResolverFactory; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.qst.type.Type; import io.deephaven.stringset.HashStringSet; @@ -52,6 +54,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -59,6 +62,8 @@ import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.testutil.TstUtils.tableRangesAreEqual; import static io.deephaven.engine.util.TableTools.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -646,25 +651,21 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept // to know whenever serialization changes in any way. assertEquals("2ea68b0ddaeb432e9c2721f15460b6c42449a479c1960e836f6ebe3b14f33dc1", sha256sum(file.toPath())); - // TODO(deephaven-core#6128): Allow Parquet column access by field_id // This test is a bit circular; but assuming we trust our reading code, we should have relative confidence that // we are writing it down correctly if we can read it correctly. - // { - // final ParquetInstructions readInstructions = ParquetInstructions.builder() - // .setFieldId(BAZ, BAZ_ID) - // .setFieldId(ZAP, ZAP_ID) - // .build(); - // { - // final Table actual = ParquetTools.readTable(file.getPath(), readInstructions); - // assertEquals(td, actual.getDefinition()); - // assertTableEquals(expected, actual); - // } - // { - // final Table actual = ParquetTools.readTable(file.getPath(), readInstructions.withTableDefinition(td)); - // assertEquals(td, actual.getDefinition()); - // assertTableEquals(expected, actual); - // } - // } + { + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setTableDefinition(td) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + BAZ, BAZ_ID, + ZAP, ZAP_ID))) + .build(); + { + final Table actual = ParquetTools.readTable(file.getPath(), readInstructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } } /** @@ -689,7 +690,6 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept * * @see Arrow Parquet field_id */ - @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id") @Test public void testParquetFieldIds() { final String file = TestParquetTools.class.getResource("/ReferenceSimpleParquetFieldIds.parquet").getFile(); @@ -714,22 +714,12 @@ public void testParquetFieldIds() { final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + final TableDefinition td = TableDefinition.of(bazCol, zapCol); final ParquetInstructions instructions = ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) - .setFieldId(ZAP, ZAP_ID) + .setTableDefinition(td) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID, ZAP, ZAP_ID))) .build(); - final TableDefinition td = TableDefinition.of(bazCol, zapCol); - - // It's enough to just provide the mapping based on field_id - { - final Table table = ParquetTools.readTable(file, instructions); - assertEquals(td, table.getDefinition()); - assertTableEquals(newTable(td, - longCol(BAZ, 99, 101), - stringCol(ZAP, "Foo", "Bar")), table); - } - // But, the user can still provide a TableDefinition { final Table table = ParquetTools.readTable(file, instructions.withTableDefinition(td)); @@ -754,18 +744,21 @@ public void testParquetFieldIds() { ColumnDefinition.ofLong(BAZ), ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); final ParquetInstructions partialInstructions = ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) + .setTableDefinition(partialTD) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID))) .build(); final Table table = ParquetTools.readTable(file, partialInstructions); assertEquals(partialTD, table.getDefinition()); } - // There are no errors if a field ID is configured but not found; it won't be inferred. + // There are no errors if a field ID is configured but not found { final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) - .setFieldId(ZAP, ZAP_ID) - .setFieldId("Fake", 99) + .setTableDefinition(td) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + BAZ, BAZ_ID, + ZAP, ZAP_ID, + "Fake", 99))) .build()); assertEquals(td, table.getDefinition()); assertTableEquals(newTable(td, @@ -778,11 +771,12 @@ public void testParquetFieldIds() { final TableDefinition tdWithFake = TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofShort("Fake")); final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) - .setFieldId(ZAP, ZAP_ID) - .setFieldId("Fake", 99) - .build() - .withTableDefinition(tdWithFake)); + .setTableDefinition(tdWithFake) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + BAZ, BAZ_ID, + ZAP, ZAP_ID, + "Fake", 99))) + .build()); assertEquals(tdWithFake, table.getDefinition()); assertTableEquals(newTable(tdWithFake, longCol(BAZ, 99, 101), @@ -796,57 +790,33 @@ public void testParquetFieldIds() { final TableDefinition dupeTd = TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofLong(BAZ_DUPE)); final ParquetInstructions dupeInstructions = ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) - .setFieldId(ZAP, ZAP_ID) - .setFieldId(BAZ_DUPE, BAZ_ID) + .setTableDefinition(dupeTd) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + BAZ, BAZ_ID, + ZAP, ZAP_ID, + BAZ_DUPE, BAZ_ID))) .build(); { - final Table table = ParquetTools.readTable(file, dupeInstructions.withTableDefinition(dupeTd)); + final Table table = ParquetTools.readTable(file, dupeInstructions); assertEquals(dupeTd, table.getDefinition()); assertTableEquals(newTable(dupeTd, longCol(BAZ, 99, 101), stringCol(ZAP, "Foo", "Bar"), longCol(BAZ_DUPE, 99, 101)), table); } - - // In the case where we have dupe field IDs and don't provide an explicit definition, we are preferring to - // fail during the inference step - { - try { - ParquetTools.readTable(file, dupeInstructions); - Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); - } catch (IllegalArgumentException e) { - Assertions.assertThat(e).hasMessageContaining("Non-unique Field ID mapping provided"); - } - } } - // If both a field id and parquet column name mapping is provided, they need to map to the same parquet column. + // If both a field id and parquet column name mapping is provided, column resolution wins { final TableDefinition bazTd = TableDefinition.of(bazCol); final ParquetInstructions inconsistent = ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) + .setTableDefinition(bazTd) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(BAZ, BAZ_ID))) .addColumnNameMapping("53f0de5a-e06f-476e-b82a-a3f9294fcd05", BAZ) .build(); - // In the case where we are inferring the TableDefinition from parquet schema, the inconsistency will be - // noticed up front - try { - ParquetTools.readTable(file, inconsistent); - Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); - } catch (IllegalArgumentException e) { - Assertions.assertThat(e) - .hasMessageContaining("Supplied ColumnDefinitions include duplicate names [Baz]"); - } - // In the case where we provide a TableDefinition, the inconsistency will be noticed when reading the - // data - try { - // Need to force read of data - ParquetTools.readTable(file, inconsistent.withTableDefinition(bazTd)).select(); - Assertions.failBecauseExceptionWasNotThrown(TableDataException.class); - } catch (TableDataException e) { - Assertions.assertThat(e).getRootCause().hasMessageContaining( - "For columnName=Baz, providing an explicit parquet column name path ([53f0de5a-e06f-476e-b82a-a3f9294fcd05]) and field id (0) mapping, but they are resolving to different columns, byFieldId=[colIx=0, pathKey=[e0cf7927-45dc-4dfc-b4ef-36bf4b6ae463], fieldId=0], byPath=[colIx=1, pathKey=[53f0de5a-e06f-476e-b82a-a3f9294fcd05], fieldId=1]"); - } + final Table table = ParquetTools.readTable(file, inconsistent); + assertEquals(bazTd, table.getDefinition()); + assertTableEquals(newTable(bazTd, longCol(BAZ, 99, 101)), table); } } @@ -877,7 +847,6 @@ public void testParquetFieldIds() { * * @see Arrow Parquet field_id */ - @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id") @Test public void testPartitionedParquetFieldIds() { final String file = TestParquetTools.class.getResource("/ReferencePartitionedFieldIds").getFile(); @@ -890,29 +859,22 @@ public void testPartitionedParquetFieldIds() { final ColumnDefinition partitionColumn = ColumnDefinition.ofInt(PARTITION).withPartitioning(); final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + final TableDefinition expectedTd = TableDefinition.of(partitionColumn, bazCol, zapCol); final ParquetInstructions instructions = ParquetInstructions.builder() - .setFieldId(BAZ, BAZ_ID) - .setFieldId(ZAP, ZAP_ID) + .setTableDefinition(expectedTd) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + BAZ, BAZ_ID, + ZAP, ZAP_ID))) .build(); - - final TableDefinition expectedTd = TableDefinition.of(partitionColumn, bazCol, zapCol); - final Table expected = newTable(expectedTd, intCol(PARTITION, 0, 0, 1, 1), longCol(BAZ, 99, 101, 99, 101), stringCol(ZAP, "Foo", "Bar", "Foo", "Bar")); - { final Table actual = ParquetTools.readTable(file, instructions); assertEquals(expectedTd, actual.getDefinition()); assertTableEquals(expected, actual); } - - { - final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(expectedTd)); - assertEquals(expectedTd, actual.getDefinition()); - assertTableEquals(expected, actual); - } } /** @@ -933,14 +895,14 @@ public void testPartitionedParquetFieldIds() { * * @see Arrow Parquet field_id */ - @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id") @Test public void testParquetFieldIdsWithListType() { final String file = TestParquetTools.class.getResource("/ReferenceListParquetFieldIds.parquet").getFile(); final String FOO = "Foo"; final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.intType().arrayType())); final ParquetInstructions instructions = ParquetInstructions.builder() - .setFieldId(FOO, 999) + .setTableDefinition(td) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of(FOO, 999))) .build(); final Table expected = TableTools.newTable(td, new ColumnHolder<>(FOO, int[].class, int.class, false, new int[] {1, 2, 3}, @@ -952,11 +914,6 @@ public void testParquetFieldIdsWithListType() { assertEquals(td, actual.getDefinition()); assertTableEquals(expected, actual); } - { - final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(td)); - assertEquals(td, actual.getDefinition()); - assertTableEquals(expected, actual); - } } /** @@ -965,73 +922,111 @@ public void testParquetFieldIdsWithListType() { * this column as "FirstName". Both standalone, and in combination with the newer file, we should be able to read it * with the latest schema. */ - @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id") @Test - public void testRenamingResolveViaFieldId() { + public void testRenamingColumnResolver() { final File f1 = new File(testRoot, "testRenamingResolveViaFieldId.00.parquet"); final File f2 = new File(testRoot, "testRenamingResolveViaFieldId.01.parquet"); - final int FIRST_NAME_ID = 15; + final String NAME = "Name"; + final TableDefinition td1 = TableDefinition.of(ColumnDefinition.ofString(NAME)); + final int NAME_ID = 15; + final Table t1 = newTable(td1, stringCol(NAME, "Shivam", "Ryan")); { - final String NAME = "Name"; - final Table t1 = newTable(TableDefinition.of(ColumnDefinition.ofString(NAME)), - stringCol(NAME, "Shivam", "Ryan")); ParquetTools.writeTable(t1, f1.getPath(), ParquetInstructions.builder() - .setFieldId(NAME, FIRST_NAME_ID) + .setFieldId(NAME, NAME_ID) .build()); } - final int LAST_NAME_ID = 16; final String FIRST_NAME = "FirstName"; final String LAST_NAME = "LastName"; - final TableDefinition td = TableDefinition.of( + final TableDefinition td2 = TableDefinition.of( ColumnDefinition.ofString(FIRST_NAME), ColumnDefinition.ofString(LAST_NAME)); - final ParquetInstructions instructions = ParquetInstructions.builder() - .setFieldId(FIRST_NAME, FIRST_NAME_ID) - .setFieldId(LAST_NAME, LAST_NAME_ID) - .build(); + // noinspection UnnecessaryLocalVariable + final int FIRST_NAME_ID = NAME_ID; + final int LAST_NAME_ID = 16; + final Table t2 = newTable(td2, + stringCol(FIRST_NAME, "Pete", "Colin"), + stringCol(LAST_NAME, "Goddard", "Alworth")); { - final Table t = newTable(td, - stringCol(FIRST_NAME, "Pete", "Colin"), - stringCol(LAST_NAME, "Goddard", "Alworth")); - ParquetTools.writeTable(t, f2.getPath(), instructions); + ParquetTools.writeTable(t2, f2.getPath(), ParquetInstructions.builder() + .setFieldId(FIRST_NAME, FIRST_NAME_ID) + .setFieldId(LAST_NAME, LAST_NAME_ID) + .build()); } - // If we read first file without an explicit definition, we should only get the column from the file + final ParquetColumnResolver.Factory resolver = ParquetFieldIdColumnResolverFactory.of(Map.of( + NAME, NAME_ID, + FIRST_NAME, FIRST_NAME_ID, + LAST_NAME, LAST_NAME_ID)); + + // f1 + td1 { - final TableDefinition expectedTd = TableDefinition.of(ColumnDefinition.ofString(FIRST_NAME)); - final Table expected = newTable(expectedTd, stringCol(FIRST_NAME, "Shivam", "Ryan")); - final Table actual = ParquetTools.readTable(f1.getPath(), instructions); - assertEquals(expectedTd, actual.getDefinition()); - assertTableEquals(expected, actual); + final Table actual = ParquetTools.readTable(f1.getPath(), ParquetInstructions.builder() + .setTableDefinition(td1) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td1, actual.getDefinition()); + assertTableEquals(t1, actual); } - // If we read first file with an explicit definition, the new column should return nulls + // f1 + td2 { - final Table expected = newTable(td, + final Table expected = newTable(td2, stringCol(FIRST_NAME, "Shivam", "Ryan"), stringCol(LAST_NAME, null, null)); - final Table actual = ParquetTools.readTable(f1.getPath(), instructions.withTableDefinition(td)); - assertEquals(td, actual.getDefinition()); + final Table actual = ParquetTools.readTable(f1.getPath(), ParquetInstructions.builder() + .setTableDefinition(td2) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td2, actual.getDefinition()); assertTableEquals(expected, actual); } - // We should be able to read both (flat partitioning) with the latest schema + // f2 + td1 { - final Table expected = newTable(td, + final Table expected = newTable(td1, stringCol(NAME, "Pete", "Colin")); + final Table actual = ParquetTools.readTable(f2.getPath(), ParquetInstructions.builder() + .setTableDefinition(td1) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td1, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // f2 + td2 + { + final Table actual = ParquetTools.readTable(f2.getPath(), ParquetInstructions.builder() + .setTableDefinition(td2) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td2, actual.getDefinition()); + assertTableEquals(t2, actual); + } + + // (f1, f2) + td1 + { + final Table expected = newTable(td1, + stringCol(NAME, "Shivam", "Ryan", "Pete", "Colin")); + final Table actual = ParquetTools.readTable(testRoot, ParquetInstructions.builder() + .setTableDefinition(td1) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td1, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // (f1, f2) + td2 + { + final Table expected = newTable(td2, stringCol(FIRST_NAME, "Shivam", "Ryan", "Pete", "Colin"), stringCol(LAST_NAME, null, null, "Goddard", "Alworth")); - { - final Table actual = ParquetTools.readTable(testRoot, instructions); - assertEquals(td, actual.getDefinition()); - assertTableEquals(expected, actual); - } - { - final Table actual = ParquetTools.readTable(testRoot, instructions.withTableDefinition(td)); - assertEquals(td, actual.getDefinition()); - assertTableEquals(expected, actual); - } + final Table actual = ParquetTools.readTable(testRoot, ParquetInstructions.builder() + .setTableDefinition(td2) + .setColumnResolverFactory(resolver) + .build()); + assertEquals(td2, actual.getDefinition()); + assertTableEquals(expected, actual); } } @@ -1055,28 +1050,53 @@ public void parquetWithNonUniqueFieldIds() { } { - final String BAZ = "Baz"; - final ParquetInstructions bazInstructions = ParquetInstructions.builder() - .setFieldId(BAZ, fieldId) - .build(); + final MessageType expectedSchema = Types.buildMessage() + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.intType(32)) + .id(fieldId) + .named(FOO) + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .id(fieldId) + .named(BAR) + .named(MappedSchema.SCHEMA_NAME); + final MessageType actualSchema = readSchema(f); + assertEquals(expectedSchema, actualSchema); + } - // fieldId _won't_ be used to actually create a Baz column since the underlying file has multiple. In this - // case, we just infer the physical parquet column names. - { + // normal + no definition + { + final Table actual = ParquetTools.readTable(f.getPath()); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } - final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions); - assertEquals(td, actual.getDefinition()); - assertTableEquals(expected, actual); - } + // normal + definition + { + final Table actual = ParquetTools.readTable(f.getPath(), ParquetInstructions.builder() + .setTableDefinition(td) + .build()); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } - // If the user explicitly asks for a definition with a mapping to a non-unique field id, they will get back - // the column of default (null) values. - { - final TableDefinition bazTd = TableDefinition.of(ColumnDefinition.ofInt(BAZ)); - final Table bazTable = newTable(bazTd, intCol(BAZ, QueryConstants.NULL_INT, QueryConstants.NULL_INT)); - final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions.withTableDefinition(bazTd)); - assertEquals(bazTd, actual.getDefinition()); - assertTableEquals(bazTable, actual); + // ParquetColumnResolverFieldIds will not work given the duplicate field IDs in the file + { + final Table table = ParquetTools.readTable(f.getPath(), ParquetInstructions.builder() + .setTableDefinition(td) + .setColumnResolverFactory(ParquetFieldIdColumnResolverFactory.of(Map.of( + FOO, fieldId, + BAR, fieldId))) + .build()); + + // Only noticed when we build ParquetTableLocation; if necessary, we could refactor the implementation to + // notice this earlier on. + try { + table.select(); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining( + "Parquet columns can't be unambigously mapped. Bar -> 31337 has multiple paths [Foo], [Bar]"); } } } @@ -1124,7 +1144,7 @@ public void nestedMessageEmpty() { ParquetTools.readTable(file); Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); } catch (UnsupportedOperationException e) { - Assertions.assertThat(e) + assertThat(e) .hasMessageContaining("Encountered unsupported multi-column field MyStruct"); } } @@ -1173,11 +1193,90 @@ public void nestedMessage1Row() { ParquetTools.readTable(file); Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); } catch (UnsupportedOperationException e) { - Assertions.assertThat(e) + assertThat(e) .hasMessageContaining("Encountered unsupported multi-column field MyStruct"); } } + @Test + public void intArray() { + final File f = new File(testRoot, "intArray.parquet"); + final String FOO = "Foo"; + final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.intType().arrayType())); + final Table table = TableTools.newTable(td, new ColumnHolder<>(FOO, int[].class, int.class, false, + new int[] {1, 2, 3}, + null, + new int[0], + new int[] {42})); + { + ParquetTools.writeTable(table, f.getPath()); + } + + { + final MessageType expectedSchema = Types.buildMessage() + .optionalList() + .optionalElement(PrimitiveType.PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.intType(32)) + .named(FOO) + .named(MappedSchema.SCHEMA_NAME); + assertEquals(readSchema(f), expectedSchema); + } + + { + final Table actual = ParquetTools.readTable(f.getPath()); + assertEquals(td, actual.getDefinition()); + assertTableEquals(table, actual); + } + + { + final Table actual = ParquetTools.readTable(f.getPath(), ParquetInstructions.builder() + .setTableDefinition(td) + .build()); + assertEquals(td, actual.getDefinition()); + assertTableEquals(table, actual); + } + } + + @Test + public void stringArray() { + final File f = new File(testRoot, "stringArray.parquet"); + final String FOO = "Foo"; + final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.stringType().arrayType())); + final Table expected = TableTools.newTable(td, new ColumnHolder<>(FOO, String[].class, String.class, false, + new String[] {null, "", "Hello, world!"}, + null, + new String[0], + new String[] {"42"})); + { + ParquetTools.writeTable(expected, f.getPath()); + } + + { + final MessageType expectedSchema = Types.buildMessage() + .optionalList() + .optionalElement(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named(FOO) + .named(MappedSchema.SCHEMA_NAME); + assertEquals(readSchema(f), expectedSchema); + } + + { + final Table actual = ParquetTools.readTable(f.getPath()); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setTableDefinition(td) + .build(); + final Table actual = ParquetTools.readTable(f.getPath(), instructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + private static String sha256sum(Path path) throws NoSuchAlgorithmException, IOException { final MessageDigest digest = MessageDigest.getInstance("SHA-256"); final DigestOutputStream out = new DigestOutputStream(OutputStream.nullOutputStream(), digest); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java new file mode 100644 index 00000000000..483d77a75d5 --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/location/ParquetFieldIdColumnResolverFactoryTest.java @@ -0,0 +1,192 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.location; + +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ParquetFieldIdColumnResolverFactoryTest { + + // foo (42) + private static final PrimitiveType FOO_42 = Types.required(INT32) + .id(42) + .named("foo"); + + // bar (43), list, element + private static final GroupType BAR_43 = Types.requiredList() + .id(43) + .requiredElement(INT32) + .named("bar"); + + // baz, list, element (44) + private static final GroupType BAZ_44 = Types.requiredList() + .requiredElement(INT32) + .id(44) + .named("baz"); + + private static final ParquetFieldIdColumnResolverFactory FACTORY = ParquetFieldIdColumnResolverFactory.of(Map.of( + "DeepFoo", 42, + "DeepBar", 43, + "DeepBaz", 44)); + + private static String[] p(String... path) { + return path; + } + + @Test + public void messageFields() { + final MessageType schema = Types.buildMessage() + .addFields(FOO_42, BAR_43, BAZ_44) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("DeepFoo", List.of("foo")) + .putMap("DeepBar", List.of("bar", "list", "element")) + .putMap("DeepBaz", List.of("baz", "list", "element")) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void messageGroupFields() { + final MessageType schema = Types.buildMessage() + .addFields(Types.repeatedGroup() + .addFields(FOO_42, BAR_43, BAZ_44) + .named("my_group")) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("DeepFoo", List.of("my_group", "foo")) + .putMap("DeepBar", List.of("my_group", "bar", "list", "element")) + .putMap("DeepBaz", List.of("my_group", "baz", "list", "element")) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void messageListElements() { + final MessageType schema = Types.buildMessage() + .addFields( + Types.requiredList().element(FOO_42).named("my_list1"), + Types.requiredList().element(BAR_43).named("my_list2"), + Types.requiredList().element(BAZ_44).named("my_list3")) + .named("root"); + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("DeepFoo", List.of("my_list1", "list", "foo")) + .putMap("DeepBar", List.of("my_list2", "list", "bar", "list", "element")) + .putMap("DeepBaz", List.of("my_list3", "list", "baz", "list", "element")) + .build(); + assertThat(FACTORY.of(schema)).isEqualTo(expected); + } + + @Test + public void singleFieldMultipleIdsUnambiguous() { + final ParquetFieldIdColumnResolverFactory factory = ParquetFieldIdColumnResolverFactory.of(Map.of( + "Col1", 1, + "Col2", 2)); + + // BothCols (1), list, element (2) + final MessageType schema = Types.buildMessage().addFields(Types.requiredList() + .id(1) + .requiredElement(INT32) + .id(2) + .named("BothCols")) + .named("root"); + + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("Col1", List.of("BothCols", "list", "element")) + .putMap("Col2", List.of("BothCols", "list", "element")) + .build(); + + assertThat(factory.of(schema)).isEqualTo(expected); + } + + @Test + public void singleFieldRepeatedIds() { + final ParquetFieldIdColumnResolverFactory factory = ParquetFieldIdColumnResolverFactory.of(Map.of("Col1", 42)); + // X (42), list, element (42) + final MessageType schema = Types.buildMessage().addFields(Types.requiredList() + .id(42) + .requiredElement(INT32) + .id(42) + .named("X")) + .named("root"); + + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("Col1", List.of("X", "list", "element")) + .build(); + + // This resolution strategy is a little bit questionable... but it is unambiguous. If instead we needed to also + // provide the user with a single resulting Type with said field id, this strategy would not work. + assertThat(factory.of(schema)).isEqualTo(expected); + } + + @Test + public void ambiguousFields() { + // X (1) + // Y (1) + // Z (2) + final MessageType schema = Types.buildMessage() + .addFields( + Types.required(INT32).id(1).named("X"), + Types.required(INT32).id(1).named("Y"), + Types.required(INT32).id(2).named("Z")) + .named("root"); + try { + ParquetFieldIdColumnResolverFactory.of(Map.of("Col1", 1)).of(schema); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Col1 -> 1 has multiple paths [X], [Y]"); + } + // Does not fail if ambiguous id is not referenced + { + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("ColZ", List.of("Z")) + .build(); + final ParquetColumnResolverMap actual = + ParquetFieldIdColumnResolverFactory.of(Map.of("ColZ", 2)).of(schema); + assertThat(actual).isEqualTo(expected); + } + } + + @Test + public void ambiguousFieldsNested() { + // X (1), list, element (2) + // Y (2), list, element (1) + // Z (3) + final MessageType schema = Types.buildMessage() + .addFields( + Types.requiredList().id(1).requiredElement(INT32).id(2).named("X"), + Types.requiredList().id(2).requiredElement(INT32).id(1).named("Y"), + Types.required(INT32).id(3).named("Z")) + .named("root"); + System.out.println(schema); + // Note: a different implementation _could_ take a different course of action here and proceed without error; + // for example, an implementation could choose to consider the innermost (or outermost) field id for matching + // purposes. + try { + ParquetFieldIdColumnResolverFactory.of(Map.of("Col1", 1)).of(schema); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Col1 -> 1 has multiple paths [X, list, element], [Y, list, element]"); + } + // Does not fail if ambiguous id is not referenced + { + final ParquetColumnResolverMap expected = ParquetColumnResolverMap.builder() + .putMap("ColZ", List.of("Z")) + .build(); + final ParquetColumnResolverMap actual = + ParquetFieldIdColumnResolverFactory.of(Map.of("ColZ", 3)).of(schema); + assertThat(actual).isEqualTo(expected); + } + } +}