Skip to content

Commit c32aa04

Browse files
kaushiksrinijayceslesarFokko
authored
Validate added data files for snapshot compatibility (#2050)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> Closes #1929 # Rationale for this change - This change is needed to support snapshot write compatibility (#1772) and is part of the parent issue #819. # Are these changes tested? Yes # Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Jayce Slesar <[email protected]> Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 6b19f0c commit c32aa04

File tree

2 files changed

+240
-22
lines changed

2 files changed

+240
-22
lines changed

pyiceberg/table/update/validate.py

Lines changed: 106 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import Iterator, Optional
17+
from typing import Iterator, Optional, Set
1818

1919
from pyiceberg.exceptions import ValidationException
2020
from pyiceberg.expressions import BooleanExpression
2121
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
2222
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
23+
from pyiceberg.schema import Schema
2324
from pyiceberg.table import Table
2425
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
2526
from pyiceberg.typedef import Record
2627

27-
VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
28+
VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
29+
VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
2830

2931

3032
def _validation_history(
@@ -77,6 +79,47 @@ def _validation_history(
7779
return manifests_files, snapshots
7880

7981

82+
def _filter_manifest_entries(
    entry: ManifestEntry,
    snapshot_ids: set[int],
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    entry_status: Optional[ManifestEntryStatus],
    schema: Schema,
) -> bool:
    """Decide whether a manifest entry passes all of the requested criteria.

    The checks run in a fixed order (snapshot id, status, data filter,
    partition set) and the first failing check rejects the entry. Any
    optional criterion passed as ``None`` is skipped.

    Args:
        entry: Manifest entry under consideration
        snapshot_ids: Snapshot ids the entry's ``snapshot_id`` must belong to
        data_filter: Optional expression evaluated against the entry's data file
        partition_set: Optional mapping of spec id to the partitions accepted for that spec
        entry_status: Optional status the entry must have
        schema: Schema used to build the evaluator for ``data_filter``

    Returns:
        True if the entry should be included, False otherwise
    """
    if entry.snapshot_id not in snapshot_ids:
        return False

    if entry_status is not None:
        if entry.status != entry_status:
            return False

    # An evaluator is only built when a data filter was actually supplied.
    if data_filter is not None and _InclusiveMetricsEvaluator(schema, data_filter).eval(entry.data_file) is ROWS_CANNOT_MATCH:
        return False

    if partition_set is None:
        return True

    # The data file must belong to a known spec *and* to one of that spec's partitions.
    data_file = entry.data_file
    spec_id, partition = data_file.spec_id, data_file.partition
    return spec_id in partition_set and partition in partition_set[spec_id]
121+
122+
80123
def _deleted_data_files(
81124
table: Table,
82125
starting_snapshot: Snapshot,
@@ -108,27 +151,12 @@ def _deleted_data_files(
108151
ManifestContent.DATA,
109152
)
110153

111-
if data_filter is not None:
112-
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval
113-
114154
for manifest in manifests:
115155
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
116-
if entry.snapshot_id not in snapshot_ids:
117-
continue
118-
119-
if entry.status != ManifestEntryStatus.DELETED:
120-
continue
121-
122-
if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
123-
continue
124-
125-
if partition_set is not None:
126-
spec_id = entry.data_file.spec_id
127-
partition = entry.data_file.partition
128-
if spec_id not in partition_set or partition not in partition_set[spec_id]:
129-
continue
130-
131-
yield entry
156+
if _filter_manifest_entries(
157+
entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema()
158+
):
159+
yield entry
132160

133161

134162
def _validate_deleted_data_files(
@@ -150,3 +178,60 @@ def _validate_deleted_data_files(
150178
if any(conflicting_entries):
151179
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
152180
raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")
181+
182+
183+
def _added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
    """Return manifest entries for data files added between the starting snapshot and parent snapshot.

    Args:
        table: Table to get the history from
        starting_snapshot: Starting snapshot to get the history from
        data_filter: Optional filter to match data files
        partition_set: Optional set of partitions to match data files
        parent_snapshot: Parent snapshot to get the history from

    Returns:
        Iterator of manifest entries for added data files matching the conditions
    """
    # Without a parent snapshot there is no history to inspect, so the iterator is empty.
    if parent_snapshot is None:
        return

    manifests, snapshot_ids = _validation_history(
        table,
        parent_snapshot,
        starting_snapshot,
        VALIDATE_ADDED_DATA_FILES_OPERATIONS,
        ManifestContent.DATA,
    )

    # The schema is the same for every entry, so resolve it once instead of per entry.
    schema = table.schema()

    for manifest in manifests:
        # NOTE(review): no ``discard_deleted`` argument is passed here, so this relies on
        # ``fetch_manifest_entry``'s default handling of deleted entries - confirm that is intended.
        for entry in manifest.fetch_manifest_entry(table.io):
            # No status restriction (``None``): only snapshot id, data filter and partition set apply.
            if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, schema):
                yield entry
217+
218+
219+
def _validate_added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Optional[Snapshot],
) -> None:
    """Validate that no files matching a filter have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one added data file matching the filter is found.
    """
    # ``_added_data_files`` yields lazily. Materialise the entries once so that the
    # emptiness check does not consume the first entry and drop its snapshot id
    # from the error message.
    conflicting_entries = list(_added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot))
    if conflicting_entries:
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
        raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")

tests/table/test_validate.py

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@
2525
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2626
from pyiceberg.table import Table
2727
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
28-
from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, _validation_history
28+
from pyiceberg.table.update.validate import (
29+
_added_data_files,
30+
_deleted_data_files,
31+
_validate_added_data_files,
32+
_validate_deleted_data_files,
33+
_validation_history,
34+
)
2935

3036

3137
@pytest.fixture
@@ -217,3 +223,130 @@ class DummyEntry:
217223
data_filter=None,
218224
parent_snapshot=oldest_snapshot,
219225
)
226+
227+
228+
@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.OVERWRITE])
def test_validate_added_data_files_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Snapshots rewritten to APPEND/OVERWRITE must surface their added entries."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100

    # Rewrite the summary of the most recent ``snapshot_history`` snapshots to the operation under test.
    snapshots = table.snapshots()
    for offset in range(1, snapshot_history + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        return mock_manifests.get(self.snapshot_id, [])

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Return a single ADDED entry attributed to the manifest's added snapshot."""
        added_entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED,
            snapshot_id=self.added_snapshot_id,
        )
        return [added_entry]

    manifests_patch = patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect)
    entries_patch = patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry)

    with manifests_patch, entries_patch:
        added_entries = _added_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )
        result = list(added_entries)

    # since we only look at the ManifestContent.Data files
    assert len(result) == snapshot_history / 2
