Skip to content

Commit

Permalink
partitions table
Browse files Browse the repository at this point in the history
  • Loading branch information
sungwy committed Apr 12, 2024
1 parent 53549c0 commit 70308c9
Showing 1 changed file with 45 additions and 17 deletions.
62 changes: 45 additions & 17 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3411,15 +3411,14 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
schema=entries_schema,
)


def partitions(self) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)

partitions_schema = pa.schema([
pa.field('partition', pa_record_struct, nullable=False),
pa.field('spec_id', pa.int32(), nullable=False),
Expand All @@ -3430,25 +3429,53 @@ def partitions(self) -> "pa.Table":
pa.field('position_delete_file_count', pa.int32(), nullable=False),
pa.field('equality_delete_record_count', pa.int64(), nullable=False),
pa.field('equality_delete_file_count', pa.int32(), nullable=False),
pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=False),
pa.field('last_updated_snapshot_id', pa.int64(), nullable=False),
pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
])

def update_partitions_map(partitions_map: Tuple[str, Any], file: DataFile, partition_record_dict: Dict[str, Any]) -> None:
def update_partitions_map(
partitions_map: Dict[Tuple[str, Any], Any],
file: DataFile,
partition_record_dict: Dict[str, Any],
snapshot: Optional[Snapshot],
) -> None:
partition_record_key = _convert_to_hashable_type(partition_record_dict)
if partition_row := partitions_map.get(partition_record_key):
if file.content == DataFileContent.DATA:
print()
elif file.content == DataFileContent.POSITION_DELETES:
print()
elif file.content == DataFileContent.EQUALITY_DELETES:
print()
else:
raise ValueError(f"Unknown DataFileContent ({file.content})")

if partition_record_key not in partitions_map:
partitions_map[partition_record_key] = {
"partition": partition_record_dict,
"spec_id": file.spec_id,
"record_count": 0,
"file_count": 0,
"total_data_file_size_in_bytes": 0,
"position_delete_record_count": 0,
"position_delete_file_count": 0,
"equality_delete_record_count": 0,
"equality_delete_file_count": 0,
"last_updated_at": snapshot.timestamp_ms if snapshot else None,
"last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
}

partition_row = partitions_map[partition_record_key]

if snapshot is not None:
if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
partition_row["last_updated_at"] = snapshot.timestamp_ms
partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

if file.content == DataFileContent.DATA:
partition_row["record_count"] += file.record_count
partition_row["file_count"] += 1
partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
elif file.content == DataFileContent.POSITION_DELETES:
partition_row["position_delete_record_count"] += file.record_count
partition_row["position_delete_file_count"] += 1
elif file.content == DataFileContent.EQUALITY_DELETES:
partition_row["equality_delete_record_count"] += file.record_count
partition_row["equality_delete_file_count"] += 1
else:
raise ValueError(f"Unknown DataFileContent ({file.content})")

partitions_map = dict()
partitions_map: Dict[Tuple[str, Any], Any] = {}
if snapshot := self.tbl.metadata.current_snapshot():
for manifest in snapshot.manifests(self.tbl.io):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
Expand All @@ -3457,7 +3484,8 @@ def update_partitions_map(partitions_map: Tuple[str, Any], file: DataFile, parti
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
}
update_partitions_map(partitions_map, entry.data_file, partition_record_dict)
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)

return pa.Table.from_pylist(
partitions_map.values(),
Expand Down

0 comments on commit 70308c9

Please sign in to comment.