Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Append/Overwrite API to accept snapshot properties #419

Merged
merged 13 commits on Mar 19, 2024
Merged
16 changes: 8 additions & 8 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ def name_mapping(self) -> NameMapping:
else:
return create_mapping_from_schema(self.schema())

def append(self, df: pa.Table) -> None:
def append(self, df: pa.Table, **snapshot_properties) -> None:
Gowthami03B marked this conversation as resolved.
Show resolved Hide resolved
"""
Append data to the table.

Expand All @@ -1000,9 +1000,9 @@ def append(self, df: pa.Table) -> None:
for data_file in data_files:
merge.append_data_file(data_file)

merge.commit()
merge.commit(**snapshot_properties)

def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, **snapshot_properties) -> None:
"""
Overwrite all the data in the table.

Expand Down Expand Up @@ -1036,7 +1036,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
for data_file in data_files:
merge.append_data_file(data_file)

merge.commit()
merge.commit(**snapshot_properties)

def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
Expand Down Expand Up @@ -2474,7 +2474,7 @@ def _fetch_existing_manifests() -> List[ManifestFile]:

return added_manifests.result() + delete_manifests.result() + existing_manifests.result()

def _summary(self) -> Summary:
def _summary(self, **snapshot_properties) -> Summary:
ssc = SnapshotSummaryCollector()

for data_file in self._added_data_files:
Expand All @@ -2483,16 +2483,16 @@ def _summary(self) -> Summary:
previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build()),
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
)

def commit(self) -> Snapshot:
def commit(self, **snapshot_properties) -> Snapshot:
new_manifests = self._manifests()
next_sequence_number = self._table.next_sequence_number()

summary = self._summary()
summary = self._summary(**snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
Expand Down
26 changes: 26 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
Expand Down Expand Up @@ -671,3 +672,28 @@ def test_commit_table_properties(
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}

@mock_aws
def test_commit_table_snapshot_properties(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
    """Check that extra keyword arguments passed to ``table.append`` end up in the new snapshot's summary."""
    # Glue catalog configured with the mocked S3 endpoint and the test bucket as warehouse.
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    test_catalog = GlueCatalog("glue", **catalog_properties)
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table(identifier=(database_name, table_name), schema=table_schema_simple)

    # Before any write, the table is expected to be at metadata version 0.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0

    # Append a single row, tagging the commit with a custom snapshot property.
    single_row = pa.Table.from_pylist(
        [{"foo": "foo_val", "bar": 1, "baz": False}],
        schema=schema_to_pyarrow(table_schema_simple),
    )
    table.append(single_row, snapshot_prop_a="test_prop_a")

    # After the append: the metadata version is expected to be 1, and the
    # latest snapshot's summary must expose the custom property.
    metadata_after_append = table.metadata
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    latest_snapshot = metadata_after_append.snapshots[-1]
    assert latest_snapshot.summary.get("snapshot_prop_a") == "test_prop_a"
Gowthami03B marked this conversation as resolved.
Show resolved Hide resolved

Loading