Skip to content

Commit

Permalink
Enable case_sensitive delete and overwrite
Browse files Browse the repository at this point in the history
  • Loading branch information
jiakai-li committed Dec 13, 2024
1 parent f842a6c commit f2123e3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 11 deletions.
24 changes: 18 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ def overwrite(
self,
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Expand All @@ -436,6 +437,7 @@ def overwrite(
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
Expand All @@ -459,7 +461,7 @@ def overwrite(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
Expand All @@ -470,17 +472,23 @@ def overwrite(
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def delete(
self,
delete_filter: Union[str, BooleanExpression],
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for deleting record from a table.
An deletee may produce zero or more snapshots based on the operation:
A delete may produce zero or more snapshots based on the operation:
- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten
Args:
delete_filter: A boolean expression to delete rows from a table
case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
from pyiceberg.io.pyarrow import (
Expand All @@ -503,7 +511,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti

# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

files = self._scan(row_filter=delete_filter).plan_files()
Expand Down Expand Up @@ -1008,17 +1016,21 @@ def overwrite(
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)

def delete(
self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
self,
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for deleting rows from the table.
Args:
delete_filter: The predicate that used to remove rows
case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

def add_files(
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
Expand Down
15 changes: 10 additions & 5 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""

_predicate: BooleanExpression
_case_sensitive: bool

def __init__(
self,
Expand All @@ -329,6 +330,7 @@ def __init__(
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()
self._case_sensitive = True

def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
Expand All @@ -340,7 +342,7 @@ def _commit(self) -> UpdatesAndRequirements:
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
project = inclusive_projection(schema, spec)
project = inclusive_projection(schema, spec, self._case_sensitive)
return project(self._predicate)

@cached_property
Expand All @@ -350,10 +352,11 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive)

def delete_by_predicate(self, predicate: BooleanExpression) -> None:
def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
self._predicate = Or(self._predicate, predicate)
self._case_sensitive = case_sensitive

@cached_property
def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
Expand All @@ -376,8 +379,10 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
)

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
schema, self._predicate, case_sensitive=self._case_sensitive
).eval

existing_manifests = []
total_deleted_entries = []
Expand Down
20 changes: 20 additions & 0 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
# pylint:disable=redefined-outer-name,eval-used
import pytest

from pyiceberg.expressions import EqualTo
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
Expand Down Expand Up @@ -341,3 +344,20 @@ def test_invalid_type() -> None:
)

assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)


@pytest.mark.parametrize("case_sensitive", [True, False])
def test_delete_table_rows_case_sensitive(
case_sensitive: bool, table_v1: Table, table_v2: Table, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(TableMetadataV1, "current_snapshot", lambda _: None)
monkeypatch.setattr(TableMetadataV2, "current_snapshot", lambda _: None)
for table in [table_v1, table_v2]:
delete_file = table.transaction().update_snapshot().delete()
delete_file.delete_by_predicate(predicate=EqualTo("X", 10), case_sensitive=case_sensitive)
if case_sensitive:
with pytest.raises(ValueError) as e:
_ = delete_file._compute_deletes
assert "Could not find field with name X" in str(e.value)
else:
_ = delete_file._compute_deletes

0 comments on commit f2123e3

Please sign in to comment.