
Arrow: Avoid buffer overflow by avoiding a sort #1539

Closed

Conversation

@Fokko Fokko (Contributor) commented Jan 20, 2025

This was already being discussed back here: #208 (comment)

This PR changes the approach from doing a sort followed by a single pass over the table, to determining the unique partition tuples and filtering on them individually.

Fixes #1491

The sort caused buffers to be joined, which overflows in Arrow. I think this is really an issue on the Arrow side: it should automatically break the data up into smaller buffers. The combine_chunks method does this correctly.
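
A minimal sketch of the idea in plain PyArrow (illustrative names, not the actual pyiceberg code): compute the distinct partition tuples with group_by, then filter the table once per tuple, so the table never has to be sorted into one oversized contiguous buffer.

```python
import pyarrow as pa
import pyarrow.compute as pc

def split_by_partitions(table: pa.Table, partition_cols: list[str]) -> list[pa.Table]:
    # Distinct partition tuples, found without sorting the full table.
    unique_keys = table.group_by(partition_cols).aggregate([])
    parts = []
    for row in unique_keys.to_pylist():
        # Build one boolean mask per partition tuple and filter on it.
        mask = None
        for col, value in row.items():
            predicate = pc.equal(table[col], value)
            mask = predicate if mask is None else pc.and_(mask, predicate)
        parts.append(table.filter(mask))
    return parts

tbl = pa.table({"day": [1, 1, 2, 2, 2], "v": [10, 20, 30, 40, 50]})
for part in split_by_partitions(tbl, ["day"]):
    print(part["day"][0].as_py(), part.num_rows)  # 1 -> 2 rows, 2 -> 3 rows
```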

Now:

0.42877754200890195
Run 1 took: 0.2507691659993725
Run 2 took: 0.24833179199777078
Run 3 took: 0.24401691700040828
Run 4 took: 0.2419595829996979
Average runtime of 0.28 seconds

Before:

Run 0 took: 1.0768639159941813
Run 1 took: 0.8784021250030492
Run 2 took: 0.8486490420036716
Run 3 took: 0.8614017910003895
Run 4 took: 0.8497851670108503
Average runtime of 0.9 seconds

So it comes with a nice speedup as well :)
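
For reference, a hedged sketch of the kind of timing loop that produces output in this shape (the real benchmark lives in tests/benchmark/test_benchmark.py; write_partitioned here is a hypothetical stand-in for the operation being measured):

```python
import statistics
import time

def bench(write_partitioned, runs: int = 5) -> None:
    # Time the operation several times and report the mean, matching the output above.
    timings = []
    for i in range(runs):
        start = time.perf_counter()
        write_partitioned()  # hypothetical callable under test
        elapsed = time.perf_counter() - start
        timings.append(elapsed)
        print(f"Run {i} took: {elapsed}")
    print(f"Average runtime of {statistics.mean(timings):.2f} seconds")
```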

@kevinjqliu kevinjqliu (Contributor) left a comment

LGTM, I left a few comments.

pyiceberg/partitioning.py (outdated review thread, resolved)
# `ta` (column "a") is defined earlier in the test and is not shown in this snippet
y = ["fixed_string"] * 30_000
tb = pa.chunked_array([y] * 10_000)
# Create pa.table
arrow_table = pa.table({"a": ta, "b": tb})
It wasn't obvious to me that this test's offsets go beyond 32 bits, but I ran it and 4800280000 is greater than 2^32 (4294967296):

>>> len(arrow_table)
300000000
>>> arrow_table.get_total_buffer_size()
4800280000
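
A rough check of that number (an assumption on my part: the string column "b" accounts for essentially all of the total): each "fixed_string" value needs 12 bytes of character data plus a 4-byte int32 offset, and the table has 300 million rows, so the concatenated buffers already exceed what 32-bit offsets can address.

```python
rows = 30_000 * 10_000                   # 300,000,000 rows in column "b"
bytes_per_row = len("fixed_string") + 4  # 12 data bytes + one 4-byte int32 offset
print(rows * bytes_per_row)              # 4_800_000_000
print(2**32)                             # 4_294_967_296 -> the offsets overflow
```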

tests/benchmark/test_benchmark.py (outdated review thread, resolved)
pyiceberg/io/pyarrow.py (outdated review thread, resolved)
pyiceberg/io/pyarrow.py (outdated review thread, resolved)
@Fokko Fokko force-pushed the fd-fix-overflowing-buffer branch from 04a8218 to 3841fe7 on January 20, 2025 at 19:15
Comment on lines 418 to 419
# When adding files, it can be that we still need to convert from logical types to physical types
iceberg_typed_value = _to_partition_representation(iceberg_type, value)

Is this due to the fact that we already transform the partition key value

partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])

and this expects the untransformed value?

If that's the case, can we just omit the transformation before the group_by?

@Fokko Fokko (Contributor, Author) replied:

Ah, of course. We want to know the output tuples after the transform, so omitting the transformation is not possible. I think we could do a follow-up PR where we split out the logic for the write path and the add-files path. After this PR, this conversion is no longer needed when doing partitioned writes; we only need it to preprocess values when importing partitions via add-files.
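
A small illustration of why the grouping has to happen on the transformed values, using plain PyArrow as a stand-in for the partition.transform.pyarrow_transform(...) call quoted above (illustrative only, not pyiceberg's code): many source values collapse into the same partition value only after the transform.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Three timestamps, two distinct days: the day "transform" (approximated here with
# floor_temporal) is what produces the partition tuples we need to group on.
ts = pa.array(
    ["2025-01-20T08:00:00", "2025-01-20T17:30:00", "2025-01-21T09:15:00"],
    type=pa.timestamp("us"),
)
days = pc.floor_temporal(ts, unit="day")
print(pc.unique(ts))    # 3 distinct raw values
print(pc.unique(days))  # 2 distinct partition values
```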

@Fokko Fokko closed this Jan 21, 2025
@Fokko Fokko force-pushed the fd-fix-overflowing-buffer branch from 3841fe7 to c84dd8d on January 21, 2025 at 14:02
@Fokko Fokko (Contributor, Author) commented Jan 21, 2025

Ugh, accidentally pushed main 🤦

@bigluck bigluck (Contributor) commented Jan 21, 2025

:'(

Fokko added a commit that referenced this pull request Jan 23, 2025
Second attempt of #1539


Co-authored-by: Kevin Liu <[email protected]>

Successfully merging this pull request may close these issues.

[Bug] Error in overwrite(): pyarrow.lib.ArrowInvalid: offset overflow with large dataset (~3M rows)