Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial deletes #569

Merged
merged 49 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8d45920
Add option to delete datafiles
Fokko Apr 2, 2024
f6084a6
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 2, 2024
87cc065
Pull in main
Fokko Apr 2, 2024
bc9c83e
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 3, 2024
284d05a
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 5, 2024
234d55b
WIP
Fokko Apr 5, 2024
aadc89c
Change DataScan to accept Metadata and io
Fokko Apr 5, 2024
7e59342
fix name-mapping issue
HonahX Apr 7, 2024
fbf6492
Merge pull request #1 from HonahX/honahx-update-datascan
Fokko Apr 8, 2024
c3fa7e7
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 8, 2024
a925d69
Merge branch 'fd-update-datascan' of github.com:Fokko/iceberg-python …
Fokko Apr 8, 2024
5cec00a
WIP
Fokko Apr 8, 2024
a5e988a
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 8, 2024
1723819
WIP
Fokko Apr 9, 2024
5025b4a
Moar tests
Fokko Apr 10, 2024
e474fda
Oops
Fokko Apr 11, 2024
172f9c0
Cleanup
Fokko Apr 16, 2024
a97c45a
WIP
Fokko Apr 16, 2024
74497fb
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 17, 2024
47c9de1
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 17, 2024
9c6724e
WIP
Fokko Apr 17, 2024
edff166
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
c443af2
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
7dae071
Fix summary generation
Fokko Apr 18, 2024
5e871fb
Last few bits
Fokko Apr 18, 2024
9910d29
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 18, 2024
cd19f80
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Apr 25, 2024
edfadd9
Fix the requirement
Fokko Apr 25, 2024
d65a8a4
Make ruff happy
Fokko Apr 25, 2024
8849d97
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 2, 2024
3c98eef
Comments, thanks Kevin!
Fokko May 2, 2024
179fa27
Comments
Fokko May 9, 2024
2ea157e
Append rather than truncate
Fokko May 10, 2024
18392d1
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 10, 2024
7d036b1
Fix merge conflicts
Fokko May 10, 2024
5adf3f0
Make the tests pass
Fokko May 13, 2024
b3fcdcf
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 27, 2024
4ceacb8
Add another test
Fokko May 30, 2024
ddf6119
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 30, 2024
5b10f25
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko May 30, 2024
4cd67ac
Conflicts
Fokko May 30, 2024
5cdb363
Add docs (#33)
sungwy Jun 15, 2024
2252e71
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jun 21, 2024
05fcf2d
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 5, 2024
1ccb31d
Add a partitioned overwrite test
Fokko Jul 5, 2024
86432fe
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 8, 2024
259f8c5
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-a…
Fokko Jul 9, 2024
96d6392
Fix comment
Fokko Jul 9, 2024
301bc82
Skip empty manifests
Fokko Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@
import pyiceberg.expressions.parser as parser
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
And,
BooleanExpression,
EqualTo,
Or,
Reference,
)
from pyiceberg.expressions.visitors import (
ROWS_CANNOT_MATCH,
ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
_StrictMetricsEvaluator,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
Expand Down Expand Up @@ -2726,6 +2731,112 @@ def _commit(self) -> UpdatesAndRequirements:
)


