From f2123e3190c875f1b044918572ca7a0cc42bb102 Mon Sep 17 00:00:00 2001 From: Jiakai Li Date: Thu, 12 Dec 2024 09:18:00 +1300 Subject: [PATCH] Enable `case_sensitive` delete and overwrite --- pyiceberg/table/__init__.py | 24 ++++++++++++++++++------ pyiceberg/table/update/snapshot.py | 15 ++++++++++----- tests/table/test_snapshots.py | 20 ++++++++++++++++++++ 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c224e9edc3..d2e2d3fdf2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -421,6 +421,7 @@ def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + case_sensitive: bool = True, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ @@ -436,6 +437,7 @@ def overwrite( df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite + case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ try: @@ -459,7 +461,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) + self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties) with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty @@ -470,17 +472,23 @@ def overwrite( for data_file in data_files: update_snapshot.append_data_file(data_file) - def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def delete( + self, + delete_filter: Union[str, BooleanExpression], + case_sensitive: bool = True, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ) -> None: """ Shorthand for deleting record from a table. - An deletee may produce zero or more snapshots based on the operation: + A delete may produce zero or more snapshots based on the operation: - DELETE: In case existing Parquet files can be dropped completely. - REPLACE: In case existing Parquet files need to be rewritten Args: delete_filter: A boolean expression to delete rows from a table + case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ from pyiceberg.io.pyarrow import ( @@ -503,7 +511,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: - bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True) + bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) files = self._scan(row_filter=delete_filter).plan_files() @@ -1008,17 +1016,21 @@ def overwrite( tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) def delete( - self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + case_sensitive: bool = True, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ Shorthand for deleting rows from the table. Args: delete_filter: The predicate that used to remove rows + case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties) def add_files( self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 47e5fc55e3..c0d0056e7c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -318,6 +318,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): """ _predicate: BooleanExpression + _case_sensitive: bool def __init__( self, @@ -329,6 +330,7 @@ def __init__( ): super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() + self._case_sensitive = True def _commit(self) -> UpdatesAndRequirements: # Only produce a commit when there is something to delete @@ -340,7 +342,7 @@ def _commit(self) -> UpdatesAndRequirements: def _build_partition_projection(self, spec_id: int) -> BooleanExpression: schema = self._transaction.table_metadata.schema() spec = self._transaction.table_metadata.specs()[spec_id] - project = inclusive_projection(schema, spec) + project = inclusive_projection(schema, spec, self._case_sensitive) return project(self._predicate) @cached_property @@ -350,10 +352,11 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: schema = self._transaction.table_metadata.schema() spec = self._transaction.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True) + return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive) - def delete_by_predicate(self, predicate: BooleanExpression) -> None: + def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None: self._predicate = Or(self._predicate, predicate) + self._case_sensitive = case_sensitive @cached_property def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]: @@ -376,8 +379,10 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ) manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval - inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( + schema, self._predicate, case_sensitive=self._case_sensitive + ).eval existing_manifests = [] total_deleted_entries = [] diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..a9e87b4fd4 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -17,9 +17,12 @@ # pylint:disable=redefined-outer-name,eval-used import pytest +from pyiceberg.expressions import EqualTo from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Record @@ -341,3 +344,20 @@ def test_invalid_type() -> None: ) assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) + + +@pytest.mark.parametrize("case_sensitive", [True, False]) +def test_delete_table_rows_case_sensitive( + case_sensitive: bool, table_v1: Table, table_v2: Table, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(TableMetadataV1, "current_snapshot", lambda _: None) + monkeypatch.setattr(TableMetadataV2, "current_snapshot", lambda _: None) + for table in [table_v1, table_v2]: + delete_file = table.transaction().update_snapshot().delete() + delete_file.delete_by_predicate(predicate=EqualTo("X", 10), case_sensitive=case_sensitive) + if case_sensitive: + with pytest.raises(ValueError) as e: + _ = delete_file._compute_deletes + assert "Could not find field with name X" in str(e.value) + else: + _ = delete_file._compute_deletes