DictionaryKeyOverflowError on DataFrame.write_parquet #17445

@valkum

Describe the bug

While building a repro case for a query that is slow compared to polars, I ran into this bug. The error originates in https://github.com/apache/arrow-rs, so this may be mostly a tracking issue, but it could also be caused by how DataFusion calls into parquet. I haven't looked for the root cause yet.

With a sample dataset of 10m rows, I run into the following issue:
pyo3_runtime.PanicException: MutableArrayData::new is infallible: DictionaryKeyOverflowError

The query takes a flat (unnested) schema, nests the market and price columns into a struct, and aggregates the resulting structs into a list per (name, group).
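To make the shape of the transformation concrete, here is a minimal pure-Python sketch of what the query computes. It is not the DataFusion code from the repro; the function name `nest_and_aggregate` and the sample rows are made up for illustration, and the null handling follows my reading of the logical plan further down (a struct is built only when both `market` and `price` are non-null, and the list is kept only when its first element is non-null).

```python
from itertools import groupby


def nest_and_aggregate(rows):
    """Nest (market, price) into a struct and collect the structs per (name, group).

    `rows` must already be sorted by (name, group), because groupby only
    merges adjacent rows with equal keys.
    """
    result = []
    for (name, group), members in groupby(rows, key=lambda r: (r["name"], r["group"])):
        markets = []
        for row in members:
            # Inner projection: build the struct only when both fields are present.
            if row["market"] is not None and row["price"] is not None:
                markets.append({"market": row["market"], "price": row["price"]})
            else:
                markets.append(None)
        # Outer projection: keep the list only if its first element is non-null.
        if markets[0] is None:
            markets = None
        result.append({"name": name, "group": group, "markets": markets})
    return result


# Hypothetical sample data, not taken from sample-10m.parquet.
rows = [
    {"name": "a", "group": 1, "market": "x", "price": 1.0},
    {"name": "a", "group": 1, "market": "y", "price": 2.0},
    {"name": "b", "group": 1, "market": None, "price": 3.0},
]
print(nest_and_aggregate(rows))
```

In the real query the `market` field is additionally cast to `Dictionary(UInt16, Utf8)`, which is where the dictionary keys in the error come from.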

This does not happen with 1m rows. The input parquet file only contains a dictionary column with 10 entries, so I am not sure why this blows up: when nesting the dictionary column, the same dictionary could be reused.
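One way a 10-entry dictionary could still overflow UInt16 keys is if dictionaries from many batches get appended to each other instead of being reused or deduplicated. This is only a hypothesis, not a confirmed root cause. The sketch below simulates that situation in pure Python; `concat_without_dedup` is a hypothetical helper and does not reproduce the actual arrow-rs code path.

```python
UINT16_MAX = 2**16 - 1  # largest key a Dictionary(UInt16, Utf8) array can hold


def concat_without_dedup(batches):
    """Concatenate dictionary-encoded batches without deduplicating values.

    Each batch is a (values, keys) pair. The batch's values are appended to
    the combined dictionary and its keys are shifted by the running offset.
    """
    values, keys = [], []
    for batch_values, batch_keys in batches:
        offset = len(values)
        values.extend(batch_values)
        for key in batch_keys:
            shifted = key + offset
            if shifted > UINT16_MAX:
                raise OverflowError("dictionary key does not fit into UInt16")
            keys.append(shifted)
    return values, keys


# Every batch carries the same 10 dictionary values (assumed, for illustration).
batch = ([f"market_{i}" for i in range(10)], list(range(10)))

values, keys = concat_without_dedup([batch] * 6553)
print(len(values), max(keys))

try:
    concat_without_dedup([batch] * 6554)
except OverflowError as err:
    print("overflow:", err)
```

Under this assumption the combined dictionary grows by 10 values per batch, so the key space is exhausted after a few thousand batches even though only 10 distinct values exist. That would fit the observation that 1m rows work and 10m rows fail, but it still needs to be checked against the actual code.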

To Reproduce

For a repro case, see https://github.com/valkum/polars-datafusion-comparison/tree/datafusion_bug

Expected behavior

DataFrame.write_parquet should succeed.

Additional context

Failing plan:

DataFrame()
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: df.name, df.group, CASE WHEN array_element(markets, Int64(1)) IS NOT NULL THEN markets ELSE List() END AS CASE WHEN markets[Int64(1)] IS NOT NULL THEN markets ELSE NULL END                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |   Aggregate: groupBy=[[df.name, df.group]], aggr=[[array_agg(market) AS markets]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |     Projection: df.name, df.group, CASE WHEN df.market IS NOT NULL AND df.price IS NOT NULL THEN named_struct(Utf8("market"), CAST(df.market AS Dictionary(UInt16, Utf8)), Utf8("price"), df.price) ELSE Struct({market:,price:}) END AS market                                                                                                                                                                                                                                                                                                                                                                                            |
|               |       TableScan: df projection=[name, group, market, price]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| physical_plan | ProjectionExec: expr=[name@0 as name, group@1 as group, CASE WHEN array_element(markets@2, 1) IS NOT NULL THEN markets@2 END as CASE WHEN markets[Int64(1)] IS NOT NULL THEN markets ELSE NULL END]                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name, group@1 as group], aggr=[markets], ordering_mode=Sorted                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |     SortExec: expr=[name@0 ASC NULLS LAST, group@1 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |         RepartitionExec: partitioning=Hash([name@0, group@1], 10), input_partitions=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |           AggregateExec: mode=Partial, gby=[name@0 as name, group@1 as group], aggr=[markets], ordering_mode=Sorted                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |             ProjectionExec: expr=[name@0 as name, group@1 as group, CASE WHEN market@2 IS NOT NULL AND price@3 IS NOT NULL THEN named_struct(market, CAST(market@2 AS Dictionary(UInt16, Utf8)), price, price@3) END as market]                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |               DataSourceExec: file_groups={10 groups: [[Users/valkum/git/polars-datafusion-comparison/sample-10m.parquet:0..39293391], [Users/valkum/git/polars-datafusion-comparison/sample-10m.parquet:39293391..78586782], [Users/valkum/git/polars-datafusion-comparison/sample-10m.parquet:78586782..117880173], [Users/valkum/git/polars-datafusion-comparison/sample-10m.parquet:117880173..157173564], [Users/valkum/git/polars-datafusion-comparison/sample-10m.parquet:157173564..196466955], ...]}, projection=[name, group, market, price], output_ordering=[name@0 ASC NULLS LAST, group@1 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Labels: bug
