read row groups matching offset & length
jinyangli34 committed Dec 9, 2024
1 parent eacb869 commit 32ca37b
Showing 20 changed files with 103 additions and 38 deletions.
@@ -47,6 +47,7 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats;
@@ -66,12 +67,18 @@ public class ParquetMetadata
private final FileMetaData fileMetaData;
private final ParquetDataSourceId dataSourceId;
private final FileMetadata parquetMetadata;
private final Optional<Long> offset;
private final Optional<Long> length;

public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId)
public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional<Long> offset, Optional<Long> length)
throws ParquetCorruptionException
{
this.fileMetaData = requireNonNull(fileMetaData, "fileMetaData is null");
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
this.offset = requireNonNull(offset, "offset is null");
this.length = requireNonNull(length, "length is null");
checkArgument(offset.isEmpty() && length.isEmpty() || (offset.isPresent() && length.isPresent()),
"Both offset and length must be present or absent");
this.parquetMetadata = new FileMetadata(readMessageType(), keyValueMetaData(fileMetaData), fileMetaData.getCreated_by());
}

@@ -98,7 +105,7 @@ public List<BlockMetadata> getBlocks(Collection<ColumnDescriptor> columnDescript

public List<BlockMetadata> getBlocks()
{
return getBlocks(ImmutableSet.of());
return buildBlocks(ImmutableSet.of());
}

private List<BlockMetadata> buildBlocks(Set<ColumnPath> paths)
@@ -108,10 +115,20 @@ private List<BlockMetadata> buildBlocks(Set<ColumnPath> paths)
List<BlockMetadata> blocks = new ArrayList<>();
List<RowGroup> rowGroups = fileMetaData.getRow_groups();
if (rowGroups != null) {
for (RowGroup rowGroup : rowGroups) {
for (int i = 0; i < rowGroups.size(); i++) {
RowGroup rowGroup = rowGroups.get(i);
List<ColumnChunk> columns = rowGroup.getColumns();
checkState(!columns.isEmpty(), "No columns in row group: %s [%s]", rowGroup, dataSourceId);
String filePath = columns.get(0).getFile_path();

if (offset.isPresent() && length.isPresent() && rowGroup.isSetFile_offset()) {
if (rowGroup.file_offset >= offset.get() + length.get()) {
break;
}
if (i < rowGroups.size() - 1 && rowGroups.get(i + 1).isSetFile_offset() && offset.get() >= rowGroups.get(i + 1).file_offset) {
continue;
}
}
ImmutableList.Builder<ColumnChunkMetadata> columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size());
for (ColumnChunk columnChunk : columns) {
checkState(
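The new loop above implements the row-group selection rule this commit is named for: when an offset and length are supplied, a row group is kept only if its byte span, taken as [file_offset, next row group's file_offset), overlaps the requested range [offset, offset + length). Below is a minimal standalone restatement of that rule as a hypothetical helper (not part of the commit; the isSetFile_offset() guards are dropped for brevity):

```java
import java.util.ArrayList;
import java.util.List;

final class RowGroupRangeFilter
{
    private RowGroupRangeFilter() {}

    // Returns the indexes of row groups overlapping [offset, offset + length),
    // mirroring the break/continue logic added to ParquetMetadata.buildBlocks.
    static List<Integer> select(long[] rowGroupFileOffsets, long offset, long length)
    {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < rowGroupFileOffsets.length; i++) {
            if (rowGroupFileOffsets[i] >= offset + length) {
                break; // this and every later row group start after the requested range
            }
            if (i < rowGroupFileOffsets.length - 1 && offset >= rowGroupFileOffsets[i + 1]) {
                continue; // row group ends at or before the requested range starts
            }
            selected.add(i);
        }
        return selected;
    }
}
```

For example, row groups starting at offsets 4, 1000, and 2000 with a requested range of offset=1000, length=500 select only the middle row group: the first ends at 1000 and is skipped, and the third starts at 2000, past offset + length, so the scan stops.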
@@ -13,7 +13,7 @@
*/
package io.trino.parquet.reader;

import io.airlift.log.Logger;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.ParquetCorruptionException;
@@ -44,16 +44,15 @@

public final class MetadataReader
{
private static final Logger log = Logger.get(MetadataReader.class);

private static final Slice MAGIC = Slices.utf8Slice("PAR1");
private static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length();
@VisibleForTesting
static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length();
// Typical 1GB files produced by Trino were found to have footer size between 30-40KB
private static final int EXPECTED_FOOTER_SIZE = 48 * 1024;

private MetadataReader() {}

public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<ParquetWriteValidation> parquetWriteValidation)
public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<ParquetWriteValidation> parquetWriteValidation, Optional<Long> offset, Optional<Long> length)
throws IOException
{
// Parquet File Layout:
@@ -90,7 +89,7 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput();

FileMetaData fileMetaData = readFileMetaData(metadataStream);
ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId());
ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId(), offset, length);
validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation);
return parquetMetadata;
}
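With the two extra parameters, a caller can restrict the footer to the row groups overlapping a given byte range (typically a split's offset and length), while passing Optional.empty() for both preserves the previous behavior. A usage sketch, assuming only the packages visible elsewhere in this diff; the wrapper class and its method names are made up for illustration:

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;

final class SplitFooterExample
{
    private SplitFooterExample() {}

    // Footer restricted to the row groups overlapping bytes [splitOffset, splitOffset + splitLength)
    static List<BlockMetadata> readSplitBlocks(ParquetDataSource dataSource, long splitOffset, long splitLength)
            throws IOException
    {
        ParquetMetadata metadata = MetadataReader.readFooter(
                dataSource,
                Optional.empty(),            // no write validation
                Optional.of(splitOffset),
                Optional.of(splitLength));
        return metadata.getBlocks();
    }

    // Unrestricted footer, matching the behavior before this change
    static List<BlockMetadata> readAllBlocks(ParquetDataSource dataSource)
            throws IOException
    {
        return MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty())
                .getBlocks();
    }
}
```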
@@ -240,7 +240,7 @@ public void validate(ParquetDataSource input)
checkState(validationBuilder.isPresent(), "validation is not enabled");
ParquetWriteValidation writeValidation = validationBuilder.get().build();
try {
ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation));
ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation), Optional.empty(), Optional.empty());
try (ParquetReader parquetReader = createParquetReader(input, parquetMetadata, writeValidation)) {
for (Page page = parquetReader.nextPage(); page != null; page = parquetReader.nextPage()) {
// fully load the page
@@ -350,7 +350,7 @@ private void flush()
columnMetaDataBuilder.add(columnMetaData);
currentOffset += columnMetaData.getTotal_compressed_size();
}
updateRowGroups(columnMetaDataBuilder.build());
updateRowGroups(columnMetaDataBuilder.build(), outputStream.longSize());

// flush pages
for (BufferData bufferData : bufferDataList) {
@@ -409,12 +409,14 @@ private void writeBloomFilters(List<RowGroup> rowGroups, List<List<Optional<Bloo
}
}

private void updateRowGroups(List<ColumnMetaData> columnMetaData)
private void updateRowGroups(List<ColumnMetaData> columnMetaData, long fileOffset)
{
long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum();
long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum();
ImmutableList<org.apache.parquet.format.ColumnChunk> columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList());
fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes));
fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows)
.setTotal_compressed_size(totalCompressedBytes)
.setFile_offset(fileOffset));
}

private static Slice serializeFooter(FileMetaData fileMetaData)
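On the writer side, ParquetWriter now stamps each RowGroup with the byte position at which it starts, captured from outputStream.longSize() before the row group's pages are flushed. The reader-side filter above only engages when isSetFile_offset() returns true, so footers written before this change are still read in full. A small sketch of the Thrift-generated accessors involved, with placeholder sizes (the offset of 4 assumes a first row group placed directly after the 4-byte "PAR1" magic):

```java
import com.google.common.collect.ImmutableList;
import org.apache.parquet.format.RowGroup;

public class RowGroupOffsetExample
{
    public static void main(String[] args)
    {
        // Row-group metadata as an older writer would produce it: no file_offset,
        // so the new offset/length filter never skips it.
        RowGroup legacy = new RowGroup(ImmutableList.of(), /* total_byte_size */ 1000, /* num_rows */ 100);
        System.out.println(legacy.isSetFile_offset()); // false

        // Row-group metadata as ParquetWriter now produces it.
        RowGroup withOffset = new RowGroup(ImmutableList.of(), 1000, 100)
                .setTotal_compressed_size(800)
                .setFile_offset(4);
        System.out.println(withOffset.isSetFile_offset() + " " + withOffset.file_offset); // true 4
    }
}
```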
@@ -225,7 +225,7 @@ public void setup()
testData.getColumnNames(),
testData.getPages()),
new ParquetReaderOptions());
parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
columnNames = columns.stream()
.map(TpchColumn::getColumnName)
.collect(toImmutableList());
@@ -50,7 +50,7 @@ public void testReadFloatDouble()
ParquetDataSource dataSource = new FileParquetDataSource(
new File(Resources.getResource("byte_stream_split_float_and_double.parquet").toURI()),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);

readAndCompare(reader, getExpectedValues());
@@ -112,7 +112,7 @@ public void testNanosOutsideDayRange()
ParquetDataSource dataSource = new FileParquetDataSource(
new File(Resources.getResource("int96_timestamps_nanos_outside_day_range.parquet").toURI()),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);

Page page = reader.nextPage();
@@ -15,11 +15,13 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
import io.airlift.units.DataSize;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.spi.Page;
@@ -36,6 +38,8 @@
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import io.trino.testing.TestingConnectorSession;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;

import java.io.File;
@@ -52,11 +56,13 @@
import static io.trino.parquet.ParquetTestUtils.createParquetReader;
import static io.trino.parquet.ParquetTestUtils.generateInputPages;
import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
import static io.trino.parquet.reader.MetadataReader.POST_SCRIPT_SIZE;
import static io.trino.parquet.reader.ParquetReader.COLUMN_INDEX_ROWS_FILTERED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -79,7 +85,7 @@ public void testColumnReaderMemoryUsage()
columnNames,
generateInputPages(types, 100, 5)),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
assertThat(parquetMetadata.getBlocks().size()).isGreaterThan(1);
// Verify file has only non-dictionary encodings as dictionary memory usage is already tested in TestFlatColumnReader#testMemoryUsage
parquetMetadata.getBlocks().forEach(block -> {
@@ -132,7 +138,7 @@ public void testEmptyRowRangesWithColumnIndex()
ParquetDataSource dataSource = new FileParquetDataSource(
new File(Resources.getResource("lineitem_sorted_by_shipdate/data.parquet").toURI()),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
assertThat(parquetMetadata.getBlocks()).hasSize(2);
// The predicate and the file are prepared so that page indexes will result in non-overlapping row ranges and eliminate the entire first row group
// while the second row group still has to be read
@@ -186,14 +192,55 @@ public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive()
.isInstanceOf(TrinoException.class);
}

@Test
public void testOffsetColumnFilter()
throws IOException
{
// Write a file with 100 rows per row-group
List<String> columnNames = ImmutableList.of("columna", "columnb");
List<Type> types = ImmutableList.of(INTEGER, BIGINT);

ParquetDataSource dataSource = new TestingParquetDataSource(
writeParquetFile(
ParquetWriterOptions.builder()
.setMaxBlockSize(DataSize.ofBytes(1000))
.build(),
types,
columnNames,
generateInputPages(types, 100, 5)),
new ParquetReaderOptions());

long estimatedSize = dataSource.getEstimatedSize();
long estimatedDataSize = estimatedSize - POST_SCRIPT_SIZE - dataSource.readFully(0, (int) estimatedSize).getInt((int) estimatedSize - POST_SCRIPT_SIZE);

// Read single column, 1 row group
ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[]{"columnb"}, Types.optional(INT64).named(""), 0, 0);
ParquetMetadata parquetMetadata1 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(100L), Optional.of(101L));
List<BlockMetadata> columnBlocks1 = parquetMetadata1.getBlocks(ImmutableSet.of(columnDescriptor));
assertThat(columnBlocks1.stream().allMatch(block -> block.columns().size() == 1)).isTrue();
assertThat(columnBlocks1.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(100);

// Read both columns, half row groups
ParquetMetadata parquetMetadata2 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(0L), Optional.of(estimatedDataSize / 2));
List<BlockMetadata> columnBlocks2 = parquetMetadata2.getBlocks();
assertThat(columnBlocks2.stream().allMatch(block -> block.columns().size() == 2)).isTrue();
assertThat(columnBlocks2.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(300);

// Read both columns, all row groups
ParquetMetadata parquetMetadata3 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
List<BlockMetadata> columnBlocks3 = parquetMetadata3.getBlocks();
assertThat(columnBlocks3.stream().allMatch(block -> block.columns().size() == 2)).isTrue();
assertThat(columnBlocks3.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(500);
}

private void testReadingOldParquetFiles(File file, List<String> columnNames, Type columnType, List<?> expectedValues)
throws IOException
{
ParquetDataSource dataSource = new FileParquetDataSource(
file,
new ParquetReaderOptions());
ConnectorSession session = TestingConnectorSession.builder().build();
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), ImmutableList.of(columnType), columnNames)) {
Page page = reader.nextPage();
Iterator<?> expected = expectedValues.iterator();
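The testOffsetColumnFilter test above derives the size of the data section from the file tail: a Parquet file ends with the serialized footer, a 4-byte footer length, and the 4-byte "PAR1" magic, so POST_SCRIPT_SIZE is 8 and the int read at fileSize - POST_SCRIPT_SIZE is the footer length. A worked example of that arithmetic with made-up numbers:

```java
public class FooterArithmeticExample
{
    public static void main(String[] args)
    {
        long fileSize = 10_000;                                // dataSource.getEstimatedSize()
        int postScriptSize = Integer.BYTES + "PAR1".length();  // footer length field + magic = 8 bytes
        int footerLength = 1_200;                              // the int stored at fileSize - postScriptSize

        // Everything before the footer is row-group data.
        long dataSize = fileSize - postScriptSize - footerLength; // 10_000 - 8 - 1_200 = 8_792

        // The test then requests the first half of the data section and expects
        // roughly half of the row groups back (300 of the 500 rows written).
        long offset = 0;
        long length = dataSize / 2;                            // 4_396
        System.out.println(offset + " " + length);
    }
}
```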
@@ -60,7 +60,7 @@ private void testTimeMillsInt32(TimeType timeType)
ParquetDataSource dataSource = new FileParquetDataSource(
new File(Resources.getResource("time_millis_int32.snappy.parquet").toURI()),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty());
ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);

Page page = reader.nextPage();