|
44 | 44 | import org.apache.amoro.server.persistence.mapper.TableMetaMapper; |
45 | 45 | import org.apache.amoro.server.persistence.mapper.TableProcessMapper; |
46 | 46 | import org.apache.amoro.server.process.TableProcessMeta; |
| 47 | +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; |
47 | 48 | import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; |
48 | 49 | import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables; |
49 | 50 | import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; |
|
74 | 75 | import org.apache.commons.collections.CollectionUtils; |
75 | 76 | import org.apache.commons.lang3.tuple.Pair; |
76 | 77 | import org.apache.iceberg.ContentFile; |
| 78 | +import org.apache.iceberg.FileScanTask; |
77 | 79 | import org.apache.iceberg.HasTableOperations; |
78 | 80 | import org.apache.iceberg.IcebergFindFiles; |
| 81 | +import org.apache.iceberg.MetadataTableType; |
| 82 | +import org.apache.iceberg.MetadataTableUtils; |
79 | 83 | import org.apache.iceberg.PartitionField; |
80 | 84 | import org.apache.iceberg.PartitionSpec; |
81 | 85 | import org.apache.iceberg.Schema; |
82 | 86 | import org.apache.iceberg.Snapshot; |
83 | 87 | import org.apache.iceberg.SnapshotRef; |
84 | 88 | import org.apache.iceberg.SnapshotSummary; |
| 89 | +import org.apache.iceberg.StructLike; |
85 | 90 | import org.apache.iceberg.Table; |
| 91 | +import org.apache.iceberg.TableOperations; |
| 92 | +import org.apache.iceberg.TableScan; |
86 | 93 | import org.apache.iceberg.data.GenericRecord; |
87 | 94 | import org.apache.iceberg.io.CloseableIterable; |
88 | 95 | import org.apache.iceberg.types.Types; |
@@ -445,35 +452,170 @@ public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) { |
445 | 452 | if (mixedTable.spec().isUnpartitioned()) { |
446 | 453 | return new ArrayList<>(); |
447 | 454 | } |
448 | | - Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>(); |
449 | 455 |
|
450 | | - CloseableIterable<PartitionFileBaseInfo> tableFiles = |
451 | | - getTableFilesInternal(amoroTable, null, null); |
| 456 | + List<PartitionBaseInfo> result = new ArrayList<>(); |
| 457 | + |
| 458 | + // For keyed tables, we need to collect partitions from both change and base tables |
| 459 | + if (mixedTable.isKeyedTable()) { |
| 460 | + Map<String, PartitionBaseInfo> partitionMap = new HashMap<>(); |
| 461 | + |
| 462 | + // Collect from change table |
| 463 | + List<PartitionBaseInfo> changePartitions = |
| 464 | + collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable()); |
| 465 | + for (PartitionBaseInfo partition : changePartitions) { |
| 466 | + partitionMap.put(partition.getPartition(), partition); |
| 467 | + } |
| 468 | + |
| 469 | + // Collect from base table and merge |
| 470 | + List<PartitionBaseInfo> basePartitions = |
| 471 | + collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable()); |
| 472 | + for (PartitionBaseInfo basePartition : basePartitions) { |
| 473 | + if (partitionMap.containsKey(basePartition.getPartition())) { |
| 474 | + PartitionBaseInfo existing = partitionMap.get(basePartition.getPartition()); |
| 475 | + existing.setFileCount(existing.getFileCount() + basePartition.getFileCount()); |
| 476 | + existing.setFileSize(existing.getFileSize() + basePartition.getFileSize()); |
| 477 | + } else { |
| 478 | + partitionMap.put(basePartition.getPartition(), basePartition); |
| 479 | + } |
| 480 | + } |
| 481 | + |
| 482 | + result.addAll(partitionMap.values()); |
| 483 | + } else { |
| 484 | + result = collectPartitionsFromTable(mixedTable.asUnkeyedTable()); |
| 485 | + } |
| 486 | + |
| 487 | + return result; |
| 488 | + } |
| 489 | + |
| 490 | + /** |
| 491 | + * Collect partition information from an Iceberg table using the PARTITIONS metadata table. This |
| 492 | + * is much more efficient than scanning all data files, especially for tables with many files. |
| 493 | + * |
| 494 | + * @param table The Iceberg table to collect partitions from |
| 495 | + * @return List of partition information |
| 496 | + */ |
| 497 | + private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) { |
| 498 | + List<PartitionBaseInfo> partitions = new ArrayList<>(); |
| 499 | + |
452 | 500 | try { |
453 | | - for (PartitionFileBaseInfo fileInfo : tableFiles) { |
454 | | - if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) { |
455 | | - PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo(); |
456 | | - partitionBaseInfo.setPartition(fileInfo.getPartition()); |
457 | | - partitionBaseInfo.setSpecId(fileInfo.getSpecId()); |
458 | | - partitionBaseInfoHashMap.put(fileInfo.getPartition(), partitionBaseInfo); |
| 501 | + Preconditions.checkArgument( |
| 502 | + table instanceof HasTableOperations, "table must support table operations"); |
| 503 | + TableOperations ops = ((HasTableOperations) table).operations(); |
| 504 | + |
| 505 | + // Use PARTITIONS metadata table for efficient partition statistics |
| 506 | + Table partitionsTable = |
| 507 | + MetadataTableUtils.createMetadataTableInstance( |
| 508 | + ops, |
| 509 | + table.name(), |
| 510 | + table.name() + "#" + MetadataTableType.PARTITIONS.name(), |
| 511 | + MetadataTableType.PARTITIONS); |
| 512 | + |
| 513 | + TableScan scan = partitionsTable.newScan(); |
| 514 | + try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) { |
| 515 | + CloseableIterable<CloseableIterable<StructLike>> transform = |
| 516 | + CloseableIterable.transform(tasks, task -> task.asDataTask().rows()); |
| 517 | + |
| 518 | + try (CloseableIterable<StructLike> rows = CloseableIterable.concat(transform)) { |
| 519 | + for (StructLike row : rows) { |
| 520 | + PartitionBaseInfo partitionInfo = new PartitionBaseInfo(); |
| 521 | + |
| 522 | + // Get partition field - it's a struct |
| 523 | + StructLike partition = row.get(0, StructLike.class); |
| 524 | + int specId = row.get(1, Integer.class); |
| 525 | + |
| 526 | + // Convert partition struct to path string |
| 527 | + PartitionSpec spec = table.specs().get(specId); |
| 528 | + String partitionPath = spec.partitionToPath(partition); |
| 529 | + |
| 530 | + partitionInfo.setPartition(partitionPath); |
| 531 | + partitionInfo.setSpecId(specId); |
| 532 | + |
| 533 | + // Get file statistics from the partition metadata table |
| 534 | + // Schema: partition, spec_id, record_count, file_count, |
| 535 | + // total_data_file_size_in_bytes, |
| 536 | + // position_delete_record_count, position_delete_file_count, |
| 537 | + // equality_delete_record_count, equality_delete_file_count, |
| 538 | + // last_updated_at, last_updated_snapshot_id |
| 539 | + Integer dataFileCount = row.get(3, Integer.class); |
| 540 | + Long totalDataFileSize = row.get(4, Long.class); |
| 541 | + Integer posDeleteFileCount = row.get(6, Integer.class); |
| 542 | + Integer eqDeleteFileCount = row.get(8, Integer.class); |
| 543 | + Long lastUpdatedAt = row.get(9, Long.class); |
| 544 | + |
| 545 | + // Total file count = data files + position delete files + equality delete files |
| 546 | + int totalFileCount = |
| 547 | + (dataFileCount != null ? dataFileCount : 0) |
| 548 | + + (posDeleteFileCount != null ? posDeleteFileCount : 0) |
| 549 | + + (eqDeleteFileCount != null ? eqDeleteFileCount : 0); |
| 550 | + partitionInfo.setFileCount(totalFileCount); |
| 551 | + // TODO: Iceberg partitions table currently only reports data file sizes, not delete |
| 552 | + // file sizes. |
| 553 | + // This will be updated once Iceberg supports reporting delete file sizes. |
| 554 | + // See: https://github.com/apache/iceberg/issues/14803 |
| 555 | + partitionInfo.setFileSize(totalDataFileSize != null ? totalDataFileSize : 0L); |
| 556 | + partitionInfo.setLastCommitTime(lastUpdatedAt != null ? lastUpdatedAt : 0L); |
| 557 | + |
| 558 | + partitions.add(partitionInfo); |
| 559 | + } |
459 | 560 | } |
460 | | - PartitionBaseInfo partitionInfo = partitionBaseInfoHashMap.get(fileInfo.getPartition()); |
461 | | - partitionInfo.setFileCount(partitionInfo.getFileCount() + 1); |
462 | | - partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize()); |
463 | | - partitionInfo.setLastCommitTime( |
464 | | - partitionInfo.getLastCommitTime() > fileInfo.getCommitTime() |
465 | | - ? partitionInfo.getLastCommitTime() |
466 | | - : fileInfo.getCommitTime()); |
467 | 561 | } |
468 | | - } finally { |
469 | | - try { |
470 | | - tableFiles.close(); |
471 | | - } catch (IOException e) { |
472 | | - LOG.warn("Failed to close the manifest reader.", e); |
| 562 | + } catch (Exception e) { |
| 563 | + LOG.warn( |
| 564 | + "Failed to use PARTITIONS metadata table, falling back to file scanning. Error: {}", |
| 565 | + e.getMessage()); |
| 566 | + // Fallback to the old approach if metadata table access fails |
| 567 | + return collectPartitionsFromFileScan(table); |
| 568 | + } |
| 569 | + |
| 570 | + return partitions; |
| 571 | + } |
| 572 | + |
| 573 | + /** |
| 574 | + * Fallback method to collect partitions by scanning all files. This is kept for backward |
| 575 | + * compatibility and error cases. |
| 576 | + * |
| 577 | + * @param table The Iceberg table to scan |
| 578 | + * @return List of partition information |
| 579 | + */ |
| 580 | + protected List<PartitionBaseInfo> collectPartitionsFromFileScan(Table table) { |
| 581 | + Map<String, PartitionBaseInfo> partitionMap = new HashMap<>(); |
| 582 | + |
| 583 | + IcebergFindFiles manifestReader = |
| 584 | + new IcebergFindFiles(table).ignoreDeleted().planWith(executorService); |
| 585 | + |
| 586 | + try (CloseableIterable<IcebergFindFiles.IcebergManifestEntry> entries = |
| 587 | + manifestReader.entries()) { |
| 588 | + |
| 589 | + for (IcebergFindFiles.IcebergManifestEntry entry : entries) { |
| 590 | + ContentFile<?> file = entry.getFile(); |
| 591 | + PartitionSpec spec = table.specs().get(file.specId()); |
| 592 | + String partitionPath = spec.partitionToPath(file.partition()); |
| 593 | + |
| 594 | + if (!partitionMap.containsKey(partitionPath)) { |
| 595 | + PartitionBaseInfo partitionInfo = new PartitionBaseInfo(); |
| 596 | + partitionInfo.setPartition(partitionPath); |
| 597 | + partitionInfo.setSpecId(file.specId()); |
| 598 | + partitionInfo.setFileCount(0); |
| 599 | + partitionInfo.setFileSize(0L); |
| 600 | + partitionInfo.setLastCommitTime(0L); |
| 601 | + partitionMap.put(partitionPath, partitionInfo); |
| 602 | + } |
| 603 | + |
| 604 | + PartitionBaseInfo partitionInfo = partitionMap.get(partitionPath); |
| 605 | + partitionInfo.setFileCount(partitionInfo.getFileCount() + 1); |
| 606 | + partitionInfo.setFileSize(partitionInfo.getFileSize() + file.fileSizeInBytes()); |
| 607 | + |
| 608 | + long snapshotId = entry.getSnapshotId(); |
| 609 | + if (table.snapshot(snapshotId) != null) { |
| 610 | + long commitTime = table.snapshot(snapshotId).timestampMillis(); |
| 611 | + partitionInfo.setLastCommitTime(Math.max(partitionInfo.getLastCommitTime(), commitTime)); |
| 612 | + } |
473 | 613 | } |
| 614 | + } catch (IOException e) { |
| 615 | + LOG.error("Failed to scan files for partition information", e); |
474 | 616 | } |
475 | 617 |
|
476 | | - return new ArrayList<>(partitionBaseInfoHashMap.values()); |
| 618 | + return new ArrayList<>(partitionMap.values()); |
477 | 619 | } |
478 | 620 |
|
479 | 621 | @Override |
|
0 commit comments