Skip to content

Commit

Permalink
Support getting snapshot at or right before the given timestamp (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmay-bhat authored Jun 3, 2024
1 parent e61ef57 commit 18448fd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
12 changes: 12 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,18 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
return self.snapshot_by_id(ref.snapshot_id)
return None

def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
    """Return the snapshot that was current at, or right before, the given timestamp.

    The snapshot log returned by ``history()`` is scanned from its last entry
    backwards; the first entry whose timestamp satisfies the bound is resolved
    through ``snapshot_by_id``.

    Args:
        timestamp_ms: Upper bound for the log entry timestamp.
        inclusive: When True, an entry stamped exactly ``timestamp_ms`` matches.
            When False, only entries strictly before ``timestamp_ms`` match.

    Returns:
        The snapshot referenced by the matching log entry, or None if no log
        entry satisfies the bound.
    """

    def _within_bound(entry_ts: int) -> bool:
        # Inclusive search accepts equality; exclusive search requires strictly earlier.
        return entry_ts <= timestamp_ms if inclusive else entry_ts < timestamp_ms

    newest_first = reversed(self.history())
    hit = next((entry for entry in newest_first if _within_bound(entry.timestamp_ms)), None)
    if hit is None:
        return None
    return self.snapshot_by_id(hit.snapshot_id)

def history(self) -> List[SnapshotLogEntry]:
    """Return the snapshot log stored on this table's metadata."""
    snapshot_log = self.metadata.snapshot_log
    return snapshot_log
Expand Down
16 changes: 15 additions & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from collections import defaultdict
from enum import Enum
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = "added-data-files"
Expand Down Expand Up @@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
    """Store ``num`` as a string under ``property_name``, but only when it is positive.

    Args:
        properties: Mapping that is updated in place.
        num: Value to record; zero and negative values leave ``properties`` untouched.
        property_name: Key under which the stringified value is written.
    """
    if num <= 0:
        return
    properties[property_name] = str(num)


def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]:
    """Yield the given snapshot followed by each of its ancestors, newest first.

    The chain is followed through ``parent_snapshot_id`` and resolved with
    ``table_metadata.snapshot_by_id``. The walk is iterative rather than
    recursive so that a long lineage cannot exhaust the interpreter's
    recursion limit, and each snapshot is yielded directly instead of being
    passed up through a stack of nested generators.

    Args:
        current_snapshot: Snapshot to start from; nothing is yielded when it is None.
        table_metadata: Metadata object used to look up parent snapshots by id.

    Yields:
        ``current_snapshot`` first, then its parent, grandparent, and so on. The
        walk stops at a snapshot with no parent id, or when a parent id cannot
        be resolved by ``table_metadata``.
    """
    snapshot = current_snapshot
    while snapshot is not None:
        yield snapshot
        parent_id = snapshot.parent_snapshot_id
        if parent_id is None:
            # Reached the root of the lineage.
            return
        # An unresolved parent id yields None here, which ends the loop.
        snapshot = table_metadata.snapshot_by_id(parent_id)
37 changes: 37 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
Snapshot,
SnapshotLogEntry,
Summary,
ancestors_of,
)
from pyiceberg.table.sorting import (
NullOrder,
Expand Down Expand Up @@ -204,6 +205,42 @@ def test_snapshot_by_id(table_v2: Table) -> None:
)


def test_snapshot_by_timestamp(table_v2: Table) -> None:
    """Check inclusive and exclusive timestamp lookups against the v2 table fixture."""
    lookup_ts = 1515100955770
    expected_snapshot = Snapshot(
        snapshot_id=3051729675574597004,
        parent_snapshot_id=None,
        sequence_number=0,
        timestamp_ms=1515100955770,
        manifest_list="s3://a/b/1.avro",
        summary=Summary(Operation.APPEND),
        schema_id=None,
    )

    # Inclusive lookup matches the snapshot stamped exactly at the timestamp.
    assert table_v2.snapshot_as_of_timestamp(lookup_ts) == expected_snapshot
    # Exclusive lookup at the same timestamp finds nothing.
    assert table_v2.snapshot_as_of_timestamp(lookup_ts, inclusive=False) is None


def test_ancestors_of(table_v2: Table) -> None:
    """Check that ancestors_of yields the current snapshot followed by its parent."""
    current = Snapshot(
        snapshot_id=3055729675574597004,
        parent_snapshot_id=3051729675574597004,
        sequence_number=1,
        timestamp_ms=1555100955770,
        manifest_list="s3://a/b/2.avro",
        summary=Summary(Operation.APPEND),
        schema_id=1,
    )
    parent = Snapshot(
        snapshot_id=3051729675574597004,
        parent_snapshot_id=None,
        sequence_number=0,
        timestamp_ms=1515100955770,
        manifest_list="s3://a/b/1.avro",
        summary=Summary(Operation.APPEND),
        schema_id=None,
    )

    lineage = list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata))
    assert lineage == [current, parent]


def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
    """Check that looking up an id not present in the fixture returns None."""
    missing = table_v2.snapshot_by_id(-1)
    assert missing is None

Expand Down

0 comments on commit 18448fd

Please sign in to comment.