Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Implement Variant readers #12139

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public Set<Integer> struct(Types.StructType struct, List<Set<Integer>> fieldResu

@Override
public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType()) {
if ((includeStructIds && field.type().isStructType())
|| field.type().isPrimitiveType()
|| field.type() instanceof Types.VariantType) {
fieldIds.add(field.fieldId());
}
return fieldIds;
Expand All @@ -72,4 +74,9 @@ public Set<Integer> map(Types.MapType map, Set<Integer> keyResult, Set<Integer>
}
return fieldIds;
}

@Override
public Set<Integer> variant() {
return null;
}
}
36 changes: 20 additions & 16 deletions core/src/main/java/org/apache/iceberg/variants/Variants.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ enum BasicType {
ARRAY
}

public static VariantMetadata emptyMetadata() {
return SerializedMetadata.EMPTY_V1_METADATA;
}

public static VariantMetadata metadata(ByteBuffer metadata) {
return SerializedMetadata.from(metadata);
}
Expand Down Expand Up @@ -209,59 +213,59 @@ public static VariantPrimitive<Void> ofNull() {
return new PrimitiveWrapper<>(PhysicalType.NULL, null);
}

static VariantPrimitive<Boolean> of(boolean value) {
public static VariantPrimitive<Boolean> of(boolean value) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to expose these to create test values in the Parquet package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be fine to expose it here, but it we could also add some builders in the test module if we want to not have as much exposure. So TestParquet depends on TestCore and core has public builders for Variants.

return new PrimitiveWrapper<>(PhysicalType.BOOLEAN_TRUE, value);
}