class DeleteFiles(_MergingSnapshotProducer):
    """Snapshot producer that drops whole data files matching a predicate.

    Predicates registered through :meth:`delete` are OR-ed together. When the
    deletes are computed, each manifest of the current snapshot is tested
    against the predicate projected onto its partition spec:

    * manifests that cannot contain matching files are carried over as-is;
    * the other manifests are opened, and each entry is either marked as
      deleted (all rows must match) or kept (no row can match).

    A data file for which neither can be proven would need to be rewritten,
    which is not supported; a ``ValueError`` is raised in that case.
    """

    _predicate: BooleanExpression

    def __init__(
        self,
        operation: Operation,
        transaction: Transaction,
        io: FileIO,
        commit_uuid: Optional[uuid.UUID] = None,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
        # Start from "match nothing" so delete() can simply OR predicates in.
        self._predicate = AlwaysFalse()

    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        """Project the delete predicate onto the partition spec with the given id."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        project = inclusive_projection(schema, spec)
        return project(self._predicate)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        """Mapping from partition spec id to the projected delete predicate."""
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        """Build the manifest-level evaluator for the partition spec with the given id."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        """Build a callable that evaluates the projected predicate on a data file's partition."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        partition_type = spec.partition_type(schema)
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition)

    def delete(self, predicate: BooleanExpression) -> None:
        """Register an additional predicate; files matching any registered predicate are deleted."""
        self._predicate = Or(self._predicate, predicate)

    @cached_property
    def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry]]:
        """Split the current snapshot into manifests to keep and entries to delete.

        Returns:
            A tuple ``(existing_manifests, deleted_entries)``. The first element
            holds the untouched manifests plus the rewritten manifests that carry
            the surviving entries; the second holds copies of the removed entries
            with status ``DELETED``.

        Raises:
            ValueError: If a data file can neither be proven to match entirely
                nor be proven not to match at all.
        """
        table_metadata = self._transaction.table_metadata
        schema = table_metadata.schema()

        def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
            return ManifestEntry(
                status=status,
                snapshot_id=entry.snapshot_id,
                data_sequence_number=entry.data_sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
        inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval

        existing_manifests: List[ManifestFile] = []
        total_deleted_entries: List[ManifestEntry] = []
        if snapshot := table_metadata.current_snapshot():
            for num, manifest_file in enumerate(snapshot.manifests(io=self._io)):
                if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
                    # If the manifest isn't relevant, we can just keep it in the manifest-list
                    existing_manifests.append(manifest_file)
                    continue

                # It is relevant, let's check out the content
                deleted_entries: List[ManifestEntry] = []
                existing_entries: List[ManifestEntry] = []
                for entry in manifest_file.fetch_manifest_entry(io=self._io):
                    if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
                        deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
                    elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
                        existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
                    else:
                        # Neither "all rows match" nor "no row matches" could be
                        # proven, so dropping the file would require a rewrite.
                        raise ValueError("Deletes do not support rewrites of data files")

                if not deleted_entries:
                    # Nothing was removed from this manifest, so it can be reused as-is
                    existing_manifests.append(manifest_file)
                    continue

                total_deleted_entries += deleted_entries

                # Rewrite the manifest so that it only carries the surviving entries.
                # When nothing survives, the manifest is dropped altogether.
                if existing_entries:
                    output_file_location = _new_manifest_path(
                        location=table_metadata.location, num=num, commit_uuid=self.commit_uuid
                    )
                    with write_manifest(
                        format_version=table_metadata.format_version,
                        spec=table_metadata.specs()[manifest_file.partition_spec_id],
                        schema=schema,
                        output_file=self._io.new_output(output_file_location),
                        snapshot_id=self._snapshot_id,
                    ) as writer:
                        for existing_entry in existing_entries:
                            writer.add_entry(existing_entry)
                    # Register the rewritten manifest; otherwise the surviving data
                    # files would no longer be referenced by the new snapshot.
                    existing_manifests.append(writer.to_manifest_file())

        return existing_manifests, total_deleted_entries

    def _existing_manifests(self) -> List[ManifestFile]:
        """Return the manifests that are kept: untouched ones plus rewritten ones."""
        return self._compute_deletes[0]

    def _deleted_entries(self) -> List[ManifestEntry]:
        """Return the manifest entries that are marked as deleted."""
        return self._compute_deletes[1]


class FastAppendFiles(_MergingSnapshotProducer):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
Expand Down Expand Up @@ -2803,7 +2914,7 @@ class UpdateSnapshot:
_io: FileIO
_snapshot_properties: Dict[str, str]

def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None:
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """Keep the transaction, file IO and snapshot properties for the producers built later."""
    self._snapshot_properties = snapshot_properties
    self._io = io
    self._transaction = transaction
Expand All @@ -2823,6 +2934,14 @@ def overwrite(self) -> OverwriteFiles:
snapshot_properties=self._snapshot_properties,
)

def delete(self) -> DeleteFiles:
    """Create a producer for a snapshot that deletes data files.

    Returns:
        A DeleteFiles instance built with Operation.DELETE and this object's
        transaction, file IO and snapshot properties.
    """
    delete_producer = DeleteFiles(
        io=self._io,
        transaction=self._transaction,
        operation=Operation.DELETE,
        snapshot_properties=self._snapshot_properties,
    )
    return delete_producer


class UpdateSpec(UpdateTableMetadata["UpdateSpec"]):
_transaction: Transaction
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def get_prop(prop: str) -> int:
def update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/test_deletes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
import pytest
from pyspark.sql import DataFrame, SparkSession

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.expressions import EqualTo


@pytest.fixture
def test_deletes_table(spark: SparkSession) -> DataFrame:
    """Recreate the partitioned Iceberg table used by the delete tests.

    The table is dropped if it exists, created partitioned by
    ``number_partitioned``, and filled by two separate inserts: one for
    partition value 10 and one for partition value 11.
    """
    identifier = 'default.table_partitioned_delete'

    drop_statement = f"DROP TABLE IF EXISTS {identifier}"
    create_statement = f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
"""
    insert_statements = (
        f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30)
""",
        f"""
INSERT INTO {identifier} VALUES (11, 20), (11, 30)
""",
    )

    # Run the statements one by one, in the same order: drop, create, then each insert.
    for statement in (drop_statement, create_statement, *insert_statements):
        spark.sql(statement)

    return spark.table(identifier)


def test_partition_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
    """Delete on the partition column and expect only the other partition's rows to remain."""
    table = session_catalog.load_table('default.table_partitioned_delete')

    with table.transaction() as txn, txn.update_snapshot().delete() as delete:
        delete.delete(EqualTo("number_partitioned", 10))

    remaining_rows = table.scan().to_arrow().to_pydict()
    assert remaining_rows == {'number_partitioned': [11, 11], 'number': [20, 30]}


def test_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
    """Delete on the non-partition column ``number`` and check the remaining rows."""
    table = session_catalog.load_table('default.table_partitioned_delete')

    with table.transaction() as txn, txn.update_snapshot().delete() as delete:
        delete.delete(EqualTo("number", 30))

    # NOTE(review): this expected value is the same as in test_partition_deletes and
    # still contains a row with number == 30, which the predicate above targets.
    # It looks copy-pasted; confirm the intended outcome (this may be expected to
    # raise instead, if a data file holds both number 20 and number 30).
    remaining_rows = table.scan().to_arrow().to_pydict()
    assert remaining_rows == {'number_partitioned': [11, 11], 'number': [20, 30]}
Loading