Commit eba4bee

Initial implementation of the manifest table (#717)
1 parent 996afd0 commit eba4bee

File tree

3 files changed: +222 -0 lines changed


mkdocs/docs/api.md

Lines changed: 50 additions & 0 deletions
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
 max_snapshot_age_in_ms: [[null,604800000]]
 ```
 
+### Manifests
+
+To show a table's current file manifests:
+
+```python
+table.inspect.manifests()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+path: string not null
+length: int64 not null
+partition_spec_id: int32 not null
+added_snapshot_id: int64 not null
+added_data_files_count: int32 not null
+existing_data_files_count: int32 not null
+deleted_data_files_count: int32 not null
+added_delete_files_count: int32 not null
+existing_delete_files_count: int32 not null
+deleted_delete_files_count: int32 not null
+partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
+  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
+      child 0, contains_null: bool not null
+      child 1, contains_nan: bool
+      child 2, lower_bound: string
+      child 3, upper_bound: string
+----
+content: [[0]]
+path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
+length: [[6886]]
+partition_spec_id: [[0]]
+added_snapshot_id: [[3815834705531553721]]
+added_data_files_count: [[1]]
+existing_data_files_count: [[0]]
+deleted_data_files_count: [[0]]
+added_delete_files_count: [[0]]
+existing_delete_files_count: [[0]]
+deleted_delete_files_count: [[0]]
+partition_summaries: [[ -- is_valid: all not null
+  -- child 0 type: bool
+[false]
+  -- child 1 type: bool
+[false]
+  -- child 2 type: string
+["test"]
+  -- child 3 type: string
+["test"]]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
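
For readers who want to try the documented call end to end, a minimal sketch follows; the catalog name and table identifier are illustrative assumptions, not part of this commit:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, for illustration only.
catalog = load_catalog("default")
table = catalog.load_table("default.my_table")

# Returns the pyarrow.Table documented above.
manifests = table.inspect.manifests()

# The result is an ordinary Arrow table, so the usual tooling applies,
# e.g. keep only manifests that added data files in the current snapshot:
df = manifests.to_pandas()
print(df[df["added_data_files_count"] > 0][["path", "length"]])
```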

pyiceberg/table/__init__.py

Lines changed: 89 additions & 0 deletions
@@ -71,6 +71,7 @@
     ManifestEntry,
     ManifestEntryStatus,
     ManifestFile,
+    PartitionFieldSummary,
     write_manifest,
     write_manifest_list,
 )
@@ -3547,6 +3548,94 @@ def update_partitions_map(
             schema=table_schema,
         )
 
+    def manifests(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.conversions import from_bytes
+
+        partition_summary_schema = pa.struct([
+            pa.field("contains_null", pa.bool_(), nullable=False),
+            pa.field("contains_nan", pa.bool_(), nullable=True),
+            pa.field("lower_bound", pa.string(), nullable=True),
+            pa.field("upper_bound", pa.string(), nullable=True),
+        ])
+
+        manifest_schema = pa.schema([
+            pa.field('content', pa.int8(), nullable=False),
+            pa.field('path', pa.string(), nullable=False),
+            pa.field('length', pa.int64(), nullable=False),
+            pa.field('partition_spec_id', pa.int32(), nullable=False),
+            pa.field('added_snapshot_id', pa.int64(), nullable=False),
+            pa.field('added_data_files_count', pa.int32(), nullable=False),
+            pa.field('existing_data_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_data_files_count', pa.int32(), nullable=False),
+            pa.field('added_delete_files_count', pa.int32(), nullable=False),
+            pa.field('existing_delete_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_delete_files_count', pa.int32(), nullable=False),
+            pa.field('partition_summaries', pa.list_(partition_summary_schema), nullable=False),
+        ])
+
+        def _partition_summaries_to_rows(
+            spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
+        ) -> List[Dict[str, Any]]:
+            rows = []
+            for i, field_summary in enumerate(partition_summaries):
+                field = spec.fields[i]
+                partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
+                lower_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
+                        )
+                    )
+                    if field_summary.lower_bound
+                    else None
+                )
+                upper_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
+                        )
+                    )
+                    if field_summary.upper_bound
+                    else None
+                )
+                rows.append({
+                    'contains_null': field_summary.contains_null,
+                    'contains_nan': field_summary.contains_nan,
+                    'lower_bound': lower_bound,
+                    'upper_bound': upper_bound,
+                })
+            return rows
+
+        specs = self.tbl.metadata.specs()
+        manifests = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                is_data_file = manifest.content == ManifestContent.DATA
+                is_delete_file = manifest.content == ManifestContent.DELETES
+                manifests.append({
+                    'content': manifest.content,
+                    'path': manifest.manifest_path,
+                    'length': manifest.manifest_length,
+                    'partition_spec_id': manifest.partition_spec_id,
+                    'added_snapshot_id': manifest.added_snapshot_id,
+                    'added_data_files_count': manifest.added_files_count if is_data_file else 0,
+                    'existing_data_files_count': manifest.existing_files_count if is_data_file else 0,
+                    'deleted_data_files_count': manifest.deleted_files_count if is_data_file else 0,
+                    'added_delete_files_count': manifest.added_files_count if is_delete_file else 0,
+                    'existing_delete_files_count': manifest.existing_files_count if is_delete_file else 0,
+                    'deleted_delete_files_count': manifest.deleted_files_count if is_delete_file else 0,
+                    'partition_summaries': _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else [],
+                })
+
+        return pa.Table.from_pylist(
+            manifests,
+            schema=manifest_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
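
Two building blocks of `manifests()` above can be exercised in isolation: partition bounds arrive in Iceberg's single-value binary encoding and only become readable after `from_bytes` plus the transform's `to_human_string`, and `pa.Table.from_pylist` takes an explicit schema so column types and nullability stay stable even when there is no current snapshot and the row list is empty. A standalone sketch under those assumptions (all values made up):

```python
import pyarrow as pa

from pyiceberg.conversions import from_bytes
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType

# Decode a raw 4-byte little-endian integer bound, then render it
# the same way _partition_summaries_to_rows does.
raw_bound = (42).to_bytes(4, "little")
value = from_bytes(IntegerType(), raw_bound)
print(IdentityTransform().to_human_string(IntegerType(), value))  # 42

# from_pylist with an explicit schema: an empty row list still yields
# a correctly typed table, which is what manifests() returns for a
# table without a current snapshot.
schema = pa.schema([
    pa.field("path", pa.string(), nullable=False),
    pa.field("lower_bound", pa.string(), nullable=True),
])
rows = [{"path": "s3://bucket/m0.avro", "lower_bound": "42"}]
print(pa.Table.from_pylist(rows, schema=schema))
print(pa.Table.from_pylist([], schema=schema).num_rows)  # 0
```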

tests/integration/test_inspect_table.py

Lines changed: 83 additions & 0 deletions
@@ -445,3 +445,86 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
     df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
     spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
     check_pyiceberg_df_equals_spark_df(df, spark_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_manifests"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (1, "a")
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (2, "b")
+        """
+    )
+
+    df = session_catalog.load_table(identifier).inspect.manifests()
+
+    assert df.column_names == [
+        'content',
+        'path',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+        'partition_summaries',
+    ]
+
+    int_cols = [
+        'content',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+    ]
+
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    for value in df["path"]:
+        assert isinstance(value.as_py(), str)
+
+    for value in df["partition_summaries"]:
+        assert isinstance(value.as_py(), list)
+        for row in value:
+            assert isinstance(row["contains_null"].as_py(), bool)
+            assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+            assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+            assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+    lhs = spark.table(f"{identifier}.manifests").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"
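
The final block of the test compares against Spark's own `manifests` metadata table, which can also be inspected directly; a one-line sketch using the same identifier as the test:

```python
# Spark exposes manifest metadata under <table>.manifests; this is the
# frame the test's `lhs` is built from.
spark.sql("SELECT * FROM default.table_metadata_manifests.manifests").show(truncate=False)
```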
