From 57f65513f4435fcea97044bbe69e07a5e402b02e Mon Sep 17 00:00:00 2001
From: Liwei Li
Date: Tue, 30 Apr 2024 18:20:19 +0800
Subject: [PATCH] move to snapshot

Move the snapshot-ancestry helpers (ancestors_between,
is_parent_ancestor_of, oldest_ancestor_of, ancestors_of) out of
pyiceberg/table/__init__.py and into pyiceberg/table/snapshots.py.

In the new location:
- TableMetadata is imported under TYPE_CHECKING only and referenced
  through string annotations, so snapshots.py has no runtime import of
  pyiceberg.table.metadata.
- ancestors_of walks the parent chain with a loop instead of a recursive
  `yield from`, so a long snapshot history cannot hit the interpreter's
  recursion limit. It now accepts Optional[Snapshot], which makes the
  `# type: ignore` markers on its callers unnecessary.
---
Review notes (this section is ignored by git am):
* NOTE(review): pyiceberg/table/metadata.py is not part of this patch.
  TableMetadata.snapshot_by_id returns Snapshot objects, so that module
  presumably imports from snapshots.py; a module-level import in the
  other direction would then be circular. Confirm.
* NOTE(review): this patch also deletes _build_manifest_evaluator,
  _build_partition_evaluator and _check_sequence_number from the scan
  class. Confirm nothing on this branch still calls them there.
* The snapshots.py hunks were edited by hand; the blob IDs on its index
  line are the ones from the original patch.

 pyiceberg/manifest.py        |  5 +-
 pyiceberg/table/__init__.py  | 88 +++++++-----------------------------
 pyiceberg/table/snapshots.py | 52 ++++++++++++++++++++-
 3 files changed, 69 insertions(+), 76 deletions(-)

diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 48a35c100d..283e3a6436 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -550,10 +550,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
 
     def __eq__(self, other: Any) -> bool:
         """Return the equality of two instances of the ManifestFile class."""
-        if not isinstance(other, ManifestFile):
-            return False
-        else:
-            return self.manifest_path == other.manifest_path
+        return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False
 
     def __hash__(self) -> int:
         """Return the hash of manifest_path."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 53b6da275e..d752cad8a8 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -112,6 +112,8 @@
     SnapshotLogEntry,
     SnapshotSummaryCollector,
     Summary,
+    ancestors_between,
+    is_parent_ancestor_of,
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1679,10 +1681,6 @@ def snapshot(self) -> Optional[Snapshot]:
             return self.table_metadata.snapshot_by_id(self.snapshot_id)
         return self.table_metadata.current_snapshot()
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
-
     def projection(self) -> Schema:
         current_schema = self.table_metadata.schema()
         if self.snapshot_id is not None:
@@ -1703,41 +1701,6 @@ def projection(self) -> Schema:
 
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-
-    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
-
-        Args:
-            min_data_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
-
-        Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
-        """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
-        )
-
-    def use_ref(self: S, name: str) -> S:
-        if self.snapshot_id:  # type: ignore
-            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
-        if snapshot := self.table_metadata.snapshot_by_name(name):
-            return self.update(snapshot_id=snapshot.snapshot_id)
-
-        raise ValueError(f"Cannot scan unknown ref={name}")
-
     def plan_files(self) -> Iterable[FileScanTask]:
         """Plans the relevant files by filtering on the PartitionSpecs.
 
@@ -1825,6 +1788,14 @@ def to_arrow(self) -> pa.Table:
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)
 
+    def use_ref(self: S, name: str) -> S:
+        if self.snapshot_id:  # type: ignore
+            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
+        if snapshot := self.table_metadata.snapshot_by_name(name):
+            return self.update(snapshot_id=snapshot.snapshot_id)
+
+        raise ValueError(f"Cannot scan unknown ref={name}")
+
     def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
         import duckdb
 
@@ -1840,6 +1811,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
 
 class BaseIncrementalScan(TableScan):
+    """Base class for incremental scans.
+
+    Args:
+        to_snapshot_id: The end snapshot ID (inclusive).
+        from_snapshot_id_exclusive: The start snapshot ID (exclusive).
+    """
+
     to_snapshot_id: Optional[int]
     from_snapshot_id_exclusive: Optional[int]
 
@@ -3913,35 +3891,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
     table_partitions: list[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
 
     return table_partitions
-
-
-def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if from_snapshot is not None:
-        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
-            if snapshot.snapshot_id == from_snapshot:
-                break
-            yield snapshot
-    else:
-        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
-
-
-def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
-            return True
-    return False
-
-
-def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
-    last_snapshot = None
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        last_snapshot = snapshot.snapshot_id
-    return last_snapshot
-
-
-def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if latest_snapshot:
-        yield latest_snapshot
-        if latest_snapshot.parent_snapshot_id:
-            if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
-                yield from ancestors_of(parent, table_metadata)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index f74ac4b7d4..394203a4ea 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -17,7 +17,16 @@
 import time
 from collections import defaultdict
 from enum import Enum
-from typing import Any, DefaultDict, Dict, List, Mapping, Optional
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    DefaultDict,
+    Dict,
+    Iterable,
+    List,
+    Mapping,
+    Optional,
+)
 
 from pydantic import Field, PrivateAttr, model_serializer
 
@@ -27,4 +36,7 @@
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import IcebergBaseModel
 
+if TYPE_CHECKING:
+    from pyiceberg.table.metadata import TableMetadata
+
 ADDED_DATA_FILES = 'added-data-files'
@@ -412,3 +424,41 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
 def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
     if num > 0:
         properties[property_name] = str(num)
+
+
+def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: "TableMetadata") -> Iterable[Snapshot]:
+    """Yield to_snapshot and its ancestors, newest first, stopping before from_snapshot when one is given."""
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):
+        if from_snapshot is not None and snapshot.snapshot_id == from_snapshot:
+            break
+        yield snapshot
+
+
+def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: "TableMetadata") -> bool:
+    """Return True if snapshot_id or any of its ancestors has ancestor_parent_snapshot_id as its parent."""
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):
+        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def oldest_ancestor_of(snapshot_id: int, table_metadata: "TableMetadata") -> Optional[int]:
+    """Return the ID of the last snapshot reached from snapshot_id, or None when no snapshot is found."""
+    last_snapshot = None
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):
+        last_snapshot = snapshot.snapshot_id
+    return last_snapshot
+
+
+def ancestors_of(latest_snapshot: Optional[Snapshot], table_metadata: "TableMetadata") -> Iterable[Snapshot]:
+    """Yield latest_snapshot followed by each of its ancestors, newest first.
+
+    The parent chain is walked with a loop rather than recursion, so a long
+    snapshot history cannot exceed the interpreter's recursion limit.
+    """
+    snapshot = latest_snapshot
+    while snapshot is not None:
+        yield snapshot
+        if snapshot.parent_snapshot_id is None:
+            break
+        snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)