Skip to content

feat: add RollingManifestWriter #650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

felixscherz
Copy link
Contributor

Hi, this is in regards to #596 and still WIP.

The RollingManifestWriter implementation closely follows the java implementation.

It takes in a generator that produces ManifestWriter objects and rolls over to a new one once either the number of rows appended or the file size in bytes exceeds the target value.

It's not finished as of yet, I am still trying to find a good way to access the current file from the underlying reader. I tried to obtain that information from the ManifestWriter._writer.output_stream object, but that is write-only.

Any pointers on how to access the current file size of the manifest writer would help me a lot:)

@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch 2 times, most recently from 4f45b47 to f558e34 Compare April 23, 2024 16:21
@geruh
Copy link
Contributor

geruh commented Apr 23, 2024

I believe in the Java implementation we have a concept of a PositionOutputStream which is used to keep track of bytes written to each file with a position/counter. What we can do here is extend the current OutputFile, and OutputStream implementations to account for this and roll each based on that position.

For instance, the ManifestWriter can use the AvroFileAppender which calls it's length() method to determine the size. But this length method is calling the storedLength() method in PositionOutputStream.

https://github.com/apache/iceberg/blob/866021d7d34f274349ce7de1f29d113395e7f28c/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java#L83

@felixscherz
Copy link
Contributor Author

Sounds good, I will have a look at the implementation and make a suggestion. Thank you!

@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch from f558e34 to ee63925 Compare May 4, 2024 11:09
@felixscherz
Copy link
Contributor Author

Hi, I finally had some time to continue working on this.

Based on your suggestions @geruh I added a tell method to the OutputStream protocol that returns the number of bytes written to the stream.
I then added __len__ to the AvroOutputFile which calls out to either OutputFile or OutputStream to get the number of bytes written, depending on whether the stream is closed or not.
Finally I extended ManifestWriter with a __len__ method that calls AvroOutputFile.

I initially tried to extend OutputStream with __len__ until I realized that both FileIO implementations fsspec and pyarrow offer OutputStream implementations that implement the tell method while neither supports __len__.

If we wanted to go with __len__ instead of simply using tell we might have to implement custom FsspecOutputStream and PyarrowOutputStream classes that implement __len__. This might well be the cleaner approach but introduce a bit more abstraction.

What do you think?

@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch 2 times, most recently from 42285e3 to bdb8d2d Compare May 7, 2024 16:06
@kevinjqliu kevinjqliu mentioned this pull request May 14, 2024
39 tasks
@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch from bdb8d2d to da96ced Compare May 16, 2024 06:28
@Fokko Fokko mentioned this pull request May 27, 2024
@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch from da96ced to f34d9f9 Compare July 6, 2024 12:17
self._current_file_rows = 0

def to_manifest_files(self) -> list[ManifestFile]:
self._close_current_writer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the same pattern as in Java, where the to_manifest_files call expects the writer to be closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to raise a RuntimeError if the writer is not closed, similar to how trying to add an entry to a closed writer raises a RuntimeError.

traceback: Optional[TracebackType],
) -> None:
self.closed = True
if self._current_writer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not re-use _close_current_writer here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I changed it to use _close_current_writer

Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixscherz Sorry for the late reply here. It looks like the formatting is a bit off, could you check that one?

@felixscherz
Copy link
Contributor Author

@Fokko Thanks for taking a look! Sorry about the formatting, should be fixed now:)

@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch 3 times, most recently from e1893a0 to 869ea57 Compare July 17, 2024 11:14
@felixscherz felixscherz force-pushed the feat/rolling-manifest-writer branch from 869ea57 to 9f01e5a Compare August 5, 2024 16:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants