From 69b9e39763304098bb1b2b2211acfbc850e15024 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 21 Mar 2024 21:08:19 +0100 Subject: [PATCH] Add Snapshots table metadata (#524) * Add Snapshots table metadata * Use Spark for tests --- mkdocs/docs/api.md | 32 +++++++++++++ pyiceberg/table/__init__.py | 51 +++++++++++++++++++++ tests/integration/test_writes.py | 79 ++++++++++++++++++++++++++++++-- 3 files changed, 158 insertions(+), 4 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 3cd0d312ff..dbf7da445e 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -319,6 +319,38 @@ table.append(df) +## Inspecting tables + +To explore the table metadata, tables can be inspected. + +### Snapshots + +Inspect the snapshots of the table: + +```python +table.inspect.snapshots() +``` + +``` +pyarrow.Table +committed_at: timestamp[ms] not null +snapshot_id: int64 not null +parent_id: int64 +operation: string +manifest_list: string not null +summary: map + child 0, entries: struct not null + child 0, key: string not null + child 1, value: string +---- +committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]] +snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]] +parent_id: [[null,805611270568163028,3679426539959220963]] +operation: [["append","overwrite","append"]] +manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]] +summary: 
[[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]] +``` + ### Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 437a2640e8..cffd4188df 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -971,6 +971,11 @@ def transaction(self) -> Transaction: """ return Transaction(self) + @property + def inspect(self) -> InspectTable: + """Return the InspectTable object to browse the table metadata.""" + return InspectTable(self) + def refresh(self) -> Table: """Refresh the current table metadata.""" fresh = self.catalog.load_table(self.identifier[1:]) @@ -3046,3 +3051,49 @@ def _new_field_id(self) -> int: def _is_duplicate_partition(self, transform: Transform[Any, Any], partition_field: PartitionField) -> bool: return partition_field.field_id not in self._deletes and partition_field.transform == transform + + +class InspectTable: + tbl: Table + + def __init__(self, tbl: Table) -> None: + self.tbl = tbl + + try: + import pyarrow as pa # noqa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e + + def snapshots(self) -> "pa.Table": + import 
pyarrow as pa
+
+        snapshots_schema = pa.schema([
+            pa.field('committed_at', pa.timestamp(unit='ms'), nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('parent_id', pa.int64(), nullable=True),
+            pa.field('operation', pa.string(), nullable=True),
+            pa.field('manifest_list', pa.string(), nullable=False),
+            pa.field('summary', pa.map_(pa.string(), pa.string()), nullable=True),
+        ])
+        snapshots = []
+        for snapshot in self.tbl.metadata.snapshots:
+            if summary := snapshot.summary:
+                operation = summary.operation.value
+                additional_properties = summary.additional_properties
+            else:
+                operation = None
+                additional_properties = None
+
+            snapshots.append({
+                'committed_at': datetime.datetime.utcfromtimestamp(snapshot.timestamp_ms / 1000.0),  # epoch millis -> naive UTC
+                'snapshot_id': snapshot.snapshot_id,
+                'parent_id': snapshot.parent_snapshot_id,
+                'operation': operation,  # keep None as a null; str(None) would store the text "None"
+                'manifest_list': snapshot.manifest_list,
+                'summary': additional_properties,
+            })
+
+        return pa.Table.from_pylist(
+            snapshots,
+            schema=snapshots_schema,
+        )
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index c7095cb71b..87c33d651b 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -15,12 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
# pylint:disable=redefined-outer-name +import math import os import time import uuid from datetime import date, datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from urllib.parse import urlparse import pyarrow as pa @@ -135,15 +136,19 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table: return pa.Table.from_pylist([{}, {}], schema=pa_schema) -def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table: +def _create_table( + session_catalog: Catalog, identifier: str, properties: Properties, data: Optional[List[pa.Table]] = None +) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) - for d in data: - tbl.append(d) + + if data: + for d in data: + tbl.append(d) return tbl @@ -667,3 +672,69 @@ def test_table_properties_raise_for_none_value( session_catalog, identifier, {"format-version": format_version, **property_with_none}, [arrow_table_with_null] ) assert "None type is not a supported value in properties: property_name" in str(exc_info.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_snapshots( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_snapshots" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + tbl.overwrite(arrow_table_with_null) + # should produce a DELETE entry + tbl.overwrite(arrow_table_with_null) + # Since we don't rewrite, this should produce a new manifest with an ADDED entry + tbl.append(arrow_table_with_null) + + df = tbl.inspect.snapshots() + + assert df.column_names == [ + 'committed_at', + 'snapshot_id', + 'parent_id', + 'operation', + 'manifest_list', + 
'summary', + ] + + for committed_at in df['committed_at']: + assert isinstance(committed_at.as_py(), datetime) + + for snapshot_id in df['snapshot_id']: + assert isinstance(snapshot_id.as_py(), int) + + assert df['parent_id'][0].as_py() is None + assert df['parent_id'][1:] == df['snapshot_id'][:2] + + assert [operation.as_py() for operation in df['operation']] == ['append', 'overwrite', 'append'] + + for manifest_list in df['manifest_list']: + assert manifest_list.as_py().startswith("s3://") + + assert df['summary'][0].as_py() == [ + ('added-files-size', '5459'), + ('added-data-files', '1'), + ('added-records', '3'), + ('total-data-files', '1'), + ('total-delete-files', '0'), + ('total-records', '3'), + ('total-files-size', '5459'), + ('total-position-deletes', '0'), + ('total-equality-deletes', '0'), + ] + + lhs = spark.table(f"{identifier}.snapshots").toPandas() + rhs = df.to_pandas() + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if column == 'summary': + # Arrow returns a list of tuples, instead of a dict + right = dict(right) + + if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): + # NaN != NaN in Python + continue + + assert left == right, f"Difference in column {column}: {left} != {right}"