feat: delete orphaned files #1958
base: main
Conversation
Thanks for working on this @jayceslesar, sorry for the late review.
I think this is a great start, I left some comments, let me know what you think!
Thanks for the PR @jayceslesar, using InspectTable to get orphaned files to submit to the executor pool is a nice idea! Just some concerns / suggestions / debugging help 😄
pyiceberg/table/inspect.py
Outdated
from pyiceberg.io.pyarrow import _fs_from_file_path

all_known_files = set()
We also want to have manifest list files here (I don't see them now). Otherwise, they'll be removed by the procedure and the table will be "corrupted".
(Related: when looking at Java tests, I noticed apache/iceberg#12957)
The same goes for the current metadata JSON file, and I think to match Java behaviour we want to include all files in the metadata log of the current metadata file too.
I think there are more files we might be missing - I think tests would be nice to make sure we're not missing something! (Perhaps inspiration can be taken from the Java ones)
I see! I just pushed a change that will capture those, as well as the statistics file paths.
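For context, a rough sketch of what gathering every known file can look like, using pyiceberg's Snapshot.manifests and ManifestFile.fetch_manifest_entry helpers (treat the exact attribute names as assumptions rather than the PR's actual implementation):

from pyiceberg.table import Table

def _all_known_files(tbl: Table) -> set[str]:
    """Collect every path the table metadata still references."""
    known: set[str] = set()
    known.add(tbl.metadata_location)  # current metadata JSON
    known.update(entry.metadata_file for entry in tbl.metadata.metadata_log)  # metadata log
    known.update(stat.statistics_path for stat in tbl.metadata.statistics)  # statistics files
    for snapshot in tbl.metadata.snapshots:
        known.add(snapshot.manifest_list)  # manifest list per snapshot
        for manifest in snapshot.manifests(tbl.io):
            known.add(manifest.manifest_path)
            # data and delete files, including entries already marked deleted
            for entry in manifest.fetch_manifest_entry(tbl.io, discard_deleted=False):
                known.add(entry.data_file.file_path)
    return known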
Thanks for the PR! I added a few comments. ptal :)
pyiceberg/table/__init__.py
Outdated
deletes = executor.map(_delete, orphaned_files)
# exhaust
list(deletes)
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
nit: this might not necessarily always be true, especially when _delete errors are suppressed. What if we count the number of successful deletes here? Maybe _delete can return True/False for whether the delete was successful.
The Spark procedure outputs orphan_file_location, which lists all the files set to be deleted; this is pretty useful for logging: https://iceberg.apache.org/docs/nightly/spark-procedures/#output_7
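A minimal sketch combining both suggestions: _delete reports success per file, and each candidate path is logged like Spark's orphan_file_location output (io, executor, orphaned_files, and location are assumed to come from the surrounding method):

def _delete(file_path: str) -> bool:
    """Return True only if the delete actually succeeded."""
    try:
        io.delete(file_path)  # io: a pyiceberg FileIO from the enclosing scope
        return True
    except OSError:
        logger.warning(f"Failed to delete orphaned file {file_path}")
        return False

for file_path in orphaned_files:
    logger.info(f"Found orphaned file at {file_path}")  # mirrors Spark's orphan_file_location
deleted_count = sum(executor.map(_delete, orphaned_files))  # True counts as 1
logger.info(f"Deleted {deleted_count} of {len(orphaned_files)} orphaned files at {location}")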
Just modified, let me know!
pyiceberg/table/inspect.py
Outdated
return _all_known_files

def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
nit: should we expose this as a public function given that there's no equivalent on the java/spark side? we modeled the inspect tables based on java's metadata tables. maybe we can change this to _orphaned_files for now
Does this still need to be addressed now that this is under a new namespace?
Yes :) The less we expose publicly, the easier it is to evolve the API in the future (otherwise we have to go through a deprecation cycle).
a meta question: wydt of moving the orphan file function to its own file/namespace? i like the idea of having all the table maintenance functions together, similar to delta table's optimize
I think that makes sense -- would #1880 end up there too? Also ideally there is a CLI that exposes all the maintenance actions too, right? I think moving things to a new
That's a good point. However, I think we should be able to run them separately as well. For example, deleting orphan files won't affect the speed of the table, so it is more of a maintenance feature to reduce object storage costs. Deleting orphan files can also be pretty costly because of the list operation; ideally you would delegate this to the catalog, which can use, for example, S3 inventory.
@@ -1023,6 +1024,15 @@ def inspect(self) -> InspectTable:
        """
        return InspectTable(self)

    @property
    def optimize(self) -> OptimizeTable:
As mentioned before, I don't think deleting orphan files will optimize the table. Since Iceberg does not list any prefixes, there won't be any improvements (such as listing fewer objects). How about changing this to maintenance?
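A sketch of the suggested rename, mirroring the existing inspect property (MaintenanceTable here is a hypothetical counterpart to InspectTable, not a class from this PR):

@property
def maintenance(self) -> "MaintenanceTable":
    """Group maintenance actions such as remove_orphaned_files.

    Deleting orphan files reduces object storage costs rather than
    optimizing reads, hence 'maintenance' over 'optimize'.
    """
    return MaintenanceTable(self)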
# coerce into snapshot objects if user passes in snapshot ids
if snapshots is not None:
    if isinstance(snapshots[0], int):
        snapshots = cast(list[Snapshot], [self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots])
I don't think this cast is the correct way to fix this, since it returns a List[Optional[Snapshot]], so this might cause issues when you pass snapshot IDs that do not exist. Instead we should filter out the Nones.
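A sketch of that fix: resolve each ID and drop the ones that don't exist, instead of casting:

# coerce into snapshot objects if user passes in snapshot ids
if snapshots is not None and isinstance(snapshots[0], int):
    maybe_snapshots = (self.tbl.metadata.snapshot_by_id(snapshot_id) for snapshot_id in snapshots)
    snapshots = [snapshot for snapshot in maybe_snapshots if snapshot is not None]  # drop unknown IDs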
@@ -678,6 +685,28 @@ def all_manifests(self) -> "pa.Table":
        )
        return pa.concat_tables(manifests_by_snapshots)

    def _all_known_files(self) -> dict[str, set[str]]:
Do we still need this now that we have all_files?
I don't think that all_files includes manifest lists or statistics files, right?
It should reference deletes, but thinking about it, this is probably much more efficient because it does not pull in all the other data.
except ModuleNotFoundError as e:
    raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]:
Do we want to expose both orphaned_files and remove_orphaned_files to the public? How about making this one private:

- def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]:
+ def _orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]:
Closes #1200
Rationale for this change
Ability to do more table maintenance from pyiceberg (iceberg-python?)
Are these changes tested?
Added a test!
Are there any user-facing changes?
Yes, this is a new method on the Table class.