From 50e540848cf7e2b67cc1e88a58b10a9201da301a Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Wed, 15 Jan 2025 16:40:16 -0800
Subject: [PATCH] feat: parse Parquet schema from ParquetMetadataConverter
 (#6550)

This removes DH's duplicated schema-reading logic, and removes the warning

> DO NOT RELY ON {@link ParquetMetadataConverter} FOR THIS! USE

There have been no tests indicating that using ParquetMetadataConverter
causes any issues. That may have been true historically, but it does not
seem to be true anymore.
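For reference, the new read path boils down to a two-step conversion; a
minimal sketch (simplified from the diff below, with channel plumbing and
error wrapping omitted):

    // Step 1: read the raw thrift FileMetaData from the footer; existing
    // callers (e.g. getRowGroup) still depend on this structure.
    FileMetaData fileMetaData = Util.readFileMetaData(in);
    // Step 2: convert it with parquet-mr rather than hand-rolled logic.
    ParquetMetadata metadata = new ParquetMetadataConverter().fromParquetMetadata(fileMetaData);
    // The MessageType schema is now derived from the converted metadata.
    MessageType schema = metadata.getFileMetaData().getSchema();

Callers that previously converted the thrift metadata themselves (e.g.
ParquetTableLocationKey) can now use reader.getMetadata() directly.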
---
 .../parquet/base/ParquetFileReader.java       | 242 ++----------------
 .../parquet/table/ParquetSchemaReader.java    |   6 +-
 .../layout/ParquetMetadataFileLayout.java     |  37 +--
 .../location/ParquetTableLocationKey.java     |  10 +-
 4 files changed, 34 insertions(+), 261 deletions(-)

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
index 87ce0690771..137c3f61484 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
@@ -7,11 +7,10 @@
 import io.deephaven.util.channel.SeekableChannelContext;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.apache.parquet.format.*;
-import org.apache.parquet.format.ColumnOrder;
-import org.apache.parquet.format.Type;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.*;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,14 +30,16 @@ public class ParquetFileReader {
     private static final int FOOTER_LENGTH_SIZE = 4;
     public static final String FILE_URI_SCHEME = "file";
 
+    private static final ParquetMetadataConverter PARQUET_METADATA_CONVERTER = new ParquetMetadataConverter();
+
     public final FileMetaData fileMetaData;
+    private final ParquetMetadata metadata;
     private final SeekableChannelsProvider channelsProvider;
 
     /**
      * If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
      */
     private final URI rootURI;
-    private final MessageType schema;
 
     /**
      * Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
@@ -79,10 +80,18 @@ private ParquetFileReader(
                 final SeekableByteChannel ch = channelsProvider.getReadChannel(context, parquetFileURI)) {
             final int footerLength = positionToFileMetadata(parquetFileURI, ch);
             try (final InputStream in = channelsProvider.getInputStream(ch, footerLength)) {
+                // Ideally, we would be able to get rid of our dependency on the underlying thrift structures, but there
+                // is a non-trivial chain of usages stemming from fileMetaData. For now, we will create ParquetMetadata
+                // in a two-step process that preserves the thrift structure.
+                // metadata = PARQUET_METADATA_CONVERTER.readParquetMetadata(in, ParquetMetadataConverter.NO_FILTER);
                 fileMetaData = Util.readFileMetaData(in);
             }
         }
-        schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
+        try {
+            metadata = PARQUET_METADATA_CONVERTER.fromParquetMetadata(fileMetaData);
+        } catch (IOException e) {
+            throw new IOException("Failed to convert Parquet file metadata: " + parquetFileURI, e);
+        }
     }
 
     /**
@@ -138,231 +147,16 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
                 fileMetaData.getRow_groups().get(groupNumber),
                 channelsProvider,
                 rootURI,
-                schema,
+                getSchema(),
                 getSchema(),
                 version);
     }
 
-    private static MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> columnOrders)
-            throws ParquetFileReaderException {
-        final Iterator<SchemaElement> iterator = schema.iterator();
-        final SchemaElement root = iterator.next();
-        final Types.MessageTypeBuilder builder = Types.buildMessage();
-        if (root.isSetField_id()) {
-            builder.id(root.field_id);
-        }
-        buildChildren(builder, iterator, root.getNum_children(), columnOrders, 0);
-        return builder.named(root.name);
-    }
-
-    private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaElement> schema,
-            int childrenCount, List<ColumnOrder> columnOrders, int columnCount) throws ParquetFileReaderException {
-        for (int i = 0; i < childrenCount; ++i) {
-            SchemaElement schemaElement = schema.next();
-            Object childBuilder;
-            if (schemaElement.type != null) {
-                Types.PrimitiveBuilder primitiveBuilder =
-                        builder.primitive(getPrimitive(schemaElement.type),
-                                fromParquetRepetition(schemaElement.repetition_type));
-                if (schemaElement.isSetType_length()) {
-                    primitiveBuilder.length(schemaElement.type_length);
-                }
-
-                if (schemaElement.isSetPrecision()) {
-                    primitiveBuilder.precision(schemaElement.precision);
-                }
-
-                if (schemaElement.isSetScale()) {
-                    primitiveBuilder.scale(schemaElement.scale);
-                }
-
-                if (columnOrders != null) {
-                    org.apache.parquet.schema.ColumnOrder columnOrder =
-                            fromParquetColumnOrder(columnOrders.get(columnCount));
-                    if (columnOrder
-                            .getColumnOrderName() == org.apache.parquet.schema.ColumnOrder.ColumnOrderName.TYPE_DEFINED_ORDER
-                            && (schemaElement.type == Type.INT96
-                                    || schemaElement.converted_type == ConvertedType.INTERVAL)) {
-                        columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
-                    }
-
-                    primitiveBuilder.columnOrder(columnOrder);
-                }
-
-                childBuilder = primitiveBuilder;
-            } else {
-                childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
-                buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children,
-                        columnOrders, columnCount);
-            }
-
-            if (schemaElement.isSetLogicalType()) {
-                final LogicalTypeAnnotation logicalType = getLogicalTypeAnnotation(schemaElement.logicalType);
-                ((Types.Builder) childBuilder).as(logicalType);
-            } else if (schemaElement.isSetConverted_type()) {
-                final LogicalTypeAnnotation logicalType = getLogicalTypeFromConvertedType(
-                        schemaElement.converted_type, schemaElement);
-                ((Types.Builder) childBuilder).as(logicalType);
-            }
-
-            if (schemaElement.isSetField_id()) {
-                ((Types.Builder) childBuilder).id(schemaElement.field_id);
-            }
-
-            ((Types.Builder) childBuilder).named(schemaElement.name);
-            ++columnCount;
-        }
-
-    }
-
-    private static LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) throws ParquetFileReaderException {
-        switch (unit.getSetField()) {
-            case MICROS:
-                return LogicalTypeAnnotation.TimeUnit.MICROS;
-            case MILLIS:
-                return LogicalTypeAnnotation.TimeUnit.MILLIS;
-            case NANOS:
-                return LogicalTypeAnnotation.TimeUnit.NANOS;
-            default:
-                throw new ParquetFileReaderException("Unknown time unit " + unit);
-        }
-    }
-
-    @Nullable
-    private static LogicalTypeAnnotation getLogicalTypeAnnotation(@NotNull final LogicalType type)
-            throws ParquetFileReaderException {
-        switch (type.getSetField()) {
-            case MAP:
-                return LogicalTypeAnnotation.mapType();
-            case BSON:
-                return LogicalTypeAnnotation.bsonType();
-            case DATE:
-                return LogicalTypeAnnotation.dateType();
-            case ENUM:
-                return LogicalTypeAnnotation.enumType();
-            case JSON:
-                return LogicalTypeAnnotation.jsonType();
-            case LIST:
-                return LogicalTypeAnnotation.listType();
-            case TIME:
-                final TimeType time = type.getTIME();
-                return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
-            case STRING:
-                return LogicalTypeAnnotation.stringType();
-            case DECIMAL:
-                final DecimalType decimal = type.getDECIMAL();
-                return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
-            case INTEGER:
-                final IntType integer = type.getINTEGER();
-                return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
-            case UNKNOWN:
-                return null;
-            case TIMESTAMP:
-                final TimestampType timestamp = type.getTIMESTAMP();
-                return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
-            default:
-                throw new ParquetFileReaderException("Unknown logical type " + type);
-        }
-    }
-
-    private static org.apache.parquet.schema.Type.Repetition fromParquetRepetition(FieldRepetitionType repetition) {
-        return org.apache.parquet.schema.Type.Repetition.valueOf(repetition.name());
-    }
-
-    private static PrimitiveType.PrimitiveTypeName getPrimitive(Type type) throws ParquetFileReaderException {
-        switch (type) {
-            case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
-                return PrimitiveType.PrimitiveTypeName.BINARY;
-            case INT64:
-                return PrimitiveType.PrimitiveTypeName.INT64;
-            case INT32:
-                return PrimitiveType.PrimitiveTypeName.INT32;
-            case BOOLEAN:
-                return PrimitiveType.PrimitiveTypeName.BOOLEAN;
-            case FLOAT:
-                return PrimitiveType.PrimitiveTypeName.FLOAT;
-            case DOUBLE:
-                return PrimitiveType.PrimitiveTypeName.DOUBLE;
-            case INT96:
-                return PrimitiveType.PrimitiveTypeName.INT96;
-            case FIXED_LEN_BYTE_ARRAY:
-                return PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
-            default:
-                throw new ParquetFileReaderException("Unknown type " + type);
-        }
-    }
-
-    private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
-        if (columnOrder.isSetTYPE_ORDER()) {
-            return org.apache.parquet.schema.ColumnOrder.typeDefined();
-        }
-        // The column order is not yet supported by this API
-        return org.apache.parquet.schema.ColumnOrder.undefined();
-    }
-
-    /**
-     * This method will convert the {@link ConvertedType} to a {@link LogicalTypeAnnotation} and should only be called
-     * if the logical type is not set in the schema element.
-     *
-     * @see Reference for conversions
-     */
-    private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
-            final ConvertedType convertedType,
-            final SchemaElement schemaElement) throws ParquetFileReaderException {
-        switch (convertedType) {
-            case UTF8:
-                return LogicalTypeAnnotation.stringType();
-            case MAP:
-                return LogicalTypeAnnotation.mapType();
-            case MAP_KEY_VALUE:
-                return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
-            case LIST:
-                return LogicalTypeAnnotation.listType();
-            case ENUM:
-                return LogicalTypeAnnotation.enumType();
-            case DECIMAL:
-                final int scale = schemaElement == null ? 0 : schemaElement.scale;
-                final int precision = schemaElement == null ? 0 : schemaElement.precision;
-                return LogicalTypeAnnotation.decimalType(scale, precision);
-            case DATE:
-                return LogicalTypeAnnotation.dateType();
-            case TIME_MILLIS:
-                return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
-            case TIME_MICROS:
-                return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
-            case TIMESTAMP_MILLIS:
-                return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
-            case TIMESTAMP_MICROS:
-                return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
-            case INTERVAL:
-                return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
-            case INT_8:
-                return LogicalTypeAnnotation.intType(8, true);
-            case INT_16:
-                return LogicalTypeAnnotation.intType(16, true);
-            case INT_32:
-                return LogicalTypeAnnotation.intType(32, true);
-            case INT_64:
-                return LogicalTypeAnnotation.intType(64, true);
-            case UINT_8:
-                return LogicalTypeAnnotation.intType(8, false);
-            case UINT_16:
-                return LogicalTypeAnnotation.intType(16, false);
-            case UINT_32:
-                return LogicalTypeAnnotation.intType(32, false);
-            case UINT_64:
-                return LogicalTypeAnnotation.intType(64, false);
-            case JSON:
-                return LogicalTypeAnnotation.jsonType();
-            case BSON:
-                return LogicalTypeAnnotation.bsonType();
-            default:
-                throw new ParquetFileReaderException(
-                        "Can't convert converted type to logical type, unknown converted type " + convertedType);
-        }
+    public ParquetMetadata getMetadata() {
+        return metadata;
     }
 
     public MessageType getSchema() {
-        return schema;
+        return metadata.getFileMetaData().getSchema();
     }
 }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
index dc99271414b..658d37bc20a 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
@@ -110,8 +110,7 @@ public static Optional<TableInfo> parseMetadata(@NotNull final Map<String, String>
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
         final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
-                ParquetSchemaReader.convertSchema(
-                        metadataFileReader.getSchema(),
-                        metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
-                        inputInstructions);
-
+                convertSchema(metadataFileMetadata, inputInstructions);
         if (channelsProvider.exists(commonMetadataFileURI)) {
             // Infer the partitioning columns using the common metadata file
             final ParquetFileReader commonMetadataFileReader =
                     ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
             final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
-                    ParquetSchemaReader.convertSchema(
-                            commonMetadataFileReader.getSchema(),
-                            convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
-                                    .getFileMetaData()
-                                    .getKeyValueMetaData(),
-                            leafSchemaInfo.getSecond());
+                    convertSchema(commonMetadataFileReader.getMetadata(), leafSchemaInfo.getSecond());
             final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
             final Map<String, ColumnDefinition<?>> leafDefinitionsMap = leafSchemaInfo.getFirst().stream()
@@ -241,6 +229,15 @@ private ParquetMetadataFileLayout(
                 }).collect(Collectors.toList());
     }
 
+    private static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema(
+            ParquetMetadata metadata,
+            @NotNull ParquetInstructions readInstructionsIn) {
+        return ParquetSchemaReader.convertSchema(
+                metadata.getFileMetaData().getSchema(),
+                metadata.getFileMetaData().getKeyValueMetaData(),
+                readInstructionsIn);
+    }
+
     /**
      * This method takes the {@link ParquetMetadata} from the metadata file, extracts the key-value metadata specific to
      * the provided file, and creates a new {@link ParquetMetadata} for this file.
@@ -273,16 +270,6 @@ public String toString() {
                 + ']';
     }
 
-    private static ParquetMetadata convertMetadata(@NotNull final URI uri,
-            @NotNull final ParquetFileReader fileReader,
-            @NotNull final ParquetMetadataConverter converter) {
-        try {
-            return converter.fromParquetMetadata(fileReader.fileMetaData);
-        } catch (IOException e) {
-            throw new TableDataException("Error while converting file metadata from " + uri);
-        }
-    }
-
     private static ColumnDefinition<?> adjustPartitionDefinition(@NotNull final ColumnDefinition<?> columnDefinition) {
         // Primitive booleans should be boxed
         final Class<?> dataType = columnDefinition.getDataType();
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
index 685d973cf35..e56054fb859 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
@@ -5,19 +5,16 @@
 
 import io.deephaven.engine.table.impl.locations.local.URITableLocationKey;
 import io.deephaven.parquet.table.ParquetInstructions;
-import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.TableLocationKey;
 import io.deephaven.parquet.base.ParquetFileReader;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import io.deephaven.util.channel.SeekableChannelsProviderLoader;
 import org.apache.commons.io.FilenameUtils;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -148,11 +145,8 @@ public synchronized ParquetMetadata getMetadata() {
         if (metadata != null) {
             return metadata;
         }
-        try {
-            return metadata = new ParquetMetadataConverter().fromParquetMetadata(getFileReader().fileMetaData);
-        } catch (IOException e) {
-            throw new TableDataException("Failed to convert Parquet file metadata: " + getURI(), e);
-        }
+        // No need to cache the metadata; it is already present on the fileReader
+        return getFileReader().getMetadata();
     }
 
     /**