Skip to content

Row lineage fields for v3 #2129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ class Snapshot(IcebergBaseModel):
manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file")
summary: Optional[Summary] = Field(default=None)
schema_id: Optional[int] = Field(alias="schema-id", default=None)
first_row_id: Optional[int] = Field(
alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest"
)
_added_rows: Optional[int] = PrivateAttr()

def __str__(self) -> str:
"""Return the string representation of the Snapshot class."""
Expand All @@ -256,6 +260,10 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
return list(_manifests(io, self.manifest_list))

@property
def added_rows(self) -> Optional[int]:
return self._added_rows


class MetadataLogEntry(IcebergBaseModel):
metadata_file: str = Field(alias="metadata-file")
Expand Down
16 changes: 16 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,29 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
f"older than last sequence number {base_metadata.last_sequence_number}"
)
elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None:
raise ValueError("Cannot add snapshot without first row id")
elif (
base_metadata.format_version >= 3
and update.snapshot.first_row_id is not None
and base_metadata.next_row_id is not None
and update.snapshot.first_row_id < base_metadata.next_row_id
):
raise ValueError(
f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}"
)

context.add_update(update)
return base_metadata.model_copy(
update={
"last_updated_ms": update.snapshot.timestamp_ms,
"last_sequence_number": update.snapshot.sequence_number,
"snapshots": base_metadata.snapshots + [update.snapshot],
"next_row_id": base_metadata.next_row_id + update.snapshot.added_rows
if base_metadata.format_version >= 3
and base_metadata.next_row_id is not None
and update.snapshot.added_rows is not None
else None,
}
)

Expand Down
19 changes: 19 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._deleted_data_files.add(data_file)
return self

def _calculate_added_rows(self, manifests: List[ManifestFile]) -> int:
"""Calculate the number of added rows from a list of manifest files."""
added_rows = 0
for manifest in manifests:
if manifest.added_snapshot_id is None or manifest.added_snapshot_id == self._snapshot_id:
if manifest.added_rows_count is None:
raise ValueError(
"Cannot determine number of added rows in snapshot because "
f"the entry for manifest {manifest.manifest_path} is missing the field `added-rows-count`"
)
added_rows += manifest.added_rows_count
return added_rows

@abstractmethod
def _deleted_entries(self) -> List[ManifestEntry]: ...

Expand Down Expand Up @@ -269,13 +282,19 @@ def _commit(self) -> UpdatesAndRequirements:
) as writer:
writer.add_manifests(new_manifests)

first_row_id: Optional[int] = None

if self._transaction.table_metadata.format_version >= 3:
first_row_id = self._transaction.table_metadata.next_row_id

snapshot = Snapshot(
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
manifest_list=manifest_list_file_path,
sequence_number=next_sequence_number,
summary=summary,
schema_id=self._transaction.table_metadata.current_schema_id,
first_row_id=first_row_id,
)

return (
Expand Down
15 changes: 14 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from pyiceberg.schema import Accessor, Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -906,6 +906,7 @@ def generate_snapshot(
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"next-row-id": 1,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 1,
Expand Down Expand Up @@ -2342,6 +2343,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
)


@pytest.fixture
def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table:
table_metadata = TableMetadataV3(**example_table_metadata_v3)
return Table(
identifier=("database", "table"),
metadata=table_metadata,
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
io=load_file_io(),
catalog=NoopCatalog("NoopCatalog"),
)


@pytest.fixture
def table_v2_with_fixed_and_decimal_types(
table_metadata_v2_with_fixed_and_decimal_types: Dict[str, Any],
Expand Down
54 changes: 54 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -1345,3 +1345,57 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
table_v2_with_statistics.metadata,
(RemoveStatisticsUpdate(snapshot_id=123456789),),
)


def test_add_snapshot_update_fails_without_first_row_id(table_v3: Table) -> None:
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=1602638593590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
)

with pytest.raises(
ValueError,
match="Cannot add snapshot without first row id",
):
update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))


def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> None:
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=1602638593590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
first_row_id=0,
)

with pytest.raises(
ValueError,
match="Cannot add a snapshot with first row id smaller than the table's next-row-id",
):
update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))


def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None:
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=1602638593590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
first_row_id=2,
added_rows=10,
)

new_metadata = update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))
assert new_metadata.next_row_id == 11
4 changes: 2 additions & 2 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot
def test_snapshot_repr(snapshot: Snapshot) -> None:
assert (
repr(snapshot)
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)"""
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3, first_row_id=None)"""
)
assert snapshot == eval(repr(snapshot))


def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None:
assert (
repr(snapshot_with_properties)
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)"""
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3, first_row_id=None)"""
)
assert snapshot_with_properties == eval(repr(snapshot_with_properties))

Expand Down
Loading