
Commit 49ac3a2

Add Partitions Metadata Table (#603)
1 parent aa46543 commit 49ac3a2

File tree: 3 files changed, +250 −0 lines

mkdocs/docs/api.md

Lines changed: 41 additions & 0 deletions

@@ -382,6 +382,47 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-

summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```

### Partitions

Inspect the partitions of the table:

```python
table.inspect.partitions()
```

```
pyarrow.Table
partition: struct<dt_month: int32, dt_day: date32[day]> not null
  child 0, dt_month: int32
  child 1, dt_day: date32[day]
spec_id: int32 not null
record_count: int64 not null
file_count: int32 not null
total_data_file_size_in_bytes: int64 not null
position_delete_record_count: int64 not null
position_delete_file_count: int32 not null
equality_delete_record_count: int64 not null
equality_delete_file_count: int32 not null
last_updated_at: timestamp[ms]
last_updated_snapshot_id: int64
----
partition: [
  -- is_valid: all not null
  -- child 0 type: int32
[null,null,612]
  -- child 1 type: date32[day]
[null,2021-02-01,null]]
spec_id: [[2,1,0]]
record_count: [[1,1,2]]
file_count: [[1,1,2]]
total_data_file_size_in_bytes: [[641,641,1260]]
position_delete_record_count: [[0,0,0]]
position_delete_file_count: [[0,0,0]]
equality_delete_record_count: [[0,0,0]]
equality_delete_file_count: [[0,0,0]]
last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
```
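
The result is an ordinary `pa.Table`, so standard PyArrow operations compose with it downstream. A minimal usage sketch (the `spec_id` value 2 is taken from the example output above; the filtering is plain PyArrow, not a dedicated PyIceberg API):

```python
import pyarrow.compute as pc

partitions = table.inspect.partitions()

# Keep only partitions written under spec 2, then hand off to pandas.
latest = partitions.filter(pc.equal(partitions["spec_id"], 2))
print(latest.to_pandas())
```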

### Entries

To show all the table's current manifest entries for both data and delete files.

pyiceberg/table/__init__.py

Lines changed: 89 additions & 0 deletions

@@ -137,6 +137,7 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
    import daft
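
The body of `_convert_to_hashable_type` is not shown in this diff; for readers of the code below, a plausible stand-in (illustrative only, not the library's implementation) would canonicalize the partition dict into a hashable tuple:

```python
from typing import Any, Dict, Tuple

def _hashable_key(record: Dict[str, Any]) -> Tuple[Tuple[str, Any], ...]:
    # Dicts cannot key another dict; a sorted tuple of (field, value)
    # pairs gives a deterministic, hashable equivalent.
    return tuple(sorted(record.items()))
```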
@@ -3422,6 +3423,94 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
            schema=entries_schema,
        )
    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        """Summarize data and delete files per partition for one snapshot (defaults to the current one)."""
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        # Aggregate columns shared by partitioned and unpartitioned tables.
        table_schema = pa.schema([
            pa.field('record_count', pa.int64(), nullable=False),
            pa.field('file_count', pa.int32(), nullable=False),
            pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False),
            pa.field('position_delete_record_count', pa.int64(), nullable=False),
            pa.field('position_delete_file_count', pa.int32(), nullable=False),
            pa.field('equality_delete_record_count', pa.int64(), nullable=False),
            pa.field('equality_delete_file_count', pa.int32(), nullable=False),
            pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
            pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
        ])

        partition_record = self.tbl.metadata.specs_struct()
        has_partitions = len(partition_record.fields) > 0

        if has_partitions:
            # Prepend the unified partition struct (union of fields across all
            # specs) and the spec id for tables that have partition specs.
            pa_record_struct = schema_to_pyarrow(partition_record)
            partitions_schema = pa.schema([
                pa.field('partition', pa_record_struct, nullable=False),
                pa.field('spec_id', pa.int32(), nullable=False),
            ])

            table_schema = pa.unify_schemas([partitions_schema, table_schema])

        def update_partitions_map(
            partitions_map: Dict[Tuple[str, Any], Any],
            file: DataFile,
            partition_record_dict: Dict[str, Any],
            snapshot: Optional[Snapshot],
        ) -> None:
            partition_record_key = _convert_to_hashable_type(partition_record_dict)
            if partition_record_key not in partitions_map:
                partitions_map[partition_record_key] = {
                    "partition": partition_record_dict,
                    "spec_id": file.spec_id,
                    "record_count": 0,
                    "file_count": 0,
                    "total_data_file_size_in_bytes": 0,
                    "position_delete_record_count": 0,
                    "position_delete_file_count": 0,
                    "equality_delete_record_count": 0,
                    "equality_delete_file_count": 0,
                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
                }

            partition_row = partitions_map[partition_record_key]

            if snapshot is not None:
                # Compare timestamps, not snapshot ids, to keep the most recent update.
                if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
                    partition_row["last_updated_at"] = snapshot.timestamp_ms
                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

            if file.content == DataFileContent.DATA:
                partition_row["record_count"] += file.record_count
                partition_row["file_count"] += 1
                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
            elif file.content == DataFileContent.POSITION_DELETES:
                partition_row["position_delete_record_count"] += file.record_count
                partition_row["position_delete_file_count"] += 1
            elif file.content == DataFileContent.EQUALITY_DELETES:
                partition_row["equality_delete_record_count"] += file.record_count
                partition_row["equality_delete_file_count"] += 1
            else:
                raise ValueError(f"Unknown DataFileContent ({file.content})")

        partitions_map: Dict[Tuple[str, Any], Any] = {}
        snapshot = self._get_snapshot(snapshot_id)
        # Walk every manifest entry reachable from the requested snapshot and
        # fold each data/delete file into its partition's running totals.
        for manifest in snapshot.manifests(self.tbl.io):
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                partition = entry.data_file.partition
                partition_record_dict = {
                    field.name: partition[pos]
                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
                }
                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)

        return pa.Table.from_pylist(
            partitions_map.values(),
            schema=table_schema,
        )

@dataclass(frozen=True)
class TablePartition:
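One design point worth noting in `partitions()` above: `specs_struct()` returns a single struct covering the union of fields across every partition spec the table has had, which is why the documented example shows `dt_month` and `dt_day` side by side with nulls wherever a given spec did not define the field. A small PyArrow sketch reproducing that shape (values copied from the docs output; the construction is illustrative, not the library's code path):

```python
from datetime import date

import pyarrow as pa

# Unified partition struct across three historical specs.
partition_type = pa.struct([
    pa.field("dt_month", pa.int32()),
    pa.field("dt_day", pa.date32()),
])
rows = [
    {"partition": {"dt_month": None, "dt_day": None}, "spec_id": 2},  # fields dropped
    {"partition": {"dt_month": None, "dt_day": date(2021, 2, 1)}, "spec_id": 1},  # days(dt)
    {"partition": {"dt_month": 612, "dt_day": None}, "spec_id": 0},  # months(dt)
]
table = pa.Table.from_pylist(
    rows,
    schema=pa.schema([
        pa.field("partition", partition_type, nullable=False),
        pa.field("spec_id", pa.int32(), nullable=False),
    ]),
)
print(table)
```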
tests/integration/test_inspect_table.py

Lines changed: 120 additions & 0 deletions

@@ -270,3 +270,123 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal

    assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None}
    assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_unpartitioned(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.table_metadata_partitions_unpartitioned"
    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

    # Write some data through multiple commits
    tbl.append(arrow_table_with_null)
    tbl.append(arrow_table_with_null)

    df = tbl.inspect.partitions()
    assert df.column_names == [
        'record_count',
        'file_count',
        'total_data_file_size_in_bytes',
        'position_delete_record_count',
        'position_delete_file_count',
        'equality_delete_record_count',
        'equality_delete_file_count',
        'last_updated_at',
        'last_updated_snapshot_id',
    ]
    for last_updated_at in df['last_updated_at']:
        assert isinstance(last_updated_at.as_py(), datetime)

    int_cols = [
        'record_count',
        'file_count',
        'total_data_file_size_in_bytes',
        'position_delete_record_count',
        'position_delete_file_count',
        'equality_delete_record_count',
        'equality_delete_file_count',
        'last_updated_snapshot_id',
    ]
    for column in int_cols:
        for value in df[column]:
            assert isinstance(value.as_py(), int)

    lhs = df.to_pandas()
    rhs = spark.table(f"{identifier}.partitions").toPandas()
    for column in df.column_names:
        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
            assert left == right, f"Difference in column {column}: {left} != {right}"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    identifier = "default.table_metadata_partitions_partitioned"
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    # Evolve the partition spec between inserts: months(dt) -> days(dt) -> unpartitioned.
    spark.sql(
        f"""
        CREATE TABLE {identifier} (
            name string,
            dt date
        )
        PARTITIONED BY (months(dt))
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date))
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date))
        """
    )

    spark.sql(
        f"""
        ALTER TABLE {identifier}
        REPLACE PARTITION FIELD dt_month WITH days(dt)
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date))
        """
    )

    spark.sql(
        f"""
        ALTER TABLE {identifier}
        DROP PARTITION FIELD dt_day
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date))
        """
    )

    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
        lhs = df.to_pandas().sort_values('spec_id')
        rhs = spark_df.toPandas().sort_values('spec_id')
        for column in df.column_names:
            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
                if column == "partition":
                    # Spark returns the partition struct as a Row; normalize to a dict.
                    right = right.asDict()
                assert left == right, f"Difference in column {column}: {left} != {right}"

    tbl = session_catalog.load_table(identifier)
    for snapshot in tbl.metadata.snapshots:
        df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
        spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
        check_pyiceberg_df_equals_spark_df(df, spark_df)