static VariantPrimitive<Byte> of(byte value) {
public static VariantPrimitive<Byte> of(byte value) {
return new PrimitiveWrapper<>(PhysicalType.INT8, value);
}

static VariantPrimitive<Short> of(short value) {
public static VariantPrimitive<Short> of(short value) {
return new PrimitiveWrapper<>(PhysicalType.INT16, value);
}

static VariantPrimitive<Integer> of(int value) {
public static VariantPrimitive<Integer> of(int value) {
return new PrimitiveWrapper<>(PhysicalType.INT32, value);
}

static VariantPrimitive<Long> of(long value) {
public static VariantPrimitive<Long> of(long value) {
return new PrimitiveWrapper<>(PhysicalType.INT64, value);
}

static VariantPrimitive<Float> of(float value) {
public static VariantPrimitive<Float> of(float value) {
return new PrimitiveWrapper<>(PhysicalType.FLOAT, value);
}

static VariantPrimitive<Double> of(double value) {
public static VariantPrimitive<Double> of(double value) {
return new PrimitiveWrapper<>(PhysicalType.DOUBLE, value);
}

static VariantPrimitive<Integer> ofDate(int value) {
public static VariantPrimitive<Integer> ofDate(int value) {
return new PrimitiveWrapper<>(PhysicalType.DATE, value);
}

static VariantPrimitive<Integer> ofIsoDate(String value) {
public static VariantPrimitive<Integer> ofIsoDate(String value) {
return ofDate(DateTimeUtil.isoDateToDays(value));
}

static VariantPrimitive<Long> ofTimestamptz(long value) {
public static VariantPrimitive<Long> ofTimestamptz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPTZ, value);
}

static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
public static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
return ofTimestamptz(DateTimeUtil.isoTimestamptzToMicros(value));
}

static VariantPrimitive<Long> ofTimestampntz(long value) {
public static VariantPrimitive<Long> ofTimestampntz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPNTZ, value);
}

static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
public static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
return ofTimestampntz(DateTimeUtil.isoTimestampToMicros(value));
}

static VariantPrimitive<BigDecimal> of(BigDecimal value) {
public static VariantPrimitive<BigDecimal> of(BigDecimal value) {
int bitLength = value.unscaledValue().bitLength();
if (bitLength < 32) {
return new PrimitiveWrapper<>(PhysicalType.DECIMAL4, value);
Expand All @@ -274,11 +278,11 @@ static VariantPrimitive<BigDecimal> of(BigDecimal value) {
throw new UnsupportedOperationException("Unsupported decimal precision: " + value.precision());
}

static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
public static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
return new PrimitiveWrapper<>(PhysicalType.BINARY, value);
}

static VariantPrimitive<String> of(String value) {
public static VariantPrimitive<String> of(String value) {
return new PrimitiveWrapper<>(PhysicalType.STRING, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.variants;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
Expand All @@ -27,10 +29,55 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class VariantTestUtil {
private VariantTestUtil() {}

public static void assertEqual(VariantMetadata expected, VariantMetadata actual) {
assertThat(actual).isNotNull();
assertThat(expected).isNotNull();
assertThat(actual.dictionarySize())
.as("Dictionary size should match")
.isEqualTo(expected.dictionarySize());

for (int i = 0; i < expected.dictionarySize(); i += 1) {
assertThat(actual.get(i)).isEqualTo(expected.get(i));
}
}

public static void assertEqual(VariantValue expected, VariantValue actual) {
assertThat(actual).isNotNull();
assertThat(expected).isNotNull();
assertThat(actual.type()).as("Variant type should match").isEqualTo(expected.type());

if (expected.type() == Variants.PhysicalType.OBJECT) {
VariantObject expectedObject = expected.asObject();
VariantObject actualObject = actual.asObject();
assertThat(actualObject.numFields())
.as("Variant object num fields should match")
.isEqualTo(expectedObject.numFields());
for (String fieldName : expectedObject.fieldNames()) {
assertEqual(expectedObject.get(fieldName), actualObject.get(fieldName));
}

} else if (expected.type() == Variants.PhysicalType.ARRAY) {
VariantArray expectedArray = expected.asArray();
VariantArray actualArray = actual.asArray();
assertThat(actualArray.numElements())
.as("Variant array num element should match")
.isEqualTo(expectedArray.numElements());
for (int i = 0; i < expectedArray.numElements(); i += 1) {
assertEqual(expectedArray.get(i), actualArray.get(i));
}

} else {
assertThat(actual.asPrimitive().get())
.as("Variant primitive value should match")
.isEqualTo(expected.asPrimitive().get());
}
}

private static byte primitiveHeader(int primitiveType) {
return (byte) (primitiveType << 2);
}
Expand Down Expand Up @@ -60,7 +107,11 @@ static SerializedPrimitive createString(String string) {
return SerializedPrimitive.from(buffer, buffer.get(0));
}

static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
public static ByteBuffer emptyMetadata() {
return createMetadata(ImmutableList.of(), true);
}

public static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
if (fieldNames.isEmpty()) {
return SerializedMetadata.EMPTY_V1_BUFFER;
}
Expand Down Expand Up @@ -108,7 +159,7 @@ static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortName
return buffer;
}

static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
public static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
// create the metadata to look up field names
VariantMetadata metadata = Variants.metadata(metadataBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.data.parquet;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,7 +27,9 @@
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VariantReaderBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -431,6 +434,16 @@ public ParquetValueReader<?> primitive(
}
}

@Override
public ParquetValueReader<?> variant(Types.VariantType iVariant, ParquetValueReader<?> reader) {
return reader;
}

@Override
public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
}

MessageType type() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -75,6 +76,32 @@ private static Schema convertInternal(
converter.getAliases());
}

/**
* Returns true if the name identifies a field in the struct/group.
*
* @param group a GroupType
* @param name a String name
* @return true if the group contains a field with the given name
*/
public static boolean hasField(GroupType group, String name) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New helper methods for working with Parquet fields.

return fieldType(group, name) != null;
}

/**
* Returns the Type of the named field in the struct/group, or null.
*
* @param group a GroupType
* @param name a String name
* @return the Type of the field in the group, or null if it is not present.
*/
public static Type fieldType(GroupType group, String name) {
try {
return group.getType(name);
} catch (InvalidRecordException ignored) {
return null;
}
}

public static MessageType pruneColumns(MessageType fileSchema, Schema expectedSchema) {
// column order must match the incoming type, so it doesn't matter that the ids are unordered
Set<Integer> selectedIds = TypeUtil.getProjectedIds(expectedSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public static ParquetValueReader<Integer> unboxed(ColumnDescriptor desc) {
return new UnboxedReader<>(desc);
}

public static ParquetValueReader<Byte> intsAsByte(ColumnDescriptor desc) {
return new IntAsByteReader(desc);
}

public static ParquetValueReader<Short> intsAsShort(ColumnDescriptor desc) {
return new IntAsShortReader(desc);
}

public static ParquetValueReader<String> strings(ColumnDescriptor desc) {
return new StringReader(desc);
}
Expand Down Expand Up @@ -390,6 +398,28 @@ public String read(String reuse) {
}
}

private static class IntAsByteReader extends UnboxedReader<Byte> {
private IntAsByteReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Byte read(Byte ignored) {
return (byte) readInteger();
}
}

private static class IntAsShortReader extends UnboxedReader<Short> {
private IntAsShortReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Short read(Short ignored) {
return (short) readInteger();
}
}

public static class IntAsLongReader extends UnboxedReader<Long> {
public IntAsLongReader(ColumnDescriptor desc) {
super(desc);
Expand Down
Loading
Loading