280+
281+
282+
@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.REPLACE])
def test_validate_added_data_files_non_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Snapshots rewritten to DELETE/REPLACE must not contribute any added entries."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100

    # Rewrite the summary of the most recent ``snapshot_history`` snapshots to the operation under test.
    snapshots = table.snapshots()
    for offset in range(1, snapshot_history + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        return mock_manifests.get(self.snapshot_id, [])

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Return a single ADDED entry attributed to the manifest's added snapshot."""
        added_entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED,
            snapshot_id=self.added_snapshot_id,
        )
        return [added_entry]

    manifests_patch = patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect)
    entries_patch = patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry)

    with manifests_patch, entries_patch:
        added_entries = _added_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )
        result = list(added_entries)

    assert len(result) == 0
333+
334+
335+
def test_validate_added_data_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A conflicting entry reported by ``_added_data_files`` must raise ``ValidationException``."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    newest_snapshot = cast(Snapshot, table.current_snapshot())
    oldest_snapshot = table.snapshots()[0]

    class DummyEntry:
        snapshot_id = 123

    # Replace the lookup with a single fake conflicting entry so that only the raise path is exercised.
    with (
        patch("pyiceberg.table.update.validate._added_data_files", return_value=[DummyEntry()]),
        pytest.raises(ValidationException),
    ):
        _validate_added_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
        )

0 commit comments

Comments
 (0)