diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f160ab2441..2d4b342461 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1302,6 +1302,18 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None + def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]: + """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot. + + Args: + timestamp_ms: Find snapshot that was current at/before this timestamp + inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False + """ + for log_entry in reversed(self.history()): + if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms: + return self.snapshot_by_id(log_entry.snapshot_id) + return None + def history(self) -> List[SnapshotLogEntry]: """Get the snapshot history of this table.""" return self.metadata.snapshot_log diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index e2ce3fe4f1..b21a0f5613 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -14,10 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + import time from collections import defaultdict from enum import Enum -from typing import Any, DefaultDict, Dict, List, Mapping, Optional +from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional from pydantic import Field, PrivateAttr, model_serializer @@ -25,6 +27,9 @@ from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema + +if TYPE_CHECKING: + from pyiceberg.table.metadata import TableMetadata from pyiceberg.typedef import IcebergBaseModel ADDED_DATA_FILES = "added-data-files" @@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: if num > 0: properties[property_name] = str(num) + + +def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]: + """Get the ancestors of and including the given snapshot.""" + if current_snapshot: + yield current_snapshot + if current_snapshot.parent_snapshot_id is not None: + if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id): + yield from ancestors_of(parent, table_metadata) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 11d50db8a5..20b77b6abd 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -76,6 +76,7 @@ Snapshot, SnapshotLogEntry, Summary, + ancestors_of, ) from pyiceberg.table.sorting import ( NullOrder, @@ -204,6 +205,42 @@ def test_snapshot_by_id(table_v2: Table) -> None: ) +def test_snapshot_by_timestamp(table_v2: Table) -> None: + assert table_v2.snapshot_as_of_timestamp(1515100955770) == Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(Operation.APPEND), + schema_id=None, + ) + assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None + + +def test_ancestors_of(table_v2: Table) -> None: + assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ + Snapshot( + snapshot_id=3055729675574597004, + parent_snapshot_id=3051729675574597004, + sequence_number=1, + timestamp_ms=1555100955770, + manifest_list="s3://a/b/2.avro", + summary=Summary(Operation.APPEND), + schema_id=1, + ), + Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(Operation.APPEND), + schema_id=None, + ), + ] + + def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None: assert table_v2.snapshot_by_id(-1) is None