diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 8d1a24c420..b25588a5fa 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -243,6 +243,10 @@ class Snapshot(IcebergBaseModel): manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file") summary: Optional[Summary] = Field(default=None) schema_id: Optional[int] = Field(alias="schema-id", default=None) + first_row_id: Optional[int] = Field( + alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest" + ) + _added_rows: Optional[int] = PrivateAttr() def __str__(self) -> str: """Return the string representation of the Snapshot class.""" @@ -256,6 +260,10 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) + @property + def added_rows(self) -> Optional[int]: + return self._added_rows + class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 4905c31bfb..4b2837c831 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -418,6 +418,17 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} " f"older than last sequence number {base_metadata.last_sequence_number}" ) + elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: + raise ValueError("Cannot add snapshot without first row id") + elif ( + base_metadata.format_version >= 3 + and update.snapshot.first_row_id is not None + and base_metadata.next_row_id is not None + and update.snapshot.first_row_id < base_metadata.next_row_id + ): + raise ValueError( + f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}" + ) context.add_update(update) return base_metadata.model_copy( @@ -425,6 +436,11 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe "last_updated_ms": update.snapshot.timestamp_ms, "last_sequence_number": update.snapshot.sequence_number, "snapshots": base_metadata.snapshots + [update.snapshot], + "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows + if base_metadata.format_version >= 3 + and base_metadata.next_row_id is not None + and update.snapshot.added_rows is not None + else None, } ) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 08c4f5d0bf..7fc240f5c4 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -142,6 +142,19 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._deleted_data_files.add(data_file) return self + def _calculate_added_rows(self, manifests: List[ManifestFile]) -> int: + """Calculate the number of added rows from a list of manifest files.""" + added_rows = 0 + for manifest in manifests: + if manifest.added_snapshot_id is None or manifest.added_snapshot_id == self._snapshot_id: + if manifest.added_rows_count is None: + raise ValueError( + "Cannot determine number of added rows in snapshot because " + f"the entry for manifest {manifest.manifest_path} is missing the field `added-rows-count`" + ) + added_rows += manifest.added_rows_count + return added_rows + @abstractmethod def _deleted_entries(self) -> List[ManifestEntry]: ... @@ -269,6 +282,11 @@ def _commit(self) -> UpdatesAndRequirements: ) as writer: writer.add_manifests(new_manifests) + first_row_id: Optional[int] = None + + if self._transaction.table_metadata.format_version >= 3: + first_row_id = self._transaction.table_metadata.next_row_id + snapshot = Snapshot( snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, @@ -276,6 +294,7 @@ def _commit(self) -> UpdatesAndRequirements: sequence_number=next_sequence_number, summary=summary, schema_id=self._transaction.table_metadata.current_schema_id, + first_row_id=first_row_id, ) return ( diff --git a/tests/conftest.py b/tests/conftest.py index 729e29cb0c..c9abaadac2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,7 +64,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import FileScanTask, Table -from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3 from pyiceberg.types import ( BinaryType, BooleanType, @@ -906,6 +906,7 @@ def generate_snapshot( "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", "location": "s3://bucket/test/location", "last-sequence-number": 34, + "next-row-id": 1, "last-updated-ms": 1602638573590, "last-column-id": 3, "current-schema-id": 1, @@ -2342,6 +2343,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: ) +@pytest.fixture +def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table: + table_metadata = TableMetadataV3(**example_table_metadata_v3) + return Table( + identifier=("database", "table"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + @pytest.fixture def table_v2_with_fixed_and_decimal_types( table_metadata_v2_with_fixed_and_decimal_types: Dict[str, Any], diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 6165dadec4..a13bc8f45b 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1345,3 +1345,57 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None: table_v2_with_statistics.metadata, (RemoveStatisticsUpdate(snapshot_id=123456789),), ) + + +def test_add_snapshot_update_fails_without_first_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without first row id", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=0, + ) + + with pytest.raises( + ValueError, + match="Cannot add a snapshot with first row id smaller than the table's next-row-id", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=2, + added_rows=10, + ) + + new_metadata = update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + assert new_metadata.next_row_id == 11 diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b71d92aa55..1b8bd9565a 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -139,7 +139,7 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot def test_snapshot_repr(snapshot: Snapshot) -> None: assert ( repr(snapshot) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3, first_row_id=None)""" ) assert snapshot == eval(repr(snapshot)) @@ -147,7 +147,7 @@ def test_snapshot_repr(snapshot: Snapshot) -> None: def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None: assert ( repr(snapshot_with_properties) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3, first_row_id=None)""" ) assert snapshot_with_properties == eval(repr(snapshot_with_properties))