Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix table delete case sensitive #1

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ def overwrite(
self,
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Expand All @@ -436,6 +437,7 @@ def overwrite(
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
case_sensitive: A bool that determines whether the provided `overwrite_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
Expand All @@ -459,7 +461,7 @@ def overwrite(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
Expand All @@ -470,17 +472,23 @@ def overwrite(
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def delete(
self,
delete_filter: Union[str, BooleanExpression],
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for deleting records from a table.

An deletee may produce zero or more snapshots based on the operation:
A delete may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten

Args:
delete_filter: A boolean expression to delete rows from a table
case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
from pyiceberg.io.pyarrow import (
Expand All @@ -503,7 +511,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti

# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

files = self._scan(row_filter=delete_filter).plan_files()
Expand Down Expand Up @@ -1008,17 +1016,21 @@ def overwrite(
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)

def delete(
self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
self,
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
case_sensitive: bool = True,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for deleting rows from the table.

Args:
delete_filter: The predicate that is used to remove rows
case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

def add_files(
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
Expand Down Expand Up @@ -1311,7 +1323,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent

class DataScan(TableScan):
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

@cached_property
Expand Down
13 changes: 9 additions & 4 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""

_predicate: BooleanExpression
_case_sensitive: bool

def __init__(
self,
Expand All @@ -329,6 +330,7 @@ def __init__(
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()
self._case_sensitive = True

def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
Expand All @@ -340,7 +342,7 @@ def _commit(self) -> UpdatesAndRequirements:
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
project = inclusive_projection(schema, spec)
project = inclusive_projection(schema, spec, self._case_sensitive)
return project(self._predicate)

@cached_property
Expand All @@ -352,8 +354,9 @@ def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bo
spec = self._transaction.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)

def delete_by_predicate(self, predicate: BooleanExpression) -> None:
def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
self._predicate = Or(self._predicate, predicate)
self._case_sensitive = case_sensitive

@cached_property
def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
Expand All @@ -376,8 +379,10 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
)

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
schema, self._predicate, case_sensitive=self._case_sensitive
).eval

existing_manifests = []
total_deleted_entries = []
Expand Down
13 changes: 13 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,19 @@ def test_table_scan_row_filter(table_v2: Table) -> None:
assert scan.filter(EqualTo("x", 10)).filter(In("y", (10, 11))).row_filter == And(EqualTo("x", 10), In("y", (10, 11)))


def test_table_scan_partition_filters_case_sensitive(table_v2: Table) -> None:
    """Building partition filters must raise when the filter's column casing is wrong.

    The row filter references ``"X"`` and the scan is case-sensitive, so
    projecting the filter onto the partition specs is expected to raise a
    ``ValueError``.
    """
    scan = table_v2.scan(row_filter=EqualTo("X", 10), case_sensitive=True)
    with pytest.raises(ValueError):
        # Partition filters are looked up by spec ID, so iterate the actual IDs
        # instead of assuming they form the contiguous range 0..n-1.
        for spec_id in table_v2.metadata.specs():
            _ = scan.partition_filters[spec_id]


def test_table_scan_partition_filters_case_insensitive(table_v2: Table) -> None:
    """Building partition filters must succeed when the scan is case-insensitive.

    The row filter references ``"X"``; with ``case_sensitive=False`` every
    partition filter should be built without raising.
    """
    scan = table_v2.scan(row_filter=EqualTo("X", 10), case_sensitive=False)
    # Partition filters are looked up by spec ID, so iterate the actual IDs
    # instead of assuming they form the contiguous range 0..n-1.
    for spec_id in table_v2.metadata.specs():
        _ = scan.partition_filters[spec_id]


def test_table_scan_ref(table_v2: Table) -> None:
scan = table_v2.scan()
assert scan.use_ref("test").snapshot_id == 3051729675574597004
Expand Down