Commit e891bcd

Fix tracing existing entries when there are deletes (#1046)
1 parent 5cce906 · commit e891bcd

File tree: 2 files changed (+85, −10 lines)

pyiceberg/table/__init__.py

Lines changed: 6 additions & 7 deletions

Before this change, a data file whose metrics could not settle the delete predicate was neither rewritten as DELETED nor re-added as EXISTING; it was only flagged via partial_rewrites_needed, so its manifest entry silently vanished from the rewritten manifest. The fix keeps every non-deleted entry as EXISTING and raises the partial-rewrite flag only when the inclusive evaluator says the file might still contain matching rows.
```diff
@@ -62,7 +62,7 @@
     Reference,
 )
 from pyiceberg.expressions.visitors import (
-    ROWS_CANNOT_MATCH,
+    ROWS_MIGHT_NOT_MATCH,
     ROWS_MUST_MATCH,
     _InclusiveMetricsEvaluator,
     _StrictMetricsEvaluator,
@@ -3379,13 +3379,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
         existing_entries = []
         for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
             if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
+                # Based on the metadata, it can be dropped right away
                 deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
                 self._deleted_data_files.add(entry.data_file)
-            elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
-                existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
             else:
-                # Based on the metadata, it is unsure to say if the file can be deleted
-                partial_rewrites_needed = True
+                # Based on the metadata, we cannot determine if it can be deleted
+                existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+                if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
+                    partial_rewrites_needed = True

         if len(deleted_entries) > 0:
             total_deleted_entries += deleted_entries
@@ -3402,8 +3403,6 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
             for existing_entry in existing_entries:
                 writer.add_entry(existing_entry)
             existing_manifests.append(writer.to_manifest_file())
-            # else:
-            #     deleted_manifests.append()
         else:
             existing_manifests.append(manifest_file)
     else:
```
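
To isolate the corrected control flow from the manifest-writing machinery, here is a minimal sketch. The `classify_entry` helper and `Outcome` enum are hypothetical, and modeling the evaluator verdicts as booleans is an assumption of the sketch; only the branch structure comes from the diff above.

```python
from enum import Enum

# Stand-ins for pyiceberg's evaluator verdicts; plain booleans are an
# assumption of this sketch, not a claim about the library's internals.
ROWS_MUST_MATCH = True        # strict evaluator: every row matches the predicate
ROWS_MIGHT_NOT_MATCH = False  # inclusive evaluator: no row can match


class Outcome(Enum):
    DELETED = "deleted"    # whole file dropped from the new snapshot
    EXISTING = "existing"  # file carried forward untouched
    EXISTING_PARTIAL_REWRITE = "existing+rewrite"  # carried forward, rows must be filtered


def classify_entry(strict_result: bool, inclusive_result: bool) -> Outcome:
    """Hypothetical helper mirroring the fixed branch above."""
    if strict_result == ROWS_MUST_MATCH:
        # Based on the metadata, the whole file can be dropped right away.
        return Outcome.DELETED
    # The fix: an entry with inconclusive metrics is now always re-added
    # as EXISTING instead of being skipped entirely.
    if inclusive_result != ROWS_MIGHT_NOT_MATCH:
        # Some rows might match the delete predicate: partial rewrite needed.
        return Outcome.EXISTING_PARTIAL_REWRITE
    return Outcome.EXISTING


# The three possible situations:
assert classify_entry(strict_result=True, inclusive_result=True) is Outcome.DELETED
assert classify_entry(strict_result=False, inclusive_result=True) is Outcome.EXISTING_PARTIAL_REWRITE
assert classify_entry(strict_result=False, inclusive_result=False) is Outcome.EXISTING
```

The second case is the one the commit fixes: a file that might contain matching rows is now kept and flagged, where it was previously flagged but never re-added.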

tests/integration/test_writes/test_writes.py

Lines changed: 79 additions & 3 deletions

The new integration test deletes on a threshold that the per-file column metrics cannot decide, forcing the inconclusive path fixed above. It runs against the local REST catalog and MinIO endpoints of the project's integration environment.
```diff
@@ -23,6 +23,7 @@
 from typing import Any, Dict
 from urllib.parse import urlparse

+import numpy as np
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -38,13 +39,20 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import In
+from pyiceberg.expressions import GreaterThanOrEqual, In, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.types import (
+    DateType,
+    DoubleType,
+    IntegerType,
+    LongType,
+    NestedField,
+    StringType,
+)
 from utils import _create_table


@@ -1333,3 +1341,71 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
     tbl.overwrite(data, In("id", ["1", "2", "3"]))

     assert len(tbl.scan().to_arrow()) == 3
+
+
+@pytest.mark.integration
+def test_delete_threshold() -> None:
+    catalog = load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+    schema = Schema(
+        NestedField(field_id=101, name="id", field_type=LongType(), required=True),
+        NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
+        NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
+    )
+
+    partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
+
+    try:
+        catalog.drop_table(
+            identifier="default.scores",
+        )
+    except NoSuchTableError:
+        pass
+
+    catalog.create_table(
+        identifier="default.scores",
+        schema=schema,
+        partition_spec=partition_spec,
+    )
+
+    # Parameters
+    num_rows = 100  # Number of rows in the dataframe
+    id_min, id_max = 1, 10000
+    date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)
+
+    # Generate the 'id' column
+    id_column = np.random.randint(id_min, id_max, num_rows)
+
+    # Generate the 'created_at' column as dates only
+    date_range = pd.date_range(start=date_start, end=date_end, freq="D")  # Daily frequency for dates
+    created_at_column = np.random.choice(date_range, num_rows)  # Pick a random date per row
+
+    # Generate the 'relevancy_score' column with a peak around 0.1
+    relevancy_score_column = np.random.beta(a=2, b=20, size=num_rows)
+
+    # Create the dataframe
+    df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
+
+    iceberg_table = catalog.load_table("default.scores")
+
+    # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
+    arrow_schema = iceberg_table.schema().as_arrow()
+    docs_table = pa.Table.from_pandas(df, schema=arrow_schema)
+
+    # Append the data to the Iceberg table
+    iceberg_table.append(docs_table)
+
+    delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
+    lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
+    assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
+    iceberg_table.delete(delete_condition)
+    assert len(iceberg_table.scan().to_arrow()) == lower_before
```
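
A note on why this data exercises the fixed path: Beta(2, 20) draws cluster just below the 0.1 threshold with a substantial tail above it, so a 100-row file almost surely contains rows on both sides of the delete predicate. The file's min/max metrics then let neither evaluator reach a verdict, which is precisely the inconclusive case that used to lose entries. A standalone sketch of that property (plain NumPy; the seed is arbitrary):

```python
import numpy as np

# Sample the same distribution the test uses for relevancy_score.
rng = np.random.default_rng(seed=42)
scores = rng.beta(a=2.0, b=20.0, size=100)

# Beta(2, 20) has mean 2 / 22 ~= 0.091, so a 100-row file almost surely
# straddles the 0.1 delete threshold: some rows match, some do not.
assert scores.min() < 0.1 < scores.max()

# Consequence for the manifest rewrite: file-level min/max stats can prove
# neither "every row matches" (strict) nor "no row matches" (inclusive),
# so the entry must be kept as EXISTING and flagged for a partial rewrite.
```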
