Spark: Action to remove missing files #12106
base: main
Conversation
Thanks for working on this @wypoon! I went through for my own education and left some comments, mostly related to the introduced API.
```java
 * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
 * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
 */
public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {
```
Initially I didn't understand why you needed this new API and why you didn't use the existing DeleteFiles API. I think I get what the motivation is now. Is this because DeleteFiles can only remove data files and not delete files?
Even now understanding the motivation I feel confused about this new API for the following reasons:
- It partially overlaps with the DeleteFiles API, I mean the part where we remove data files.
- The name might be misleading: it says "MissingFiles" but in fact it does nothing special wrt the files being missing or not. It just removes files from the table metadata.
- Not sure how one could use this API without using the new Spark action. (see my comment below).
I probably lack enough experience in this area, but these might be some options we have here (just throwing out some ideas):
- If the purpose of this new API is to provide the ability to remove DeleteFiles, then we might want to revisit the existing DeleteFiles API to see if we can extend it. There probably is a reason why it doesn't support removing DeleteFiles, but it would be nice to understand.
- If the DeleteFiles API can't be changed for this purpose, then in order to use this API without the Spark action I think a single function taking a path parameter is enough. DeleteFiles has that too; we 'just' have to improve it to take care of delete files as well, if possible. With this approach the name of this API won't be correct, because it has nothing to do with missing files.
- This API could be smarter than just simply removing files from the table. Since it's called RemoveMissingFiles, it could also do the detection/collection of such missing files and then remove them. With this approach, a 'RecoverTable' class/interface name with a 'removeMissingContentFiles' function might be more descriptive (see the sketch after this list).
- With the above approach, later on we would have the API to add further recovery functions to recover from missing metadata.jsons, or missing manifest or snapshot files.
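Just to illustrate what I have in mind; all names below are invented here, nothing like this exists in the PR:

```java
// Hypothetical sketch only: a recovery-oriented interface instead of a
// generic file-removal one. Names are made up for illustration.
public interface RecoverTable extends SnapshotUpdate<RecoverTable> {

  /**
   * Detect content files (data and delete files) that are referenced by table
   * metadata but missing from storage, and remove them from the metadata.
   */
  RecoverTable removeMissingContentFiles();

  // Later: removeMissingManifests(), removeMissingSnapshots(), ...
}
```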
```java
 * @param file a DataFile to remove from the table
 * @return this for method chaining
 */
RemoveMissingFiles deleteFile(DataFile file);
```
I'm thinking of how this interface would be used if not from the introduced Spark action. For instance when some user sees that their table can't be loaded due to a missing file, how would they use this API to fix the table? (Assuming they don't have Spark but they can use the Java API)?
They know a file path from the error message, but should they then figure out if it's a data or delete file? It's probably possible but requires an extra manual step. And then they have to create a DataFile/DeleteFile object somehow so that they can call this API. This seems more problematic, and adds another manual step.
I think if users want to use this API to fix tables without using the Spark action, then the API should rather be something like this:

```java
RemoveMissingFiles deleteFile(CharSequence path);
```

Similar to what DeleteFiles does.
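For example, a user who only has the Java API could then do something like this (hypothetical usage; `catalog` is an existing `Catalog` instance, and the path-based overload is the suggestion above, not what the PR currently exposes):

```java
// Hypothetical usage with a path-based deleteFile(CharSequence) overload;
// table and file names are made up.
Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));
table
    .newRemoveFiles()
    .deleteFile("s3://bucket/db/tbl/data/part-00000.parquet") // path taken from the error message
    .commit();
```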
The implementation of `DeleteFiles::deleteFile(CharSequence)` calls `MergingSnapshotProducer::delete(CharSequence)`:
```java
protected void delete(CharSequence path) {
  // this is an old call that never worked for delete files and can only be used to remove data
  // files.
  filterManager.delete(path);
}
```
It cannot be used to remove delete files, only data files. It is better not to call `delete(CharSequence)`, where you have no guarantees on what kind of file the path is for. It is better to call only `delete(DataFile)` and `delete(DeleteFile)`. Therefore it makes sense to have that as the API.
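Roughly, the typed overloads just delegate to the protected typed methods on the snapshot producer (a sketch of the shape; the actual code in the PR may differ slightly):

```java
// Sketch: the typed overloads delegate to MergingSnapshotProducer's
// protected typed delete methods.
@Override
public RemoveMissingFiles deleteFile(DataFile file) {
  delete(file); // MergingSnapshotProducer#delete(DataFile): data-file removal
  return this;
}

@Override
public RemoveMissingFiles deleteFile(DeleteFile file) {
  delete(file); // MergingSnapshotProducer#delete(DeleteFile): delete-file removal,
                // which delete(CharSequence) cannot express
  return this;
}
```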
```java
 *
 * @return this for method chaining
 */
RemoveMissingFiles validateFilesExist();
```
The purpose of this API is to drop missing files. I don't think it makes sense to validate existence of such files.
This does not validate if the files to be removed exist in storage; it validates that they exist in the current metadata. It does make sense to call this.
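For example (hypothetical usage; the file handles come from scanning the current metadata):

```java
// Hypothetical usage: with validateFilesExist(), the commit fails if a file
// passed to deleteFile(...) is not referenced by the current table metadata
// (for example, a concurrent operation already removed it), instead of
// committing a silent no-op.
table
    .newRemoveFiles()
    .deleteFile(missingDataFile)
    .deleteFile(missingDeleteFile)
    .validateFilesExist()
    .commit();
```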
```java
 *
 * @return a new {@link RemoveMissingFiles}
 */
default RemoveMissingFiles newRemoveFiles() {
```
The name `newRemoveFiles()` might be confusing for users because there is already a `newDelete()`, and I'm not sure how clear the difference is.
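To make the comparison concrete, this is how the two entry points would sit side by side (the second one assumes this PR's API; file handles and paths are placeholders):

```java
// Existing API: removes data files only, e.g. by path.
table
    .newDelete()
    .deleteFile("s3://bucket/db/tbl/data/part-00000.parquet")
    .commit();

// API added by this PR (as I understand it): removes data files and delete
// files, given the file handles.
table
    .newRemoveFiles()
    .deleteFile(dataFile)
    .deleteFile(deleteFile)
    .commit();
```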
```java
package org.apache.iceberg;

/** {@link RemoveMissingFiles} implementation. */
public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>
```
I find a huge overlap between this class and StreamingDelete. I'm wondering if we can somehow avoid code duplication. Inheritance maybe?
```java
  return DataOperations.DELETE;
}
```

```java
  return DataOperations.OVERWRITE;
```
Just for my understanding: Why is the operation an OVERWRITE if we delete DeleteFiles? Is it because there are going to be rows that were meant to be deleted that might re-appear again?
I checked the existing options and I don't think any of them applies comfortably to what we try to do here. Maybe introducing a new `RECOVER` or something?
Well, another dilemma here is whether this class is purely for removing files in general, or whether it has anything to do with missing files and table recovery.
```java
package org.apache.iceberg;

/** {@link RemoveMissingFiles} implementation. */
public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>
```
The interface is RemoveMissingFiles while this class is BaseRemoveFiles. In the name we lose the information that this has something to do with missing files.
I feel the reason for this is that the patch can't decide whether this new interface is for handling missing files and table recovery in general, or whether it is only introduced because the existing DeleteFiles API doesn't remove DeleteFiles.
```java
List<String> removedDeleteFiles = Lists.newArrayList();

for (DataFile f : dataFiles) {
  if (!fileIO.newInputFile(f.location()).exists()) {
```
These sequentially do an existence check on the storage for each of the data/delete files in the table. Not sure how Spark actions do these in general (lack of experience), but I saw in various parts of the code that such storage operations on high volumes are performed in parallel using an ExecutorService. Do you think it would make sense to add some parallelism here?
I'm wondering how a sequential approach performs in general on 10k, 100k, etc. files. If the time for these checks is negligible, then it's fine as it is. However, on object stores this might be slow, not sure.
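For reference, the driver-side pattern I have seen elsewhere in the codebase looks roughly like this (a sketch only; `dataFiles` and `fileIO` are assumed to be the same variables as in the quoted snippet, and the pool size is arbitrary):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.util.Tasks;

// Driver-side parallel existence checks using Iceberg's Tasks utility.
ExecutorService pool = Executors.newFixedThreadPool(8);
List<String> missingDataFiles = new CopyOnWriteArrayList<>();
Tasks.foreach(dataFiles)
    .executeWith(pool)
    .run(f -> {
      if (!fileIO.newInputFile(f.location()).exists()) {
        missingDataFiles.add(f.location());
      }
    });
pool.shutdown();
```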
I agree that this can be slow.
`dataEntries` and `deleteEntries` are `DataFrame`s (`Dataset<Row>`s) computed on executors; ideally, I'd like to filter them to entries for files that exist in storage. I don't recall the exact issue I ran into, but I had tried something earlier along that line and ran into some Iceberg classes not being serializable, so I could not send the task to the executors. So I simply collected everything back to the driver and processed things there. Let me revisit that.
Calling the API to remove the missing files has to be done on the driver though.
I have reworked this. The file existence is checked in executors now. I didn't know about `org.apache.iceberg.SerializableTable` and its subclass `org.apache.iceberg.spark.source.SerializableTableWithSize` before.
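The rough shape of the executor-side check is something like this (a sketch, not the exact code in the PR; `sparkContext` is assumed to be a `JavaSparkContext`, and `dataEntries` is assumed to be projected so that a `file_path` column is available):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Broadcast a serializable copy of the table so executors can use its FileIO,
// then keep only the entries whose file no longer exists in storage.
Broadcast<Table> tableBroadcast =
    sparkContext.broadcast(SerializableTableWithSize.copyOf(table));

Dataset<Row> missingDataEntries =
    dataEntries.filter(
        (FilterFunction<Row>)
            row -> {
              String path = row.getString(row.fieldIndex("file_path"));
              return !tableBroadcast.value().io().newInputFile(path).exists();
            });
```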
I am open to better naming for the new API.
and `StreamingDelete::operation()`:
Removing delete files will undelete deleted rows, i.e., add data. When data is both deleted and added, the data operation is OVERWRITE.
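So the operation choice is roughly the following (a sketch; the flag name here is made up for illustration):

```java
// Sketch: choose the snapshot operation based on what is being removed.
@Override
protected String operation() {
  if (removesDeleteFiles) {
    // Dropping a delete file re-exposes rows that it was masking, so the
    // snapshot both removes and (effectively) adds data -> OVERWRITE.
    return DataOperations.OVERWRITE;
  }
  // Only data files are removed -> a pure DELETE.
  return DataOperations.DELETE;
}
```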
... and add a negative test.
If data and/or delete files are inadvertently deleted from storage, an Iceberg table becomes unreadable. We provide a Spark action for "repairing" such a table by removing the missing files from the metadata.
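For illustration, invoking the action might look roughly like this (the action and method names below are assumed from the PR title and may not match the final API):

```java
import org.apache.iceberg.spark.actions.SparkActions;

// Hypothetical invocation of the new action against a loaded table.
SparkActions.get(spark)
    .removeMissingFiles(table)
    .execute();
```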