feat: parse Parquet schema from ParquetMetadataConverter (#6550)
This removes DH's duplication of schema reading logic, and removes the warning

> DO NOT RELY ON {@link ParquetMetadataConverter} FOR THIS! USE {@link ParquetFileReader}!

There are no tests indicating that using ParquetMetadataConverter causes any issues. That may have been true historically, but it does not seem to be true anymore.
devinrsmith authored Jan 16, 2025
1 parent 6682564 commit 50e5408
Showing 4 changed files with 34 additions and 261 deletions.
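As context for the diff below, here is a minimal sketch of the approach this commit adopts: read the raw thrift FileMetaData from the footer, hand it to parquet-mr's ParquetMetadataConverter to build a ParquetMetadata, and take the MessageType schema from that instead of walking SchemaElements by hand. This is a simplified illustration, not the Deephaven code; the class name FooterSketch and the InputStream plumbing are placeholders, while the parquet-mr calls (Util.readFileMetaData, ParquetMetadataConverter.fromParquetMetadata) are the ones used in the diff.

import java.io.IOException;
import java.io.InputStream;

import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

final class FooterSketch {
    // Two-step conversion that preserves the thrift structure: keep the raw
    // org.apache.parquet.format.FileMetaData (still used elsewhere), then hand it
    // to parquet-mr's converter to build the higher-level ParquetMetadata.
    static ParquetMetadata readFooter(final InputStream footerIn) throws IOException {
        final org.apache.parquet.format.FileMetaData thriftMetadata = Util.readFileMetaData(footerIn);
        return new ParquetMetadataConverter().fromParquetMetadata(thriftMetadata);
    }

    // With a ParquetMetadata in hand, the schema needs no hand-rolled SchemaElement walking.
    static MessageType schemaOf(final ParquetMetadata metadata) {
        return metadata.getFileMetaData().getSchema();
    }
}

The same ParquetMetadata also exposes the key-value metadata via getFileMetaData().getKeyValueMetaData(), which is what ParquetMetadataFileLayout relies on through the new getMetadata() accessor in the diff.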
@@ -7,11 +7,10 @@
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
@@ -31,14 +30,16 @@ public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
public static final String FILE_URI_SCHEME = "file";

private static final ParquetMetadataConverter PARQUET_METADATA_CONVERTER = new ParquetMetadataConverter();

public final FileMetaData fileMetaData;
private final ParquetMetadata metadata;
private final SeekableChannelsProvider channelsProvider;

/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final MessageType schema;

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
@@ -79,10 +80,18 @@ private ParquetFileReader(
final SeekableByteChannel ch = channelsProvider.getReadChannel(context, parquetFileURI)) {
final int footerLength = positionToFileMetadata(parquetFileURI, ch);
try (final InputStream in = channelsProvider.getInputStream(ch, footerLength)) {
// Ideally, we would be able to get rid of our dependency on the underlying thrift structures, but there
// is a non-trivial chain of usages stemming from fileMetaData. For now, we will create ParquetMetadata
// in a two-step process that preserves the thrift structure.
// metadata = PARQUET_METADATA_CONVERTER.readParquetMetadata(in, ParquetMetadataConverter.NO_FILTER);
fileMetaData = Util.readFileMetaData(in);
}
}
schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
try {
metadata = PARQUET_METADATA_CONVERTER.fromParquetMetadata(fileMetaData);
} catch (IOException e) {
throw new IOException("Failed to convert Parquet file metadata: " + parquetFileURI, e);
}
}

/**
@@ -138,231 +147,16 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
fileMetaData.getRow_groups().get(groupNumber),
channelsProvider,
rootURI,
schema,
getSchema(),
getSchema(),
version);
}

private static MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> columnOrders)
throws ParquetFileReaderException {
final Iterator<SchemaElement> iterator = schema.iterator();
final SchemaElement root = iterator.next();
final Types.MessageTypeBuilder builder = Types.buildMessage();
if (root.isSetField_id()) {
builder.id(root.field_id);
}
buildChildren(builder, iterator, root.getNum_children(), columnOrders, 0);
return builder.named(root.name);
}

private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaElement> schema,
int childrenCount, List<ColumnOrder> columnOrders, int columnCount) throws ParquetFileReaderException {
for (int i = 0; i < childrenCount; ++i) {
SchemaElement schemaElement = schema.next();
Object childBuilder;
if (schemaElement.type != null) {
Types.PrimitiveBuilder primitiveBuilder =
builder.primitive(getPrimitive(schemaElement.type),
fromParquetRepetition(schemaElement.repetition_type));
if (schemaElement.isSetType_length()) {
primitiveBuilder.length(schemaElement.type_length);
}

if (schemaElement.isSetPrecision()) {
primitiveBuilder.precision(schemaElement.precision);
}

if (schemaElement.isSetScale()) {
primitiveBuilder.scale(schemaElement.scale);
}

if (columnOrders != null) {
org.apache.parquet.schema.ColumnOrder columnOrder =
fromParquetColumnOrder(columnOrders.get(columnCount));
if (columnOrder
.getColumnOrderName() == org.apache.parquet.schema.ColumnOrder.ColumnOrderName.TYPE_DEFINED_ORDER
&& (schemaElement.type == Type.INT96
|| schemaElement.converted_type == ConvertedType.INTERVAL)) {
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
}

primitiveBuilder.columnOrder(columnOrder);
}

childBuilder = primitiveBuilder;
} else {
childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children,
columnOrders, columnCount);
}

if (schemaElement.isSetLogicalType()) {
final LogicalTypeAnnotation logicalType = getLogicalTypeAnnotation(schemaElement.logicalType);
((Types.Builder) childBuilder).as(logicalType);
} else if (schemaElement.isSetConverted_type()) {
final LogicalTypeAnnotation logicalType = getLogicalTypeFromConvertedType(
schemaElement.converted_type, schemaElement);
((Types.Builder) childBuilder).as(logicalType);
}

if (schemaElement.isSetField_id()) {
((Types.Builder) childBuilder).id(schemaElement.field_id);
}

((Types.Builder) childBuilder).named(schemaElement.name);
++columnCount;
}

}

private static LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) throws ParquetFileReaderException {
switch (unit.getSetField()) {
case MICROS:
return LogicalTypeAnnotation.TimeUnit.MICROS;
case MILLIS:
return LogicalTypeAnnotation.TimeUnit.MILLIS;
case NANOS:
return LogicalTypeAnnotation.TimeUnit.NANOS;
default:
throw new ParquetFileReaderException("Unknown time unit " + unit);
}
}

@Nullable
private static LogicalTypeAnnotation getLogicalTypeAnnotation(@NotNull final LogicalType type)
throws ParquetFileReaderException {
switch (type.getSetField()) {
case MAP:
return LogicalTypeAnnotation.mapType();
case BSON:
return LogicalTypeAnnotation.bsonType();
case DATE:
return LogicalTypeAnnotation.dateType();
case ENUM:
return LogicalTypeAnnotation.enumType();
case JSON:
return LogicalTypeAnnotation.jsonType();
case LIST:
return LogicalTypeAnnotation.listType();
case TIME:
final TimeType time = type.getTIME();
return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
case STRING:
return LogicalTypeAnnotation.stringType();
case DECIMAL:
final DecimalType decimal = type.getDECIMAL();
return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
case INTEGER:
final IntType integer = type.getINTEGER();
return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
case UNKNOWN:
return null;
case TIMESTAMP:
final TimestampType timestamp = type.getTIMESTAMP();
return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
default:
throw new ParquetFileReaderException("Unknown logical type " + type);
}
}

private static org.apache.parquet.schema.Type.Repetition fromParquetRepetition(FieldRepetitionType repetition) {
return org.apache.parquet.schema.Type.Repetition.valueOf(repetition.name());
}

private static PrimitiveType.PrimitiveTypeName getPrimitive(Type type) throws ParquetFileReaderException {
switch (type) {
case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
return PrimitiveType.PrimitiveTypeName.BINARY;
case INT64:
return PrimitiveType.PrimitiveTypeName.INT64;
case INT32:
return PrimitiveType.PrimitiveTypeName.INT32;
case BOOLEAN:
return PrimitiveType.PrimitiveTypeName.BOOLEAN;
case FLOAT:
return PrimitiveType.PrimitiveTypeName.FLOAT;
case DOUBLE:
return PrimitiveType.PrimitiveTypeName.DOUBLE;
case INT96:
return PrimitiveType.PrimitiveTypeName.INT96;
case FIXED_LEN_BYTE_ARRAY:
return PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
default:
throw new ParquetFileReaderException("Unknown type " + type);
}
}

private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
if (columnOrder.isSetTYPE_ORDER()) {
return org.apache.parquet.schema.ColumnOrder.typeDefined();
}
// The column order is not yet supported by this API
return org.apache.parquet.schema.ColumnOrder.undefined();
}

/**
* This method will convert the {@link ConvertedType} to a {@link LogicalTypeAnnotation} and should only be called
* if the logical type is not set in the schema element.
*
* @see <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">Reference for conversions</a>
*/
private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
final ConvertedType convertedType,
final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
case UTF8:
return LogicalTypeAnnotation.stringType();
case MAP:
return LogicalTypeAnnotation.mapType();
case MAP_KEY_VALUE:
return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
case LIST:
return LogicalTypeAnnotation.listType();
case ENUM:
return LogicalTypeAnnotation.enumType();
case DECIMAL:
final int scale = schemaElement == null ? 0 : schemaElement.scale;
final int precision = schemaElement == null ? 0 : schemaElement.precision;
return LogicalTypeAnnotation.decimalType(scale, precision);
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
case INT_8:
return LogicalTypeAnnotation.intType(8, true);
case INT_16:
return LogicalTypeAnnotation.intType(16, true);
case INT_32:
return LogicalTypeAnnotation.intType(32, true);
case INT_64:
return LogicalTypeAnnotation.intType(64, true);
case UINT_8:
return LogicalTypeAnnotation.intType(8, false);
case UINT_16:
return LogicalTypeAnnotation.intType(16, false);
case UINT_32:
return LogicalTypeAnnotation.intType(32, false);
case UINT_64:
return LogicalTypeAnnotation.intType(64, false);
case JSON:
return LogicalTypeAnnotation.jsonType();
case BSON:
return LogicalTypeAnnotation.bsonType();
default:
throw new ParquetFileReaderException(
"Can't convert converted type to logical type, unknown converted type " + convertedType);
}
public ParquetMetadata getMetadata() {
return metadata;
}

public MessageType getSchema() {
return schema;
return metadata.getFileMetaData().getSchema();
}
}
@@ -110,8 +110,7 @@ public static Optional<TableInfo> parseMetadata(@NotNull final Map<String, Strin
/**
* Obtain schema information from a parquet file
*
* @param schema Parquet schema. DO NOT RELY ON {@link ParquetMetadataConverter} FOR THIS! USE
* {@link ParquetFileReader}!
* @param schema Parquet schema
* @param keyValueMetadata Parquet key-value metadata map
* @param readInstructions Parquet read instructions specifying transformations like column mappings and codecs.
* Note a new read instructions based on this one may be returned by this method to provide necessary
@@ -274,8 +273,7 @@ public static ParquetInstructions readParquetSchema(
/**
* Convert schema information from a {@link ParquetMetadata} into {@link ColumnDefinition ColumnDefinitions}.
*
* @param schema Parquet schema. DO NOT RELY ON {@link ParquetMetadataConverter} FOR THIS! USE
* {@link ParquetFileReader}!
* @param schema Parquet schema
* @param keyValueMetadata Parquet key-value metadata map
* @param readInstructionsIn Input conversion {@link ParquetInstructions}
* @return A {@link Pair} with {@link ColumnDefinition ColumnDefinitions} and adjusted {@link ParquetInstructions}
@@ -21,14 +21,12 @@
import io.deephaven.util.mutable.MutableInt;
import io.deephaven.util.type.TypeUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.format.RowGroup;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -119,27 +117,17 @@ private ParquetMetadataFileLayout(
throw new TableDataException(String.format("Parquet metadata file %s does not exist", metadataFileURI));
}
final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider);
final ParquetMetadataConverter converter = new ParquetMetadataConverter();
final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter);
final ParquetMetadata metadataFileMetadata = metadataFileReader.getMetadata();
if (inputInstructions.getTableDefinition().isEmpty()) {
// Infer the definition from the metadata file
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
ParquetSchemaReader.convertSchema(
metadataFileReader.getSchema(),
metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
inputInstructions);

convertSchema(metadataFileMetadata, inputInstructions);
if (channelsProvider.exists(commonMetadataFileURI)) {
// Infer the partitioning columns using the common metadata file
final ParquetFileReader commonMetadataFileReader =
ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
ParquetSchemaReader.convertSchema(
commonMetadataFileReader.getSchema(),
convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
.getFileMetaData()
.getKeyValueMetaData(),
leafSchemaInfo.getSecond());
convertSchema(commonMetadataFileReader.getMetadata(), leafSchemaInfo.getSecond());
final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
leafSchemaInfo.getFirst().stream()
@@ -241,6 +229,15 @@ private ParquetMetadataFileLayout(
}).collect(Collectors.toList());
}

private static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema(
ParquetMetadata metadata,
@NotNull ParquetInstructions readInstructionsIn) {
return ParquetSchemaReader.convertSchema(
metadata.getFileMetaData().getSchema(),
metadata.getFileMetaData().getKeyValueMetaData(),
readInstructionsIn);
}

/**
* This method takes the {@link ParquetMetadata} from the metadata file, extracts the key-value metadata specific to
* the provided file, and creates a new {@link ParquetMetadata} for this file.
@@ -273,16 +270,6 @@ public String toString() {
+ ']';
}

private static ParquetMetadata convertMetadata(@NotNull final URI uri,
@NotNull final ParquetFileReader fileReader,
@NotNull final ParquetMetadataConverter converter) {
try {
return converter.fromParquetMetadata(fileReader.fileMetaData);
} catch (IOException e) {
throw new TableDataException("Error while converting file metadata from " + uri);
}
}

private static ColumnDefinition<?> adjustPartitionDefinition(@NotNull final ColumnDefinition<?> columnDefinition) {
// Primitive booleans should be boxed
final Class<?> dataType = columnDefinition.getDataType();