Commit

Address comments
ajantha-bhat committed Feb 4, 2025
1 parent 4945ec1 commit 2b61d23
Showing 3 changed files with 46 additions and 44 deletions.
24 changes: 0 additions & 24 deletions core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -24,7 +24,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -134,27 +133,4 @@ private static Collection<PartitionStats> mergeStats(

return statsMap.values();
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public static boolean isEqual(
Comparator<StructLike> partitionComparator, PartitionStats stats1, PartitionStats stats2) {
if (stats1 == stats2) {
return true;
} else if (stats1 == null || stats2 == null) {
return false;
}

return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0
&& stats1.specId() == stats2.specId()
&& stats1.dataRecordCount() == stats2.dataRecordCount()
&& stats1.dataFileCount() == stats2.dataFileCount()
&& stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes()
&& stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount()
&& stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
&& stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount()
&& stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
&& stats1.totalRecordCount() == stats2.totalRecordCount()
&& Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
&& Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
}
}
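
With isEqual removed from PartitionStatsUtil, the class's remaining public surface is the computeStats/sortStats pair that PartitionStatsHandler calls below. A minimal Java sketch of that sequence, assuming a partitioned table that already has a current snapshot (the wrapper class and method names here, other than the Iceberg API calls, are illustrative):

import java.util.Collection;
import java.util.List;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types.StructType;

class PartitionStatsExample {
  // Compute per-partition stats for the current snapshot and sort them by the
  // table's unified partition type, mirroring computeAndWriteStatsFile below.
  static List<PartitionStats> computeSorted(Table table) {
    Snapshot snapshot = table.currentSnapshot(); // assumed non-null for this sketch
    StructType partitionType = Partitioning.partitionType(table);
    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
    return PartitionStatsUtil.sortStats(stats, partitionType);
  }
}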
PartitionStatsHandler.java
@@ -22,7 +22,6 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -59,7 +58,7 @@
* Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
* to support writing and reading of the stats in table default format.
*/
public final class PartitionStatsHandler {
public class PartitionStatsHandler {

private PartitionStatsHandler() {}

@@ -97,7 +96,7 @@ public int id() {
* @return a schema that corresponds to the provided unified partition type.
*/
public static Schema schema(StructType partitionType) {
Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be partitioned");
Preconditions.checkState(!partitionType.fields().isEmpty(), "Table must be partitioned");
return new Schema(
NestedField.required(1, Column.PARTITION.name(), partitionType),
NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()),
@@ -117,9 +116,10 @@ public static Schema schema(StructType partitionType) {
* Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
*
* @param table The {@link Table} for which the partition statistics is computed.
* @return {@link PartitionStatisticsFile} for the current snapshot.
* @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
* present.
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) {
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
return computeAndWriteStatsFile(table, null);
}

@@ -128,9 +128,11 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) {
*
* @param table The {@link Table} for which the partition statistics is computed.
* @param branch A branch information to select the required snapshot.
* @return {@link PartitionStatisticsFile} for the given branch.
* @return {@link PartitionStatisticsFile} for the given branch, or null if no statistics are
* present.
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) {
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch)
throws IOException {
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
if (currentSnapshot == null) {
Preconditions.checkArgument(
@@ -140,20 +142,23 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, Stri

StructType partitionType = Partitioning.partitionType(table);
Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, currentSnapshot);
if (stats.isEmpty()) {
return null;
}

List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
return writePartitionStatsFile(
table, currentSnapshot.snapshotId(), schema(partitionType), sortedStats.iterator());
}

@VisibleForTesting
static PartitionStatisticsFile writePartitionStatsFile(
Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStats> records) {
Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStats> records)
throws IOException {
OutputFile outputFile = newPartitionStatsFile(table, snapshotId);

try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile); ) {
try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile)) {
records.forEachRemaining(writer::write);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return ImmutableGenericPartitionStatisticsFile.builder()
@@ -180,6 +185,9 @@ private static FileFormat fileFormat(String fileLocation) {
}

private static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
Preconditions.checkArgument(
table instanceof HasTableOperations,
"Table must have operations to retrieve metadata location");
FileFormat fileFormat =
fileFormat(
table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
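With these changes, computeAndWriteStatsFile propagates IOException instead of wrapping it, and returns null when the snapshot has no partition stats, so callers must handle both cases. A minimal caller sketch under those assumptions; the PartitionStatsHandler import is omitted since it depends on the module layout, and the updatePartitionStatistics() commit step is the standard Table API rather than something introduced by this commit:

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;

class ComputeStatsCaller {
  // Compute the stats file for the table's current snapshot and, if one was
  // produced, register it in the table metadata.
  static void refreshPartitionStats(Table table) {
    try {
      PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
      if (statsFile == null) {
        return; // nothing to record, e.g. the table or branch has no data
      }

      table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write partition stats for " + table.name(), e);
    }
  }
}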
Tests for PartitionStatsHandler
@@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -46,7 +47,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -104,12 +104,7 @@ public void testPartitionStatsOnEmptyTable() throws Exception {
public void testPartitionStatsOnEmptyBranch() throws Exception {
Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2);
testTable.manageSnapshots().createBranch("b1").commit();
PartitionStatisticsFile partitionStatisticsFile =
PartitionStatsHandler.computeAndWriteStatsFile(testTable, "b1");
// creates an empty stats file since the dummy snapshot exist
assertThat(partitionStatisticsFile.fileSizeInBytes()).isEqualTo(0L);
assertThat(partitionStatisticsFile.snapshotId())
.isEqualTo(testTable.refs().get("b1").snapshotId());
assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable, "b1")).isNull();
}

@Test
@@ -227,7 +222,7 @@ public void testAllDatatypePartitionWriting() throws Exception {
assertThat(written).hasSize(expected.size());
Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
for (int i = 0; i < written.size(); i++) {
assertThat(PartitionStatsUtil.isEqual(comparator, written.get(i), expected.get(i))).isTrue();
assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue();
}
}

@@ -297,7 +292,7 @@ public void testOptionalFieldsWriting() throws Exception {
assertThat(written).hasSize(expected.size());
Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
for (int i = 0; i < written.size(); i++) {
assertThat(PartitionStatsUtil.isEqual(comparator, written.get(i), expected.get(i))).isTrue();
assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue();
}
}

@@ -579,4 +574,27 @@ private static PositionDelete<GenericRecord> positionDelete(
private File tempDir(String folderName) throws IOException {
return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean isEqual(
Comparator<StructLike> partitionComparator, PartitionStats stats1, PartitionStats stats2) {
if (stats1 == stats2) {
return true;
} else if (stats1 == null || stats2 == null) {
return false;
}

return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0
&& stats1.specId() == stats2.specId()
&& stats1.dataRecordCount() == stats2.dataRecordCount()
&& stats1.dataFileCount() == stats2.dataFileCount()
&& stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes()
&& stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount()
&& stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
&& stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount()
&& stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
&& stats1.totalRecordCount() == stats2.totalRecordCount()
&& Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
&& Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
}
}
