Bin-pack Writes Operation into multiple parquet files, and parallelize writing WriteTasks #444
pyiceberg/io/pyarrow.py
Outdated
)
return iter([data_file])
splits = tbl.to_batches()
target_weight = 2 << 27  # 256 MB
In Java we have the write.target-file-size-bytes configuration. In this case, we're looking at the size in memory, not the file size. Converting between the two is very tricky, since Parquet has some excellent encodings that reduce the size on disk. We might want to check the heuristic on the Java side. On the other hand, we also don't want memory to blow up when decoding a Parquet file.
I think this is done in Java like so: #428 (comment)
The write is done row by row, and every 1000 rows the file size is checked against the desired size.
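To make the row-by-row approach concrete, here is a minimal sketch of a rolling writer that only checks the size every 1000 rows. The `RollingWriter` class, its in-memory "files", and the sizes used are all hypothetical; this is an illustration of the idea, not the Iceberg Java implementation.

```python
import io

ROWS_BETWEEN_CHECKS = 1000  # check cadence mentioned in the comment above


class RollingWriter:
    """Hypothetical writer that rolls over to a new in-memory 'file' once the
    current one has reached the target size at a check point."""

    def __init__(self, target_file_size_bytes):
        self.target = target_file_size_bytes
        self.files = []            # completed files, as bytes
        self._buf = io.BytesIO()   # file currently being written
        self._rows_in_file = 0

    def write(self, row):
        self._buf.write(row)
        self._rows_in_file += 1
        # The size is only inspected every ROWS_BETWEEN_CHECKS rows, so a file
        # can overshoot the target by up to that many rows.
        at_check_point = self._rows_in_file % ROWS_BETWEEN_CHECKS == 0
        if at_check_point and self._buf.tell() >= self.target:
            self._roll()

    def _roll(self):
        self.files.append(self._buf.getvalue())
        self._buf = io.BytesIO()
        self._rows_in_file = 0

    def close(self):
        # Flush whatever is left, even if it is below the target size.
        if self._rows_in_file:
            self._roll()


writer = RollingWriter(target_file_size_bytes=150_000)
for _ in range(5_000):
    writer.write(b"x" * 100)  # 5,000 rows of 100 bytes each
writer.close()
file_sizes = [len(f) for f in writer.files]
```

Because the check runs only at 1000-row boundaries, the first two files end up larger than the 150,000-byte target and the last one smaller, which matches the point that the target is approximate.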
Also, the write.target-file-size-bytes configuration is just a target to aim for, not the exact size of the resulting file.
Based on this comment, it seems that even in Spark the resulting parquet files can be smaller than the target file size.
For now, I propose we reuse the write.target-file-size-bytes option and default to 512 MB of Arrow size in memory.
Here's a test run when we bin-packed 685.46 MB of arrow memory into 256MB chunks. We ended up with 3 ~80MB parquet files.
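As a rough illustration of why roughly 685 MB of Arrow memory lands in three bins at a 256 MB target, here is a greedy, order-preserving packing sketch. The `bin_pack` helper and the batch sizes are hypothetical and are not the packing code used in the PR.

```python
from typing import Iterable, List

MB = 1024 * 1024


def bin_pack(weights: Iterable[int], target_weight: int) -> List[List[int]]:
    """Greedy packing that keeps input order: start a new bin whenever adding
    the next item would push the current bin past target_weight."""
    bins: List[List[int]] = []
    current: List[int] = []
    current_weight = 0
    for weight in weights:
        if current and current_weight + weight > target_weight:
            bins.append(current)
            current, current_weight = [], 0
        current.append(weight)
        current_weight += weight
    if current:
        bins.append(current)
    return bins


# Hypothetical input: 11 record batches of 64 MB each (704 MB in total).
batch_sizes = [64 * MB] * 11
bins = bin_pack(batch_sizes, target_weight=2 << 27)  # 2 << 27 bytes == 256 MB
bin_sizes_mb = [sum(b) // MB for b in bins]
```

Each bin would then become one parquet file, and the on-disk size of each file depends on how well the data encodes and compresses.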
This is looking great @kevinjqliu! 🙌 Let me know when it is out of draft, and I'll give it a spin.
@Fokko the PR is ready for review. Please give it a try. I've linked an example notebook in the PR description. I've also noticed that writing one RecordBatch at a time seems to be less efficient than converting to a Table and then writing the Table. Still exploring the different options here.
WriteTasks
We rely on Updated the PR to use table size and row size heuristics to chunk the
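One possible reading of "table size and row size heuristics" is to estimate an average row size from the table's in-memory size and derive a row count per chunk from the target size. The sketch below assumes that reading; `rows_per_chunk` is a hypothetical helper, and the 6.7 GB / 10M row figures are borrowed from the test described in the PR description.

```python
import math


def rows_per_chunk(table_nbytes: int, num_rows: int, target_bytes: int) -> int:
    """Estimate how many rows fit in one chunk of roughly target_bytes,
    using the average in-memory row size of the whole table."""
    if num_rows == 0:
        return 1
    avg_row_size = table_nbytes / num_rows
    return max(1, int(target_bytes // avg_row_size))


# Roughly 6.7 GB of Arrow data in 10M rows, chunked at a 512 MB target.
target = 512 * 1024 * 1024
chunk_rows = rows_per_chunk(6_700_000_000, 10_000_000, target)
num_chunks = math.ceil(10_000_000 / chunk_rows)
```

An average-based estimate like this is cheap to compute but can be off when row sizes are skewed, for example with a few very large string values.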
Thanks @kevinjqliu for working on this and @bigluck for testing! This looks great! I left some minor comments. Could you please rebase this PR to main? Thanks!
LGTM! Thanks for adding additional tests.
Looking good @kevinjqliu
Although we never know what the actual file size will be (check out this blog), I think it is good to chunk, since we also don't want very efficiently encoded files that blow up the memory when they are decoded. Thanks for working on this!
Could you add the docs describing the properties in a separate PR?
@kevinjqliu can you fix the merge conflict? I'll merge right after that.
@Fokko resolved the merge conflict. Please take another look. I've also updated the description to show the results of parallelized writes.
@kevinjqliu Thanks for adding the examples. I think in general we want to have slightly bigger files. A simple heuristic I can think of is that we put an upper bound on the number of files, equal to the number of threads. This way we still get decent parallelization, but avoid creating many small files (and avoid the overhead of opening new files). We can do this in a separate PR.
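The proposed upper bound could look something like the sketch below. `planned_file_count` is a hypothetical helper written for illustration; it is not part of this PR, and the follow-up PR may implement the heuristic differently.

```python
import math

MB = 1024 * 1024


def planned_file_count(total_bytes: int, target_file_size_bytes: int, max_workers: int) -> int:
    """Number of files to write: as many as the size target suggests,
    but never more than the number of worker threads."""
    by_size = max(1, math.ceil(total_bytes / target_file_size_bytes))
    return min(by_size, max(1, max_workers))


# 1 GB of data with a 64 MB target would give 16 files; 4 threads cap it at 4.
capped = planned_file_count(1024 * MB, 64 * MB, max_workers=4)
# Small inputs are unaffected by the cap.
small = planned_file_count(100 * MB, 512 * MB, max_workers=8)
```

With the cap in place, each file can grow beyond the configured target, which is the trade-off the comment accepts in exchange for fewer small files.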
This PR bin-packs write operations into multiple parquet files when necessary, for both append and overwrite.
Bin-packing is controlled by the write.target-file-size-bytes config (WRITE_TARGET_FILE_SIZE_BYTES) and defaults to 512 MB (WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT). Bin-packing is based on the size of the Arrow table in memory, so the resulting parquet files might be smaller than the target size due to parquet's compression.
This PR also adds the ability to parallelize writing WriteTasks for #428. To use parallelism, set the PYICEBERG_MAX_WORKERS environment variable (docs). The same variable can also be set from within a Jupyter notebook.
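As a hedged illustration, the variable can be set from Python (for example in a notebook cell) before any write is issued. The value `4`, the `write_task` function, and the thread pool below are hypothetical stand-ins; they only show the general pattern of fanning tasks out over a bounded number of workers, not how pyiceberg reads the variable internally.

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Python equivalent of `export PYICEBERG_MAX_WORKERS=4`.
# Assumption: this needs to happen before pyiceberg sets up its workers.
os.environ["PYICEBERG_MAX_WORKERS"] = "4"


def write_task(task_id: int) -> str:
    """Hypothetical stand-in for writing one WriteTask to a parquet file."""
    return f"data-{task_id}.parquet"


max_workers = int(os.environ["PYICEBERG_MAX_WORKERS"])
with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # map() returns results in task order, even if tasks finish out of order.
    written = list(pool.map(write_task, range(3)))
```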
Results
Reading a 6.7GB, 10M row arrow file and writing with pyiceberg
Without parallelized writes (using pyiceberg 0.6)
With parallelized writes (this branch)
Here's a Jupyter notebook of the test:
https://colab.research.google.com/drive/1oLYXPNisjQ0W3cwUe3e8w1KNVgizwRB_?usp=sharing