Skip to content

feat: delete orphaned files #1958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9dcb580
feat: delete orphaned files
jayceslesar Apr 29, 2025
e43505c
simpler and a test
jayceslesar Apr 29, 2025
eed5ea8
remove
jayceslesar Apr 29, 2025
8cca600
updates from review!
jayceslesar May 2, 2025
75b1240
include dry run and older than
jayceslesar May 2, 2025
6379480
add case for dry run
jayceslesar May 2, 2025
0c2822e
use .path so we get paths back
jayceslesar May 3, 2025
aaf8fc2
actually pass in iterable
jayceslesar May 3, 2025
b09641b
capture manifest_list files
jayceslesar May 3, 2025
beec233
refactor into `all_known_files`
jayceslesar May 3, 2025
b888c56
fix type in docstring
jayceslesar May 3, 2025
ff461ed
mildly more readable
jayceslesar May 3, 2025
3b3b10e
beef up tests
jayceslesar May 3, 2025
a62c8cf
make `older_than` required
jayceslesar May 4, 2025
07cbf1b
move under `optimize` namespace
jayceslesar May 4, 2025
54e1e00
add some better logging about what was/was not deleted
jayceslesar May 4, 2025
7c780d3
Merge branch 'main' into feat/orphan-files
jayceslesar May 10, 2025
9b6c9ed
Merge branch 'main' into feat/orphan-files
jayceslesar May 13, 2025
34d10b9
rename optimize -> maintenance
jayceslesar May 17, 2025
0335957
make orphaned_files private
jayceslesar May 17, 2025
9f8145c
correctly coerce list
jayceslesar May 17, 2025
fbdcbd3
add metadata files
jayceslesar May 28, 2025
85b4ab3
Merge branch 'main' into feat/orphan-files
jayceslesar May 28, 2025
c414df8
Merge branch 'main' into feat/orphan-files
jayceslesar Jun 10, 2025
aa9d536
Merge branch 'main' into feat/orphan-files
jayceslesar Jun 21, 2025
b4c14fc
fix test
jayceslesar Jun 21, 2025
f4d98d2
allow older_than to be None
jayceslesar Jun 21, 2025
acd8ed6
Merge branch 'main' into feat/orphan-files
jayceslesar Jul 13, 2025
2a9c607
add partition statistics
jayceslesar Jul 13, 2025
aae92bc
safer
jayceslesar Jul 13, 2025
756e199
Merge branch 'main' into feat/orphan-files
jayceslesar Aug 5, 2025
ad5387a
work with both file IO's
jayceslesar Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.
from __future__ import annotations

import contextlib
import itertools
import logging
import os
import uuid
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import timedelta
from functools import cached_property
from itertools import chain
from types import TracebackType
Expand Down Expand Up @@ -150,6 +153,8 @@

from pyiceberg.catalog import Catalog

logger = logging.getLogger(__name__)

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"

Expand Down Expand Up @@ -1371,6 +1376,28 @@ def to_polars(self) -> pl.LazyFrame:

return pl.scan_iceberg(self)

def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None:
    """Delete orphaned files in the table.

    Orphaned files are files under the table location that are not referenced
    by any snapshot's metadata, manifests, or data files.

    Args:
        older_than: Only files whose modification time is older than this delta
            are considered (guards against deleting in-flight, uncommitted
            writes). Pass ``None`` to consider every orphaned file.
        dry_run: When ``True``, only log what would be deleted.
    """
    location = self.location()
    orphaned_files = self.inspect.orphaned_files(location, older_than)
    logger.info("Found %d orphaned files at %s!", len(orphaned_files), location)

    if not orphaned_files:
        return

    if dry_run:
        logger.info("(Dry Run) Deleted %d orphaned files at %s!", len(orphaned_files), location)
        return

    def _delete(file: str) -> bool:
        # Best-effort delete: a missing or undeletable file must not abort the
        # whole cleanup. KeyboardInterrupt/SystemExit still propagate because
        # they are not subclasses of Exception.
        try:
            self.io.delete(file)
            return True
        except Exception:
            logger.warning("Failed to delete orphaned file %s", file, exc_info=True)
            return False

    executor = ExecutorFactory.get_or_create()
    # Report the number of files actually removed, not just the number found.
    deleted_count = sum(executor.map(_delete, orphaned_files))
    logger.info("Deleted %d orphaned files at %s!", deleted_count, location)


class StaticTable(Table):
"""Load a table directly from a metadata file (i.e., without using a catalog)."""
Expand Down
50 changes: 46 additions & 4 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
from datetime import datetime, timedelta, timezone
from functools import reduce
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast

from pyiceberg.conversions import from_bytes
from pyiceberg.io import _parse_location
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
Expand Down Expand Up @@ -645,10 +647,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
    """Return the position- and equality-delete files for the given snapshot (default: current)."""
    delete_contents = {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}
    return self._files(snapshot_id, delete_contents)

