Skip to content

Commit

Permalink
move to snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
hililiwei committed Apr 30, 2024
1 parent 3517327 commit 57f6551
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 76 deletions.
5 changes: 1 addition & 4 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:

def __eq__(self, other: Any) -> bool:
"""Return the equality of two instances of the ManifestFile class."""
if not isinstance(other, ManifestFile):
return False
else:
return self.manifest_path == other.manifest_path
return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False

def __hash__(self) -> int:
"""Return the hash of manifest_path."""
Expand Down
88 changes: 17 additions & 71 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
SnapshotLogEntry,
SnapshotSummaryCollector,
Summary,
ancestors_between,
is_parent_ancestor_of,
update_snapshot_summaries,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -1679,10 +1681,6 @@ def snapshot(self) -> Optional[Snapshot]:
return self.table_metadata.snapshot_by_id(self.snapshot_id)
return self.table_metadata.current_snapshot()

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def projection(self) -> Schema:
current_schema = self.table_metadata.schema()
if self.snapshot_id is not None:
Expand All @@ -1703,41 +1701,6 @@ def projection(self) -> Schema:

return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
Args:
min_data_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
)

def use_ref(self: S, name: str) -> S:
if self.snapshot_id: # type: ignore
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore
if snapshot := self.table_metadata.snapshot_by_name(name):
return self.update(snapshot_id=snapshot.snapshot_id)

raise ValueError(f"Cannot scan unknown ref={name}")

def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
Expand Down Expand Up @@ -1825,6 +1788,14 @@ def to_arrow(self) -> pa.Table:
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
return self.to_arrow().to_pandas(**kwargs)

def use_ref(self: S, name: str) -> S:
if self.snapshot_id: # type: ignore
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore
if snapshot := self.table_metadata.snapshot_by_name(name):
return self.update(snapshot_id=snapshot.snapshot_id)

raise ValueError(f"Cannot scan unknown ref={name}")

def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
import duckdb

Expand All @@ -1840,6 +1811,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:


class BaseIncrementalScan(TableScan):
"""Base class for incremental scans.
Args:
to_snapshot_id: The end snapshot ID (inclusive).
from_snapshot_id_exclusive: The start snapshot ID (exclusive).
"""

to_snapshot_id: Optional[int]
from_snapshot_id_exclusive: Optional[int]

Expand Down Expand Up @@ -3913,35 +3891,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
table_partitions: list[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)

return table_partitions


def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
if from_snapshot is not None:
for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata): # type: ignore
if snapshot.snapshot_id == from_snapshot:
break
yield snapshot
else:
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata) # type: ignore


def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
return True
return False


def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
last_snapshot = None
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
last_snapshot = snapshot.snapshot_id
return last_snapshot


def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
if latest_snapshot:
yield latest_snapshot
if latest_snapshot.parent_snapshot_id:
if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
yield from ancestors_of(parent, table_metadata)
43 changes: 42 additions & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@
import time
from collections import defaultdict
from enum import Enum
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
from typing import (
Any,
DefaultDict,
Dict,
Iterable,
List,
Mapping,
Optional,
)

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = 'added-data-files'
Expand Down Expand Up @@ -412,3 +421,35 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)


def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
if from_snapshot is not None:
for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata): # type: ignore
if snapshot.snapshot_id == from_snapshot:
break
yield snapshot
else:
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata) # type: ignore


def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
return True
return False


def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
last_snapshot = None
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
last_snapshot = snapshot.snapshot_id
return last_snapshot


def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
if latest_snapshot:
yield latest_snapshot
if latest_snapshot.parent_snapshot_id:
if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
yield from ancestors_of(parent, table_metadata)

0 comments on commit 57f6551

Please sign in to comment.