
optimize parquet footer reader #24007

Conversation

@jinyangli34 (Contributor) commented on Nov 2, 2024:

Description

Improve the efficiency of reading Parquet footers by:

  1. Only loading row groups that overlap the split offset/length (when file_offset is set in the Parquet metadata); see the sketch after this list.
  2. Only loading referenced columns.
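
A minimal sketch of the first point, assuming the thrift-generated parquet-format API (`RowGroup#isSetFile_offset`/`getFile_offset`); `splitStart` and `splitLength` are hypothetical names for the split's byte range:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.format.RowGroup;

// Keep only row groups whose file_offset falls inside [splitStart, splitStart + splitLength).
static List<RowGroup> pruneRowGroups(List<RowGroup> rowGroups, long splitStart, long splitLength)
{
    List<RowGroup> overlapping = new ArrayList<>();
    for (RowGroup rowGroup : rowGroups) {
        // file_offset is optional in the thrift definition; without it we cannot
        // prune safely, so the row group is kept
        if (rowGroup.isSetFile_offset()) {
            long offset = rowGroup.getFile_offset();
            if (offset < splitStart || offset >= splitStart + splitLength) {
                continue; // row group starts outside this split
            }
        }
        overlapping.add(rowGroup);
    }
    return overlapping;
}
```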

Additional context and related issues

We hit an issue when reading an Iceberg table backed by Parquet files with very small row groups. Even though the data size is not large (25GB), the query kept the entire cluster busy for an hour, with most of the workers busy processing Parquet footers.

While the root cause is in the Iceberg writer (more discussion in apache/iceberg#11258), we found room for optimization in the Trino Parquet footer reader.

Each Parquet file is 200MB, with 1300 row groups and 80 columns.
Each time, Trino loads the Parquet footer with all row groups and all columns, but only reads data from one row group and only one column.
Total work is 1300 rounds * 1300 row groups * 80 columns ≈ 135M column-chunk metadata parses.
After optimization, this is reduced to 1300 * 1 * 1 = 1300.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot bot added the cla-signed label on Nov 2, 2024
@github-actions bot added the hudi (Hudi connector), iceberg (Iceberg connector), delta-lake (Delta Lake connector), and hive (Hive connector) labels on Nov 2, 2024
@jinyangli34 force-pushed the jinyang-optimize_parquet_footer_reader branch 2 times, most recently from 301055d to 1152756 on November 6, 2024 at 18:40
@findinpath (Contributor) commented:

@jinyangli34 please take some time to make the description of the PR a bit more detailed.

Please also put effort into describing the business case of this PR.
Where will this optimization actually come into play? Concrete use cases appreciated. ❤️

@jinyangli34 (Contributor, author) replied:

> @jinyangli34 please take some time to make the description of the PR a bit more detailed.
>
> Please also put effort into describing the business case of this PR. Where will this optimization actually come into play? Concrete use cases appreciated. ❤️

Updated the additional context section with more details.

@jinyangli34 force-pushed the jinyang-optimize_parquet_footer_reader branch 2 times, most recently from 32ca37b to 2ca2cf8 on December 10, 2024 at 00:27
@@ -119,144 +90,11 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput();

FileMetaData fileMetaData = readFileMetaData(metadataStream);
- ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId());
+ ParquetMetadata parquetMetadata = ParquetMetadata.createParquetMetadata(fileMetaData, dataSource.getId());
Member:

static import
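
Presumably the ask is to keep the static-import form so the call site stays unqualified (assuming ParquetMetadata lives in io.trino.parquet.metadata, alongside the BlockMetadata class referenced later in this review):

```java
import static io.trino.parquet.metadata.ParquetMetadata.createParquetMetadata;

// call site then stays as it was before the change:
ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId());
```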

MessageType messageType = readParquetSchema(schema);
List<BlockMetadata> blocks = new ArrayList<>();
List<RowGroup> rowGroups = fileMetaData.getRow_groups();
if (rowGroups != null) {
for (RowGroup rowGroup : rowGroups) {
List<ColumnChunk> columns = rowGroup.getColumns();
- validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
+ checkState(!columns.isEmpty(), "No columns in row group: %s [%s]", rowGroup, dataSourceId);
Member:

Why is validateParquet changing to checkState? We should retain the existing behaviour of throwing ParquetCorruptionException.

Member:

ping

@@ -82,12 +82,12 @@ public FileMetrics getFileMetrics()
{
ParquetMetadata parquetMetadata;
try {
- parquetMetadata = ParquetMetadata.createParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()));
+ parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()));
return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata)));
Member:

I think this will parse row groups twice now. It may be better to cache the full list of parsed row groups once that is computed in ParquetMetadata
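
A hedged sketch of the caching idea (the field name and synchronization are assumptions, not the merged code): parse the row groups once, reuse the result on later calls. buildBlocks and the empty-set "keep all columns" convention come from the diff above.

```java
// Hypothetical memoization inside ParquetMetadata: the thrift row groups are
// parsed into BlockMetadata once, and subsequent getBlocks() calls reuse them.
private List<BlockMetadata> cachedBlocks;

public synchronized List<BlockMetadata> getBlocks()
{
    if (cachedBlocks == null) {
        cachedBlocks = buildBlocks(ImmutableSet.of()); // empty set = no column pruning
    }
    return cachedBlocks;
}
```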

Member:

ping


return buildBlocks(paths);
}

public List<BlockMetadata> getBlocks()
Member:

Rename to getBlocksWithoutColumnsMetadata

Member:

ping

@@ -104,6 +123,9 @@ public List<BlockMetadata> getBlocks()
.map(value -> value.toLowerCase(Locale.ENGLISH))
.toArray(String[]::new);
ColumnPath columnPath = ColumnPath.get(path);
+ if (!paths.isEmpty() && !paths.contains(columnPath)) {
Member:

This potentially makes io.trino.parquet.metadata.BlockMetadata#getStartingPos incorrect.
If we want to do this, we need to record starting position separately and store that as an explicit field in BlockMetadata

Contributor (author):

It seems BlockMetadata#getStartingPos is only used in the write path, without any column filtering, so it should always contain the first column.
Maybe it's better to add a fileOffset entry in RowGroupInfo for a more consistent startPos per row group?

Member:

Up to this commit, it is used in io.trino.parquet.predicate.PredicateUtils#getFilteredRowGroups as well.
Regardless of current usage, we cannot risk exposing an incorrect value here.
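
One hedged way to honor that, sketched with a hypothetical constructor shape (not the merged design): record the starting position at parse time, before any column pruning can happen.

```java
// Hypothetical: capture the starting position from the full column list when the
// footer is parsed, so pruning columns later cannot change the reported value.
public class BlockMetadata
{
    private final long startingPosition;
    private final long rowCount;
    private final List<ColumnChunkMetadata> columns; // possibly pruned

    public BlockMetadata(long startingPosition, long rowCount, List<ColumnChunkMetadata> columns)
    {
        this.startingPosition = startingPosition;
        this.rowCount = rowCount;
        this.columns = List.copyOf(columns);
    }

    public long getStartingPos()
    {
        return startingPosition; // no longer derived from columns.get(0)
    }
}
```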

@@ -253,7 +253,7 @@ public static ReaderPageSource createPageSource(
start,
length,
dataSource,
- parquetMetadata.getBlocks(),
+ parquetMetadata.getBlocks(descriptorsByPath.values()),
Member:

Rather than doing this, I think it would be better to send in blocks without column metadata here and provide a way for BlockMetadata to parse columns lazily given a list of required paths.
We should maintain io.trino.parquet.metadata.PrunedBlockMetadata#createPrunedColumnsMetadata as the place where pruning of column metadata happens instead of it happening in two different places.

Contributor (author):

Thanks for the pointer. I added a new commit to consolidate the common logic in PrunedBlockMetadata#createPrunedColumnsMetadata and ParquetMetadata#getBlocks.
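
What "parse columns lazily" could look like, as a sketch under assumptions (toColumnChunkMetadata stands in for the existing conversion logic and is hypothetical; the path lowercasing mirrors the diff above):

```java
// Hypothetical lazy shape: keep the raw thrift RowGroup and convert
// ColumnChunk -> ColumnChunkMetadata only for the paths a reader actually needs.
public List<ColumnChunkMetadata> columnsFor(RowGroup rowGroup, Set<ColumnPath> requiredPaths)
{
    ImmutableList.Builder<ColumnChunkMetadata> result = ImmutableList.builder();
    for (ColumnChunk chunk : rowGroup.getColumns()) {
        String[] path = chunk.getMeta_data().getPath_in_schema().stream()
                .map(value -> value.toLowerCase(Locale.ENGLISH))
                .toArray(String[]::new);
        ColumnPath columnPath = ColumnPath.get(path);
        // empty set means "no pruning requested", mirroring the paths.isEmpty() check above
        if (requiredPaths.isEmpty() || requiredPaths.contains(columnPath)) {
            result.add(toColumnChunkMetadata(chunk)); // expensive conversion, deferred until needed
        }
    }
    return result.build();
}
```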

@@ -104,6 +123,9 @@ public List<BlockMetadata> getBlocks()
.map(value -> value.toLowerCase(Locale.ENGLISH))
.toArray(String[]::new);
ColumnPath columnPath = ColumnPath.get(path);
+ if (!paths.isEmpty() && !paths.contains(columnPath)) {
+     continue;
Member:

I'm wondering why this commit helps. We already have io.trino.parquet.metadata.PrunedBlockMetadata#createPrunedColumnsMetadata, and this logic only prevents ColumnChunkMetadata.get. Are you sure this is expensive enough to warrant special handling?

Member:

ping


- public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId)
+ public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional<Long> offset, Optional<Long> length)
Member:

Create a record for offset and length and make the argument Optional<OffsetLength>

Contributor (author):

👍 Reusing the DiskRange

Member:

That change should happen in this commit
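
For reference, a hedged sketch of the shape discussed, assuming io.trino.parquet.DiskRange models an offset/length pair (the constructor body is elided; only the signature is the point):

```java
// Reviewer's original suggestion was a small record bundling the two values,
// e.g. public record OffsetLength(long offset, long length) {}
// The author instead reused the existing DiskRange type:
public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional<DiskRange> splitRange)
{
    // ... construct as before, pruning row groups to those overlapping splitRange
}
```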

@jinyangli34 force-pushed the jinyang-optimize_parquet_footer_reader branch from 2ca2cf8 to 041670a on December 12, 2024 at 19:32
@jinyangli34 force-pushed the jinyang-optimize_parquet_footer_reader branch 2 times, most recently from b86f019 to 1fb4ab3 on December 13, 2024 at 19:47
@jinyangli34 force-pushed the jinyang-optimize_parquet_footer_reader branch from 1fb4ab3 to 933ccdd on December 14, 2024 at 01:20
@raunaqmorarka (Member) left a review comment:

Can you drop the 3rd and 5th commits entirely from this PR?
It's not clear to me that we are saving much there, and it's complicating the overall changes. I think we could land the remaining part, which is more impactful, more easily by reducing the changes in this PR.


if (fileMetaData.getKey_value_metadata() == null) {
return ImmutableMap.of();
}
return fileMetaData.getKey_value_metadata().stream().collect(toMap(KeyValue::getKey, KeyValue::getValue));
Member:

toMap -> toImmutableMap(KeyValue::getKey, KeyValue::getValue, (first, second) -> second)
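
Spelled out, the suggested replacement uses Guava's ImmutableMap.toImmutableMap overload that takes a merge function, keeping the last value for duplicate keys:

```java
import static com.google.common.collect.ImmutableMap.toImmutableMap;

return fileMetaData.getKey_value_metadata().stream()
        .collect(toImmutableMap(KeyValue::getKey, KeyValue::getValue, (first, second) -> second));
```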


@raunaqmorarka (Member) commented:

@jinyangli34 I've extracted some of the commits from here and cleaned them up in #24618

@jinyangli34 (Contributor, author) replied:

> @jinyangli34 I've extracted some of the commits from here and cleaned them up in #24618

Thank you for taking care of this @raunaqmorarka !

@jinyangli34 closed this on Jan 3, 2025
Labels: cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), hudi (Hudi connector), iceberg (Iceberg connector), performance