Skip to content

Commit

Permalink
[HUDI-8681] Unifying supported cols across col stats and partition stats index (#12638)
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored Jan 17, 2025
1 parent dee6274 commit baf141a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -65,7 +66,8 @@ public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig co
HoodieCommitMetadata.class);
if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
// update data table's table config for list of columns indexed.
List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig());
List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
Option.of(config.getRecordMerger().getRecordType()));
// if col stats is getting updated, lets also update list of columns indexed if changed.
updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -440,7 +441,7 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
continue;
}
fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
fileGroupCountAndRecordsPair = initializePartitionStatsIndex();
partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
Expand Down Expand Up @@ -524,17 +525,18 @@ private String generateUniqueInstantTime(String initializationTime) {
}
}

private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex() throws IOException {
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())), Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}

private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// Find the columns to index
final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)), true);
dataWriteConfig.getMetadataConfig(), Either.right(Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient))), true,
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
if (columnsToIndex.isEmpty()) {
Expand Down Expand Up @@ -1090,7 +1092,8 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, getTableMetadata(),
dataWriteConfig.getMetadataConfig(),
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
Expand All @@ -1111,7 +1114,8 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, getTableMetadata(), dataWriteConfig.getMetadataConfig(),
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates));
updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
Expand Down Expand Up @@ -1231,7 +1235,7 @@ private static List<Pair<String, Pair<String, List<String>>>> getPartitionFilePa
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
cleanMetadata, instantTime, dataMetaClient, dataWriteConfig.getMetadataConfig(), enabledPartitionTypes,
dataWriteConfig.getBloomIndexParallelism()));
dataWriteConfig.getBloomIndexParallelism(), Option.of(dataWriteConfig.getRecordMerger().getRecordType())));
closeInternal();
}

Expand Down
Loading

0 comments on commit baf141a

Please sign in to comment.