From 9dcb5807221cdd042b8bb3dfd3aa2a2687abd7a0 Mon Sep 17 00:00:00 2001 From: Jayce Date: Tue, 29 Apr 2025 16:58:34 -0400 Subject: [PATCH 01/19] feat: delete orphaned files --- pyiceberg/io/pyarrow.py | 15 +++++++++++++++ pyiceberg/table/__init__.py | 37 ++++++++++++++++++++++++++++++++++++- pyiceberg/table/inspect.py | 4 ++-- 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 522af0f344..923ac2baeb 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -67,6 +67,7 @@ from pyarrow import ChunkedArray from pyarrow.fs import ( FileInfo, + FileSelector, FileSystem, FileType, FSSpecHandler, @@ -576,6 +577,20 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: raise PermissionError(f"Cannot delete file, access denied: {location}") from e raise # pragma: no cover - If some other kind of OSError, raise the raw error + def list_files(self, location: str) -> Iterator[str]: + """Recursively list all files in the given location. + + Args: + location (str): A URI or a path to a local directory. + + Returns: + Iterator[str]: An iterator of file paths. + """ + scheme, netloc, path = self.parse_location(location) + fs = self.fs_by_scheme(scheme, netloc) + selector = FileSelector(path, recursive=True) + return fs.get_file_info(selector) + def __getstate__(self) -> Dict[str, Any]: """Create a dictionary of the PyArrowFileIO fields used when pickling.""" fileio_copy = copy(self.__dict__) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9e9de52dee..b286272319 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -17,6 +17,7 @@ from __future__ import annotations import itertools +import logging import os import uuid import warnings @@ -31,6 +32,7 @@ Callable, Dict, Iterable, + Iterator, List, Optional, Set, @@ -62,7 +64,7 @@ inclusive_projection, manifest_evaluator, ) -from pyiceberg.io import FileIO, load_file_io +from pyiceberg.io import FileIO, _parse_location, load_file_io from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -150,6 +152,8 @@ from pyiceberg.catalog import Catalog +logger = logging.getLogger(__name__) + ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -1371,6 +1375,37 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector + + all_known_files = [] + snapshot_ids = [snapshot.snapshot_id for snapshot in self.snapshots()] + all_manifests_table = self.inspect.all_manifests(snapshot_ids) + all_known_files.extend(all_manifests_table["path"].to_pylist()) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda args: self.inspect.files(*args), snapshot_ids) + all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) + + scheme, netloc, path = _parse_location(self.location()) + fs = self.io.fs_by_scheme(scheme, netloc) + selector = FileSelector(path, recursive=True) + all_files = fs.get_file_info(selector) + + orphaned_files = set(all_files).difference(set(all_known_files)) + logger.info(f"Found {len(orphaned_files)} orphaned files at {self.location()}!") + + if orphaned_files: + deletes = executor.map(self.io.delete, orphaned_files) + list(deletes) + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {self.location()}!") + class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 878ae71c81..84b40b21cc 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -645,10 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self) -> "pa.Table": + def all_manifests(self, snapshot_ids: Optional[list[int]] = None) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() + snapshots = snapshot_ids or self.tbl.snapshots() if not snapshots: return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) From e43505c0530f1688d1355b7ca9fc6848b68aac07 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Tue, 29 Apr 2025 18:39:11 -0400 Subject: [PATCH 02/19] simpler and a test --- pyiceberg/table/__init__.py | 26 +++++++---- pyiceberg/table/inspect.py | 4 +- tests/table/test_delete_orphans.py | 69 ++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 tests/table/test_delete_orphans.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b286272319..900d9e14f3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1382,29 +1382,37 @@ def delete_orphaned_files(self) -> None: except ModuleNotFoundError as e: raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e - from pyarrow.fs import FileSelector + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() all_known_files = [] - snapshot_ids = [snapshot.snapshot_id for snapshot in self.snapshots()] - all_manifests_table = self.inspect.all_manifests(snapshot_ids) + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) all_known_files.extend(all_manifests_table["path"].to_pylist()) executor = ExecutorFactory.get_or_create() - files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda args: self.inspect.files(*args), snapshot_ids) + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) - scheme, netloc, path = _parse_location(self.location()) - fs = self.io.fs_by_scheme(scheme, netloc) + fs = _fs_from_file_path(self.io, location) + + _, _, path = _parse_location(location) selector = FileSelector(path, recursive=True) - all_files = fs.get_file_info(selector) + # filter to just files as it may return directories + all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] orphaned_files = set(all_files).difference(set(all_known_files)) - logger.info(f"Found {len(orphaned_files)} orphaned files at {self.location()}!") + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") if orphaned_files: deletes = executor.map(self.io.delete, orphaned_files) + # exhaust list(deletes) - logger.info(f"Deleted {len(orphaned_files)} orphaned files at {self.location()}!") + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") class StaticTable(Table): diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 84b40b21cc..abf245d64a 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -645,10 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self, snapshot_ids: Optional[list[int]] = None) -> "pa.Table": + def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table": import pyarrow as pa - snapshots = snapshot_ids or self.tbl.snapshots() + snapshots = snapshots or self.tbl.snapshots() if not snapshots: return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_delete_orphans.py new file mode 100644 index 0000000000..2600188298 --- /dev/null +++ b/tests/table/test_delete_orphans.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pathlib import Path, PosixPath + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType, NestedField, StringType +from tests.catalog.test_base import InMemoryCatalog + + +@pytest.fixture +def catalog(tmp_path: PosixPath) -> InMemoryCatalog: + catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) + catalog.create_namespace("default") + return catalog + + +def test_delete_orphaned_files(catalog: Catalog) -> None: + identifier = "default.test_delete_orphaned_files" + + schema = Schema( + NestedField(1, "city", StringType(), required=True), + NestedField(2, "inhabitants", IntegerType(), required=True), + # Mark City as the identifier field, also known as the primary-key + identifier_field_ids=[1], + ) + + tbl = catalog.create_table(identifier, schema=schema) + + arrow_schema = pa.schema( + [ + pa.field("city", pa.string(), nullable=False), + pa.field("inhabitants", pa.int32(), nullable=False), + ] + ) + + df = pa.Table.from_pylist( + [ + {"city": "Drachten", "inhabitants": 45019}, + {"city": "Drachten", "inhabitants": 45019}, + ], + schema=arrow_schema, + ) + tbl.append(df) + + orphaned_file = Path(tbl.location()) / "orphan.txt" + + orphaned_file.touch() + assert orphaned_file.exists() + + tbl.delete_orphaned_files() + assert not orphaned_file.exists() From eed5ea85e1dd06cefcc9b2bee68d58c8877febbe Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Tue, 29 Apr 2025 18:42:58 -0400 Subject: [PATCH 03/19] remove --- pyiceberg/io/pyarrow.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 923ac2baeb..522af0f344 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -67,7 +67,6 @@ from pyarrow import ChunkedArray from pyarrow.fs import ( FileInfo, - FileSelector, FileSystem, FileType, FSSpecHandler, @@ -577,20 +576,6 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: raise PermissionError(f"Cannot delete file, access denied: {location}") from e raise # pragma: no cover - If some other kind of OSError, raise the raw error - def list_files(self, location: str) -> Iterator[str]: - """Recursively list all files in the given location. - - Args: - location (str): A URI or a path to a local directory. - - Returns: - Iterator[str]: An iterator of file paths. - """ - scheme, netloc, path = self.parse_location(location) - fs = self.fs_by_scheme(scheme, netloc) - selector = FileSelector(path, recursive=True) - return fs.get_file_info(selector) - def __getstate__(self) -> Dict[str, Any]: """Create a dictionary of the PyArrowFileIO fields used when pickling.""" fileio_copy = copy(self.__dict__) From 8cca60003571c89d98ebb81bb10bef0099525334 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 2 May 2025 17:22:37 -0400 Subject: [PATCH 04/19] updates from review! --- pyiceberg/table/__init__.py | 42 +++++++------------------- pyiceberg/table/inspect.py | 47 ++++++++++++++++++++++++++++-- tests/table/test_delete_orphans.py | 37 +++++++++++++++++++++++ 3 files changed, 92 insertions(+), 34 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 900d9e14f3..9f771b7227 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import contextlib import itertools import logging import os @@ -32,7 +33,6 @@ Callable, Dict, Iterable, - Iterator, List, Optional, Set, @@ -64,7 +64,7 @@ inclusive_projection, manifest_evaluator, ) -from pyiceberg.io import FileIO, _parse_location, load_file_io +from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -1377,39 +1377,19 @@ def to_polars(self) -> pl.LazyFrame: def delete_orphaned_files(self) -> None: """Delete orphaned files in the table.""" - try: - import pyarrow as pa # noqa: F401 - except ModuleNotFoundError as e: - raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e - - from pyarrow.fs import FileSelector, FileType - - from pyiceberg.io.pyarrow import _fs_from_file_path - location = self.location() - - all_known_files = [] - snapshots = self.snapshots() - snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] - all_manifests_table = self.inspect.all_manifests(snapshots) - all_known_files.extend(all_manifests_table["path"].to_pylist()) - - executor = ExecutorFactory.get_or_create() - files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) - all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) - - fs = _fs_from_file_path(self.io, location) - - _, _, path = _parse_location(location) - selector = FileSelector(path, recursive=True) - # filter to just files as it may return directories - all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] - - orphaned_files = set(all_files).difference(set(all_known_files)) + orphaned_files = self.inspect.orphaned_files(location) logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + def _delete(file: str) -> None: + # don't error if the file doesn't exist + # still catch ctrl-c, etc. + with contextlib.suppress(Exception): + self.io.delete(file) + if orphaned_files: - deletes = executor.map(self.io.delete, orphaned_files) + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) # exhaust list(deletes) logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index abf245d64a..742b689bbd 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -17,9 +17,11 @@ from __future__ import annotations from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple +from functools import reduce +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast from pyiceberg.conversions import from_bytes +from pyiceberg.io import _parse_location from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from pyiceberg.table.snapshots import Snapshot, ancestors_of @@ -645,10 +647,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table": + def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table": import pyarrow as pa - snapshots = snapshots or self.tbl.snapshots() + # coerce into snapshot objects if users passes in snapshot ids + if snapshots is not None: + if isinstance(snapshots[0], int): + snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots]) + else: + snapshots = self.tbl.snapshots() + if not snapshots: return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) @@ -657,3 +665,36 @@ def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def orphaned_files(self, location: str) -> Set[str]: + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + all_known_files = set() + snapshots = self.tbl.snapshots() + manifests_paths = self.all_manifests(snapshots)["path"].to_pylist() + all_known_files.update(manifests_paths) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()) + ) + datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set()) + all_known_files.update(datafile_paths) + + fs = _fs_from_file_path(self.tbl.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories + all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] + + orphaned_files = set(all_files).difference(set(all_known_files)) + + return orphaned_files diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_delete_orphans.py index 2600188298..d074595300 100644 --- a/tests/table/test_delete_orphans.py +++ b/tests/table/test_delete_orphans.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. from pathlib import Path, PosixPath +from unittest.mock import PropertyMock, patch import pyarrow as pa import pytest @@ -67,3 +68,39 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: tbl.delete_orphaned_files() assert not orphaned_file.exists() + + +def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None: + identifier = "default.test_delete_orphaned_files" + + schema = Schema( + NestedField(1, "city", StringType(), required=True), + NestedField(2, "inhabitants", IntegerType(), required=True), + # Mark City as the identifier field, also known as the primary-key + identifier_field_ids=[1], + ) + + tbl = catalog.create_table(identifier, schema=schema) + + arrow_schema = pa.schema( + [ + pa.field("city", pa.string(), nullable=False), + pa.field("inhabitants", pa.int32(), nullable=False), + ] + ) + + df = pa.Table.from_pylist( + [ + {"city": "Drachten", "inhabitants": 45019}, + {"city": "Drachten", "inhabitants": 45019}, + ], + schema=arrow_schema, + ) + tbl.append(df) + + file_that_does_not_exist = "foo/bar.baz" + with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect: + mock_inspect.return_value.orphaned_files = lambda x: {file_that_does_not_exist} + with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete: + tbl.delete_orphaned_files() + mock_delete.assert_called_with(file_that_does_not_exist) From 75b12401bbdcac9e515750525b901ec0324c0c33 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 2 May 2025 17:56:57 -0400 Subject: [PATCH 05/19] include dry run and older than --- pyiceberg/table/__init__.py | 18 +++++++++++------- pyiceberg/table/inspect.py | 11 ++++++----- tests/table/test_delete_orphans.py | 11 +++++++++-- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9f771b7227..09e8c0ef6f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -24,6 +24,7 @@ import warnings from abc import ABC, abstractmethod from dataclasses import dataclass +from datetime import timedelta from functools import cached_property from itertools import chain from types import TracebackType @@ -1375,10 +1376,10 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) - def delete_orphaned_files(self) -> None: + def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None: """Delete orphaned files in the table.""" location = self.location() - orphaned_files = self.inspect.orphaned_files(location) + orphaned_files = self.inspect.orphaned_files(location, older_than) logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") def _delete(file: str) -> None: @@ -1388,11 +1389,14 @@ def _delete(file: str) -> None: self.io.delete(file) if orphaned_files: - executor = ExecutorFactory.get_or_create() - deletes = executor.map(_delete, orphaned_files) - # exhaust - list(deletes) - logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") + if dry_run: + logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") + else: + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) + # exhaust + list(deletes) + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") class StaticTable(Table): diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 742b689bbd..e25119af66 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from functools import reduce from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast @@ -666,7 +666,7 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = ) return pa.concat_tables(manifests_by_snapshots) - def orphaned_files(self, location: str) -> Set[str]: + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: try: import pyarrow as pa # noqa: F401 except ModuleNotFoundError as e: @@ -692,9 +692,10 @@ def orphaned_files(self, location: str) -> Set[str]: _, _, path = _parse_location(location) selector = FileSelector(path, recursive=True) - # filter to just files as it may return directories - all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] + # filter to just files as it may return directories, and filter on time + as_of = datetime.now(timezone.utc) - older_than if older_than else None + all_files = [f for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))] - orphaned_files = set(all_files).difference(set(all_known_files)) + orphaned_files = set(all_files).difference(all_known_files) return orphaned_files diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_delete_orphans.py index d074595300..f31b298b38 100644 --- a/tests/table/test_delete_orphans.py +++ b/tests/table/test_delete_orphans.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import os +from datetime import datetime, timedelta from pathlib import Path, PosixPath from unittest.mock import PropertyMock, patch @@ -66,8 +68,13 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: orphaned_file.touch() assert orphaned_file.exists() + # should not delete because it was just created... tbl.delete_orphaned_files() - assert not orphaned_file.exists() + assert orphaned_file.exists() + + # modify creation date to be older than 3 days + five_days_ago = (datetime.now() - timedelta(days=5)).timestamp() + os.utime(orphaned_file, (five_days_ago, five_days_ago)) def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None: @@ -100,7 +107,7 @@ def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) file_that_does_not_exist = "foo/bar.baz" with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect: - mock_inspect.return_value.orphaned_files = lambda x: {file_that_does_not_exist} + mock_inspect.return_value.orphaned_files = lambda location, older_than: {file_that_does_not_exist} with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete: tbl.delete_orphaned_files() mock_delete.assert_called_with(file_that_does_not_exist) From 6379480e2579b2195b0f6d26ca3cb595e1e9d206 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 2 May 2025 18:00:39 -0400 Subject: [PATCH 06/19] add case for dry run --- tests/table/test_delete_orphans.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_delete_orphans.py index f31b298b38..266e3b8331 100644 --- a/tests/table/test_delete_orphans.py +++ b/tests/table/test_delete_orphans.py @@ -68,6 +68,10 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: orphaned_file.touch() assert orphaned_file.exists() + # assert no files deleted if dry run... + tbl.delete_orphaned_files(dry_run=True) + assert orphaned_file.exists() + # should not delete because it was just created... tbl.delete_orphaned_files() assert orphaned_file.exists() From 0c2822e59689a57c8d8d2b0921c8ca4b2ce0c8c8 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 14:16:09 -0400 Subject: [PATCH 07/19] use .path so we get paths pack --- pyiceberg/table/inspect.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index e25119af66..cfbbfb008a 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -694,7 +694,9 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede selector = FileSelector(path, recursive=True) # filter to just files as it may return directories, and filter on time as_of = datetime.now(timezone.utc) - older_than if older_than else None - all_files = [f for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))] + all_files = [ + f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) + ] orphaned_files = set(all_files).difference(all_known_files) From aaf8fc2e7f392b42c3f081b58249fca8414f4698 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 14:17:52 -0400 Subject: [PATCH 08/19] actually pass in iterable --- pyiceberg/table/inspect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cfbbfb008a..3c7dc5c228 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -678,12 +678,13 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede all_known_files = set() snapshots = self.tbl.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] manifests_paths = self.all_manifests(snapshots)["path"].to_pylist() all_known_files.update(manifests_paths) executor = ExecutorFactory.get_or_create() files_by_snapshots: Iterator[Set[str]] = executor.map( - lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()) + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids ) datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set()) all_known_files.update(datafile_paths) From b09641b46e36499720c8e9fad5ef2d7ad7dfd9c4 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 14:42:34 -0400 Subject: [PATCH 09/19] capture manifest_list files --- pyiceberg/table/inspect.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 3c7dc5c228..18f10146e2 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -679,8 +679,10 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede all_known_files = set() snapshots = self.tbl.snapshots() snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] - manifests_paths = self.all_manifests(snapshots)["path"].to_pylist() - all_known_files.update(manifests_paths) + + all_known_files.update(self.all_manifests(snapshots)["path"].to_pylist()) + all_known_files.update([snapshot.manifest_list for snapshot in snapshots]) + all_known_files.update([statistic.statistics_path for statistic in self.tbl.metadata.statistics]) executor = ExecutorFactory.get_or_create() files_by_snapshots: Iterator[Set[str]] = executor.map( From beec233e0521fee0dab5cd0379bba06babc5a1d9 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 14:57:35 -0400 Subject: [PATCH 10/19] refactor into `all_known_files` --- pyiceberg/table/inspect.py | 50 ++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 18f10146e2..27c94b5c02 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -666,7 +666,39 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = ) return pa.concat_tables(manifests_by_snapshots) + def all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: list of file paths} for each file type. + """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: + """Get all the orphaned files in the table. + + Args: + location: The location to check for orphaned files. + older_than: The time period to check for orphaned files. Defaults to 3 days. + + Returns: + A set of orphaned file paths. + + """ try: import pyarrow as pa # noqa: F401 except ModuleNotFoundError as e: @@ -676,20 +708,8 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede from pyiceberg.io.pyarrow import _fs_from_file_path - all_known_files = set() - snapshots = self.tbl.snapshots() - snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] - - all_known_files.update(self.all_manifests(snapshots)["path"].to_pylist()) - all_known_files.update([snapshot.manifest_list for snapshot in snapshots]) - all_known_files.update([statistic.statistics_path for statistic in self.tbl.metadata.statistics]) - - executor = ExecutorFactory.get_or_create() - files_by_snapshots: Iterator[Set[str]] = executor.map( - lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids - ) - datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set()) - all_known_files.update(datafile_paths) + all_known_files = self.all_known_files() + flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) fs = _fs_from_file_path(self.tbl.io, location) @@ -701,6 +721,6 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) ] - orphaned_files = set(all_files).difference(all_known_files) + orphaned_files = set(all_files).difference(flat_known_files) return orphaned_files From b888c568d0a5f976c8cd42b735395a1e57cbb7ae Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 14:58:45 -0400 Subject: [PATCH 11/19] fix type in docstring --- pyiceberg/table/inspect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 27c94b5c02..966abfede9 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -670,7 +670,7 @@ def all_known_files(self) -> dict[str, set[str]]: """Get all the known files in the table. Returns: - dict of {file_type: list of file paths} for each file type. + dict of {file_type: set of file paths} for each file type. """ snapshots = self.tbl.snapshots() From ff461ed843793bbd40682e9504a3a9ac4c6e65aa Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 15:01:29 -0400 Subject: [PATCH 12/19] mildly more readable --- pyiceberg/table/inspect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 966abfede9..a0b3d16c97 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -679,8 +679,8 @@ def all_known_files(self) -> dict[str, set[str]]: _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} - snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] files_by_snapshots: Iterator[Set[str]] = executor.map( lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids ) From 3b3b10e6c7f5c22f15e13b26274568b6eace75a9 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 3 May 2025 15:23:45 -0400 Subject: [PATCH 13/19] beef up tests --- tests/table/test_delete_orphans.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_delete_orphans.py index 266e3b8331..3d797a8018 100644 --- a/tests/table/test_delete_orphans.py +++ b/tests/table/test_delete_orphans.py @@ -79,6 +79,14 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: # modify creation date to be older than 3 days five_days_ago = (datetime.now() - timedelta(days=5)).timestamp() os.utime(orphaned_file, (five_days_ago, five_days_ago)) + tbl.delete_orphaned_files() + assert not orphaned_file.exists() + + # assert that all known files still exist... + all_known_files = tbl.inspect.all_known_files() + for files in all_known_files.values(): + for file in files: + assert Path(file).exists() def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None: From a62c8cf2e9851db2c04059d3a621844f41f05cd7 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 4 May 2025 12:54:08 -0400 Subject: [PATCH 14/19] make `older_than` required --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/inspect.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 09e8c0ef6f..0f3aec66ec 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1376,7 +1376,7 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) - def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None: + def delete_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None: """Delete orphaned files in the table.""" location = self.location() orphaned_files = self.inspect.orphaned_files(location, older_than) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index a0b3d16c97..44726cfe15 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -688,7 +688,7 @@ def all_known_files(self) -> dict[str, set[str]]: return _all_known_files - def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: + def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: """Get all the orphaned files in the table. Args: @@ -716,7 +716,7 @@ def orphaned_files(self, location: str, older_than: Optional[timedelta] = timede _, _, path = _parse_location(location) selector = FileSelector(path, recursive=True) # filter to just files as it may return directories, and filter on time - as_of = datetime.now(timezone.utc) - older_than if older_than else None + as_of = datetime.now(timezone.utc) - older_than all_files = [ f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) ] From 07cbf1b913f7f0e95ea061bbf10d4a5704729614 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 4 May 2025 13:22:58 -0400 Subject: [PATCH 15/19] move under `optimize` namespace --- pyiceberg/table/__init__.py | 37 ++---- pyiceberg/table/inspect.py | 40 +------ pyiceberg/table/optimize.py | 107 ++++++++++++++++++ ...lete_orphans.py => test_remove_orphans.py} | 21 ++-- 4 files changed, 128 insertions(+), 77 deletions(-) create mode 100644 pyiceberg/table/optimize.py rename tests/table/{test_delete_orphans.py => test_remove_orphans.py} (85%) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0f3aec66ec..b65ba4b44a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,15 +16,12 @@ # under the License. from __future__ import annotations -import contextlib import itertools -import logging import os import uuid import warnings from abc import ABC, abstractmethod from dataclasses import dataclass -from datetime import timedelta from functools import cached_property from itertools import chain from types import TracebackType @@ -90,6 +87,7 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) +from pyiceberg.table.optimize import OptimizeTable from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, @@ -153,8 +151,6 @@ from pyiceberg.catalog import Catalog -logger = logging.getLogger(__name__) - ALWAYS_TRUE = AlwaysTrue() DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" @@ -912,6 +908,15 @@ def inspect(self) -> InspectTable: """ return InspectTable(self) + @property + def optimize(self) -> OptimizeTable: + """Return the OptimizeTable object to optimize. + + Returns: + OptimizeTable object based on this Table. + """ + return OptimizeTable(self) + def refresh(self) -> Table: """Refresh the current table metadata. @@ -1376,28 +1381,6 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) - def delete_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None: - """Delete orphaned files in the table.""" - location = self.location() - orphaned_files = self.inspect.orphaned_files(location, older_than) - logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") - - def _delete(file: str) -> None: - # don't error if the file doesn't exist - # still catch ctrl-c, etc. - with contextlib.suppress(Exception): - self.io.delete(file) - - if orphaned_files: - if dry_run: - logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") - else: - executor = ExecutorFactory.get_or_create() - deletes = executor.map(_delete, orphaned_files) - # exhaust - list(deletes) - logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") - class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 44726cfe15..ccdbd9cd25 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -16,12 +16,11 @@ # under the License. from __future__ import annotations -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from functools import reduce from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast from pyiceberg.conversions import from_bytes -from pyiceberg.io import _parse_location from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from pyiceberg.table.snapshots import Snapshot, ancestors_of @@ -687,40 +686,3 @@ def all_known_files(self) -> dict[str, set[str]]: _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) return _all_known_files - - def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: - """Get all the orphaned files in the table. - - Args: - location: The location to check for orphaned files. - older_than: The time period to check for orphaned files. Defaults to 3 days. - - Returns: - A set of orphaned file paths. - - """ - try: - import pyarrow as pa # noqa: F401 - except ModuleNotFoundError as e: - raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e - - from pyarrow.fs import FileSelector, FileType - - from pyiceberg.io.pyarrow import _fs_from_file_path - - all_known_files = self.all_known_files() - flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) - - fs = _fs_from_file_path(self.tbl.io, location) - - _, _, path = _parse_location(location) - selector = FileSelector(path, recursive=True) - # filter to just files as it may return directories, and filter on time - as_of = datetime.now(timezone.utc) - older_than - all_files = [ - f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) - ] - - orphaned_files = set(all_files).difference(flat_known_files) - - return orphaned_files diff --git a/pyiceberg/table/optimize.py b/pyiceberg/table/optimize.py new file mode 100644 index 0000000000..81b8ecbcd2 --- /dev/null +++ b/pyiceberg/table/optimize.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import contextlib +import logging +from datetime import datetime, timedelta, timezone +from functools import reduce +from typing import TYPE_CHECKING, Set + +from pyiceberg.io import _parse_location +from pyiceberg.utils.concurrent import ExecutorFactory + +logger = logging.getLogger(__name__) + + +if TYPE_CHECKING: + from pyiceberg.table import Table + + +class OptimizeTable: + tbl: Table + + def __init__(self, tbl: Table) -> None: + self.tbl = tbl + + try: + import pyarrow as pa # noqa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e + + def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: + """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned". + + Args: + location: The location to check for orphaned files. + older_than: The time period to check for orphaned files. Defaults to 3 days. + + Returns: + A set of orphaned file paths. + """ + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + all_known_files = self.tbl.inspect.all_known_files() + flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) + + fs = _fs_from_file_path(self.tbl.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories, and filter on time + as_of = datetime.now(timezone.utc) - older_than + all_files = [ + f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of)) + ] + + orphaned_files = set(all_files).difference(flat_known_files) + + return orphaned_files + + def remove_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None: + """Remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned". + + Args: + older_than: The time period to check for orphaned files. Defaults to 3 days. + dry_run: If True, only log the files that would be deleted. Defaults to False. + """ + location = self.tbl.location() + orphaned_files = self.orphaned_files(location, older_than) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + def _delete(file: str) -> None: + # don't error if the file doesn't exist + # still catch ctrl-c, etc. + with contextlib.suppress(Exception): + self.tbl.io.delete(file) + + if orphaned_files: + if dry_run: + logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") + else: + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) + # exhaust + list(deletes) + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") diff --git a/tests/table/test_delete_orphans.py b/tests/table/test_remove_orphans.py similarity index 85% rename from tests/table/test_delete_orphans.py rename to tests/table/test_remove_orphans.py index 3d797a8018..dc09717340 100644 --- a/tests/table/test_delete_orphans.py +++ b/tests/table/test_remove_orphans.py @@ -17,7 +17,7 @@ import os from datetime import datetime, timedelta from pathlib import Path, PosixPath -from unittest.mock import PropertyMock, patch +from unittest.mock import patch import pyarrow as pa import pytest @@ -35,8 +35,8 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog: return catalog -def test_delete_orphaned_files(catalog: Catalog) -> None: - identifier = "default.test_delete_orphaned_files" +def test_remove_orphaned_files(catalog: Catalog) -> None: + identifier = "default.test_remove_orphaned_files" schema = Schema( NestedField(1, "city", StringType(), required=True), @@ -69,17 +69,17 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: assert orphaned_file.exists() # assert no files deleted if dry run... - tbl.delete_orphaned_files(dry_run=True) + tbl.optimize.remove_orphaned_files(dry_run=True) assert orphaned_file.exists() # should not delete because it was just created... - tbl.delete_orphaned_files() + tbl.optimize.remove_orphaned_files() assert orphaned_file.exists() # modify creation date to be older than 3 days five_days_ago = (datetime.now() - timedelta(days=5)).timestamp() os.utime(orphaned_file, (five_days_ago, five_days_ago)) - tbl.delete_orphaned_files() + tbl.optimize.remove_orphaned_files() assert not orphaned_file.exists() # assert that all known files still exist... @@ -89,8 +89,8 @@ def test_delete_orphaned_files(catalog: Catalog) -> None: assert Path(file).exists() -def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None: - identifier = "default.test_delete_orphaned_files" +def test_remove_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None: + identifier = "default.test_remove_orphaned_files" schema = Schema( NestedField(1, "city", StringType(), required=True), @@ -118,8 +118,7 @@ def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) tbl.append(df) file_that_does_not_exist = "foo/bar.baz" - with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect: - mock_inspect.return_value.orphaned_files = lambda location, older_than: {file_that_does_not_exist} + with patch.object(type(tbl.optimize), "orphaned_files", return_value={file_that_does_not_exist}): with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete: - tbl.delete_orphaned_files() + tbl.optimize.remove_orphaned_files(timedelta(days=3)) mock_delete.assert_called_with(file_that_does_not_exist) From 54e1e0045c1b285c4046346c6e32db759963cbc9 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 4 May 2025 13:26:49 -0400 Subject: [PATCH 16/19] add some better logging about what was/was not deleted --- pyiceberg/table/optimize.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/optimize.py b/pyiceberg/table/optimize.py index 81b8ecbcd2..dbd7ec081c 100644 --- a/pyiceberg/table/optimize.py +++ b/pyiceberg/table/optimize.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import contextlib import logging from datetime import datetime, timedelta, timezone from functools import reduce @@ -89,12 +88,17 @@ def remove_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_r location = self.tbl.location() orphaned_files = self.orphaned_files(location, older_than) logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + deleted_files = set() + failed_to_delete_files = set() def _delete(file: str) -> None: # don't error if the file doesn't exist # still catch ctrl-c, etc. - with contextlib.suppress(Exception): + try: self.tbl.io.delete(file) + deleted_files.add(file) + except Exception: + failed_to_delete_files.add(file) if orphaned_files: if dry_run: @@ -104,4 +108,10 @@ def _delete(file: str) -> None: deletes = executor.map(_delete, orphaned_files) # exhaust list(deletes) - logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") + logger.info(f"Deleted {len(deleted_files)} orphaned files at {location}!") + logger.info(f"Files:\n{deleted_files}") + if failed_to_delete_files: + logger.warning(f"Failed to delete {len(failed_to_delete_files)} orphaned files at {location}!") + logger.warning(f"Files:\n{failed_to_delete_files}") + else: + logger.info(f"No orphaned files found at {location}!") From 34d10b928219346a4efffe95d87353dddb9ac46d Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 17 May 2025 14:08:20 -0400 Subject: [PATCH 17/19] rename optimize -> maintenance --- pyiceberg/table/__init__.py | 10 +++++----- pyiceberg/table/{optimize.py => maintenance.py} | 2 +- tests/table/test_remove_orphans.py | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) rename pyiceberg/table/{optimize.py => maintenance.py} (99%) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index eebd200344..98cf7b0849 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -80,6 +80,7 @@ from pyiceberg.schema import Schema from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider +from pyiceberg.table.maintenance import MaintenanceTable from pyiceberg.table.metadata import ( INITIAL_SEQUENCE_NUMBER, TableMetadata, @@ -87,7 +88,6 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) -from pyiceberg.table.optimize import OptimizeTable from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, @@ -1025,13 +1025,13 @@ def inspect(self) -> InspectTable: return InspectTable(self) @property - def optimize(self) -> OptimizeTable: - """Return the OptimizeTable object to optimize. + def maintenance(self) -> MaintenanceTable: + """Return the MaintenanceTable object for maintenance. Returns: - OptimizeTable object based on this Table. + MaintenanceTable object based on this Table. """ - return OptimizeTable(self) + return MaintenanceTable(self) def refresh(self) -> Table: """Refresh the current table metadata. diff --git a/pyiceberg/table/optimize.py b/pyiceberg/table/maintenance.py similarity index 99% rename from pyiceberg/table/optimize.py rename to pyiceberg/table/maintenance.py index fc3fd2fd75..edb51d832b 100644 --- a/pyiceberg/table/optimize.py +++ b/pyiceberg/table/maintenance.py @@ -31,7 +31,7 @@ from pyiceberg.table import Table -class OptimizeTable: +class MaintenanceTable: tbl: Table def __init__(self, tbl: Table) -> None: diff --git a/tests/table/test_remove_orphans.py b/tests/table/test_remove_orphans.py index 46f07b8a3a..cf836872ce 100644 --- a/tests/table/test_remove_orphans.py +++ b/tests/table/test_remove_orphans.py @@ -69,17 +69,17 @@ def test_remove_orphaned_files(catalog: Catalog) -> None: assert orphaned_file.exists() # assert no files deleted if dry run... - tbl.optimize.remove_orphaned_files(dry_run=True) + tbl.maintenance.remove_orphaned_files(dry_run=True) assert orphaned_file.exists() # should not delete because it was just created... - tbl.optimize.remove_orphaned_files() + tbl.maintenance.remove_orphaned_files() assert orphaned_file.exists() # modify creation date to be older than 3 days five_days_ago = (datetime.now() - timedelta(days=5)).timestamp() os.utime(orphaned_file, (five_days_ago, five_days_ago)) - tbl.optimize.remove_orphaned_files() + tbl.maintenance.remove_orphaned_files() assert not orphaned_file.exists() # assert that all known files still exist... @@ -118,7 +118,7 @@ def test_remove_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) tbl.append(df) file_that_does_not_exist = "foo/bar.baz" - with patch.object(type(tbl.optimize), "orphaned_files", return_value={file_that_does_not_exist}): + with patch.object(type(tbl.maintenance), "orphaned_files", return_value={file_that_does_not_exist}): with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete: - tbl.optimize.remove_orphaned_files(timedelta(days=3)) + tbl.maintenance.remove_orphaned_files(timedelta(days=3)) mock_delete.assert_called_with(file_that_does_not_exist) From 0335957119cae392a31c1b979bc6e99776ec78bc Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 17 May 2025 14:09:41 -0400 Subject: [PATCH 18/19] make orphaned_files private --- pyiceberg/table/maintenance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index edb51d832b..2d868da010 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -42,7 +42,7 @@ def __init__(self, tbl: Table) -> None: except ModuleNotFoundError as e: raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e - def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: + def _orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]: """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned". Args: @@ -86,7 +86,7 @@ def remove_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_r dry_run: If True, only log the files that would be deleted. Defaults to False. """ location = self.tbl.location() - orphaned_files = self.orphaned_files(location, older_than) + orphaned_files = self._orphaned_files(location, older_than) logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") deleted_files = set() failed_to_delete_files = set() From 9f8145ccc1d9904818ad45303002c445b9ae4285 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 17 May 2025 14:13:36 -0400 Subject: [PATCH 19/19] correctly coerce list --- pyiceberg/table/inspect.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 19dae2968d..6993ae1c33 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -18,7 +18,7 @@ from datetime import datetime, timezone from functools import reduce -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary @@ -672,7 +672,11 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = # coerce into snapshot objects if users passes in snapshot ids if snapshots is not None: if isinstance(snapshots[0], int): - snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots]) + snapshots = [ + snapshot + for snapshot_id in snapshots + if (snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id)) is not None + ] else: snapshots = self.tbl.snapshots()