def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
    """Return a PyArrow table of all manifests referenced by the given snapshots.

    Args:
        snapshots: Snapshots (or their snapshot ids) to collect manifests from.
            Defaults to every snapshot of the table.

    Returns:
        pa.Table: One row per manifest, using the all-manifests schema.
    """
    import pyarrow as pa

    if snapshots is None:
        snapshots = self.tbl.snapshots()
    elif snapshots and isinstance(snapshots[0], int):
        # Coerce into Snapshot objects if the user passed in snapshot ids.
        # The truthiness guard avoids an IndexError on an empty list.
        snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots])

    if not snapshots:
        return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

    executor = ExecutorFactory.get_or_create()
    manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
        lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
    )
    return pa.concat_tables(manifests_by_snapshots)

def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
    """Return file paths under ``location`` that are not referenced by table metadata.

    Args:
        location: Table location to scan recursively.
        older_than: Only report files whose modification time is older than
            this delta (protects concurrent, not-yet-committed writes).
            ``None`` disables the age filter.

    Returns:
        The set of orphaned file paths.

    Raises:
        ModuleNotFoundError: If PyArrow is not installed.
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e

    from pyarrow.fs import FileSelector, FileType

    from pyiceberg.io.pyarrow import _fs_from_file_path

    snapshots = self.tbl.snapshots()

    all_known_files: Set[str] = set()
    # Manifest-list files are referenced directly by each snapshot; without
    # them they would be (wrongly) reported as orphans and deleted.
    all_known_files.update(snapshot.manifest_list for snapshot in snapshots if snapshot.manifest_list is not None)
    all_known_files.update(self.all_manifests(snapshots)["path"].to_pylist())

    executor = ExecutorFactory.get_or_create()
    # Fix: executor.map requires an iterable of arguments; the previous code
    # passed only the callable, which raises a TypeError at runtime.
    files_by_snapshots: Iterator[Set[str]] = executor.map(
        lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()),
        [snapshot.snapshot_id for snapshot in snapshots],
    )
    datafile_paths: Set[str] = reduce(set.union, files_by_snapshots, set())
    all_known_files.update(datafile_paths)

    fs = _fs_from_file_path(self.tbl.io, location)

    _, _, path = _parse_location(location)
    selector = FileSelector(path, recursive=True)
    # Keep only regular files (get_file_info may also return directories) and
    # filter on modification time. Use .path so we compare strings against the
    # known-file paths instead of FileInfo objects (which never match).
    as_of = datetime.now(timezone.utc) - older_than if older_than else None
    all_files = {
        f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or f.mtime < as_of)
    }

    # NOTE(review): known-file paths come from table metadata and may carry a
    # scheme (e.g. "file://"), while the filesystem listing returns scheme-less
    # paths — confirm both sides are normalized for the IO implementation in use.
    return all_files.difference(all_known_files)
117 changes: 117 additions & 0 deletions tests/table/test_delete_orphans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime, timedelta
from pathlib import Path, PosixPath
from unittest.mock import PropertyMock, patch

import pyarrow as pa
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog


@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
    """Provide a fresh in-memory catalog backed by a temporary warehouse directory."""
    in_memory = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix())
    in_memory.create_namespace("default")
    return in_memory


def test_delete_orphaned_files(catalog: Catalog) -> None:
    """End-to-end check that orphans are deleted only when older than the cutoff."""
    identifier = "default.test_delete_orphaned_files"

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )

    df = pa.Table.from_pylist(
        [
            {"city": "Drachten", "inhabitants": 45019},
            {"city": "Drachten", "inhabitants": 45019},
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    orphaned_file = Path(tbl.location()) / "orphan.txt"

    orphaned_file.touch()
    assert orphaned_file.exists()

    # assert no files deleted if dry run...
    tbl.delete_orphaned_files(dry_run=True)
    assert orphaned_file.exists()

    # should not delete because it was just created...
    tbl.delete_orphaned_files()
    assert orphaned_file.exists()

    # modify creation date to be older than 3 days
    five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
    os.utime(orphaned_file, (five_days_ago, five_days_ago))

    # now past the default 3-day cutoff: the orphan must actually be removed.
    # (The original test stopped after os.utime and never asserted deletion,
    # leaving the core behavior untested.)
    tbl.delete_orphaned_files()
    assert not orphaned_file.exists()


def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
    """Deleting an orphan that does not exist must be swallowed, not raised."""
    identifier = "default.test_delete_orphaned_files"

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )
    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )
    rows = [
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Drachten", "inhabitants": 45019},
    ]
    tbl.append(pa.Table.from_pylist(rows, schema=arrow_schema))

    file_that_does_not_exist = "foo/bar.baz"
    with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect:
        # Force the inspector to report a single, nonexistent orphan.
        mock_inspect.return_value.orphaned_files = lambda location, older_than: {file_that_does_not_exist}
        with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
            # Must not raise even though the underlying delete fails.
            tbl.delete_orphaned_files()
            mock_delete.assert_called_with(file_that_does_not_exist)