Skip to content

Commit

Permalink
Add Snapshots table metadata (#524)
Browse files Browse the repository at this point in the history
* Add Snapshots table metadata

* Use Spark for tests
  • Loading branch information
Fokko authored Mar 21, 2024
1 parent bbc7e7c commit 69b9e39
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 4 deletions.
32 changes: 32 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,38 @@ table.append(df)

<!-- prettier-ignore-end -->

## Inspecting tables

To explore the table metadata, tables can be inspected.

### Snapshots

Inspect the snapshots of the table:

```python
table.inspect.snapshots()
```

```
pyarrow.Table
committed_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
operation: string
manifest_list: string not null
summary: map<string, string>
child 0, entries: struct<key: string not null, value: string> not null
child 0, key: string not null
child 1, value: string
----
committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]]
snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]]
parent_id: [[null,805611270568163028,3679426539959220963]]
operation: [["append","overwrite","append"]]
manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]]
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```
### Add Files
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
Expand Down
51 changes: 51 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,11 @@ def transaction(self) -> Transaction:
"""
return Transaction(self)

@property
def inspect(self) -> InspectTable:
"""Return the InspectTable object to browse the table metadata."""
return InspectTable(self)

def refresh(self) -> Table:
"""Refresh the current table metadata."""
fresh = self.catalog.load_table(self.identifier[1:])
Expand Down Expand Up @@ -3046,3 +3051,49 @@ def _new_field_id(self) -> int:

def _is_duplicate_partition(self, transform: Transform[Any, Any], partition_field: PartitionField) -> bool:
return partition_field.field_id not in self._deletes and partition_field.transform == transform


class InspectTable:
tbl: Table

def __init__(self, tbl: Table) -> None:
self.tbl = tbl

try:
import pyarrow as pa # noqa
except ModuleNotFoundError as e:
raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

def snapshots(self) -> "pa.Table":
import pyarrow as pa

snapshots_schema = pa.schema([
pa.field('committed_at', pa.timestamp(unit='ms'), nullable=False),
pa.field('snapshot_id', pa.int64(), nullable=False),
pa.field('parent_id', pa.int64(), nullable=True),
pa.field('operation', pa.string(), nullable=True),
pa.field('manifest_list', pa.string(), nullable=False),
pa.field('summary', pa.map_(pa.string(), pa.string()), nullable=True),
])
snapshots = []
for snapshot in self.tbl.metadata.snapshots:
if summary := snapshot.summary:
operation = summary.operation.value
additional_properties = snapshot.summary.additional_properties
else:
operation = None
additional_properties = None

snapshots.append({
'committed_at': datetime.datetime.utcfromtimestamp(snapshot.timestamp_ms / 1000.0),
'snapshot_id': snapshot.snapshot_id,
'parent_id': snapshot.parent_snapshot_id,
'operation': str(operation),
'manifest_list': snapshot.manifest_list,
'summary': additional_properties,
})

return pa.Table.from_pylist(
snapshots,
schema=snapshots_schema,
)
79 changes: 75 additions & 4 deletions tests/integration/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
import math
import os
import time
import uuid
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import pyarrow as pa
Expand Down Expand Up @@ -135,15 +136,19 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
return pa.Table.from_pylist([{}, {}], schema=pa_schema)


def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table:
def _create_table(
session_catalog: Catalog, identifier: str, properties: Properties, data: Optional[List[pa.Table]] = None
) -> Table:
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
for d in data:
tbl.append(d)

if data:
for d in data:
tbl.append(d)

return tbl

Expand Down Expand Up @@ -667,3 +672,69 @@ def test_table_properties_raise_for_none_value(
session_catalog, identifier, {"format-version": format_version, **property_with_none}, [arrow_table_with_null]
)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_snapshots(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.table_metadata_snapshots"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

tbl.overwrite(arrow_table_with_null)
# should produce a DELETE entry
tbl.overwrite(arrow_table_with_null)
# Since we don't rewrite, this should produce a new manifest with an ADDED entry
tbl.append(arrow_table_with_null)

df = tbl.inspect.snapshots()

assert df.column_names == [
'committed_at',
'snapshot_id',
'parent_id',
'operation',
'manifest_list',
'summary',
]

for committed_at in df['committed_at']:
assert isinstance(committed_at.as_py(), datetime)

for snapshot_id in df['snapshot_id']:
assert isinstance(snapshot_id.as_py(), int)

assert df['parent_id'][0].as_py() is None
assert df['parent_id'][1:] == df['snapshot_id'][:2]

assert [operation.as_py() for operation in df['operation']] == ['append', 'overwrite', 'append']

for manifest_list in df['manifest_list']:
assert manifest_list.as_py().startswith("s3://")

assert df['summary'][0].as_py() == [
('added-files-size', '5459'),
('added-data-files', '1'),
('added-records', '3'),
('total-data-files', '1'),
('total-delete-files', '0'),
('total-records', '3'),
('total-files-size', '5459'),
('total-position-deletes', '0'),
('total-equality-deletes', '0'),
]

lhs = spark.table(f"{identifier}.snapshots").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if column == 'summary':
# Arrow returns a list of tuples, instead of a dict
right = dict(right)

if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue

assert left == right, f"Difference in column {column}: {left} != {right}"

0 comments on commit 69b9e39

Please sign in to comment.