
arrow_scan ignores explicit PyArrow schema #163

@kaijennissen

Description


What happens?

We use DuckDB to query an unmaterialized PyArrow dataset (Hive-partitioned CSV files) that carries an explicit schema. The dataset stays unmaterialized because it may be larger than the available memory. After applying filters and aggregations, we convert the result back to Arrow with .arrow(). At this point the query fails with a type conversion error on a column that is explicitly declared as string in the schema but contains many NULL values before the first non-NULL string value.
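
Schematically, the pipeline looks like the sketch below. The path, file and column names are placeholders on a toy single-file dataset (this toy version does not trigger the error on its own); the full reproduction with the Hive-partitioned layout is under "To Reproduce" further down.

import tempfile
from pathlib import Path

import duckdb
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds

# Toy single-file stand-in for the real Hive-partitioned dataset.
tmp = Path(tempfile.mkdtemp())
(tmp / "data.csv").write_text("id;col_a;col_b\n1;value_1;\n2;value_2;STRING_2\n")

# Explicit schema: col_b is declared as string even though it may start with NULLs.
schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("col_a", pa.string()),
        pa.field("col_b", pa.string()),
    ]
)

# The dataset stays lazy (unmaterialized); DuckDB scans it via arrow_scan.
dataset = ds.dataset(
    str(tmp),
    format=ds.CsvFileFormat(parse_options=csv.ParseOptions(delimiter=";")),
    schema=schema,
)

con = duckdb.connect()
# Filter/aggregate in DuckDB, then convert the result back to Arrow.
result = con.execute("SELECT count(*) AS n, max(id) AS max_id FROM dataset").arrow()
print(result)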

Invalid Input Error: arrow_scan: get_next failed(): Invalid: In CSV column #2: Row #48522: CSV conversion error to null: invalid value 'STRING_27548395'

Important: We were unable to create a simpler test case. The bug seems to reproduce consistently only with a large number of files with variable row counts.

Key observations:

  • When we change only the row counts to be fixed (10,000 per file), keeping everything else identical, the query succeeds
  • When we materialize the dataset before querying (dataset.to_table()), the query succeeds (a sketch of this workaround follows this list; it is also the commented-out line in the repro script)
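
For reference, the materialization workaround from the second observation is a standalone sketch below, using a toy in-memory dataset with placeholder values rather than the real CSV data:

import duckdb
import pyarrow as pa
import pyarrow.dataset as ds

# Toy stand-in: col_b starts with NULLs and only later contains strings,
# mimicking the shape of the problematic column in the real data.
table = pa.table(
    {
        "id": pa.array([1, 2, 3], pa.int64()),
        "col_b": pa.array([None, None, "STRING_3"], pa.string()),
    }
)
dataset = ds.dataset(table)

# Workaround: materialize the dataset before DuckDB scans it. On the real
# dataset this makes the query succeed, but it requires the data to fit in
# memory, which is exactly what we want to avoid.
dataset = dataset.to_table()

con = duckdb.connect()
print(con.execute("SELECT * FROM dataset WHERE col_b IS NOT NULL").arrow())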

Tested with:

  • DuckDB 1.4.1, 1.4.0, 1.3.2
  • PyArrow 22.0.0, 21.0.0

To Reproduce

import random
import tempfile
from pathlib import Path

import duckdb
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds

NUM_STORES = 10
NUM_YEARWEEKS = 52
YEAR_START = 2024
FILES_PER_PARTITION = 4
MIN_ROWS_PER_FILE = 2_000
MAX_ROWS_PER_FILE = 60_000
NULL_ROWS_PERCENTAGE = 0.85
PROBLEMATIC_PARTITION_INDICES = list(range(0, NUM_STORES * NUM_YEARWEEKS, 10))


def create_dataset():
    """Create Hive-partitioned CSV dataset with variable row counts."""
    temp_dir = Path(tempfile.mkdtemp())

    stores = [f"{i:03d}" for i in range(1, NUM_STORES + 1)]
    yearweeks = [YEAR_START * 100 + week for week in range(1, NUM_YEARWEEKS + 1)]

    total_rows = 0
    partition_index = 0

    for store in stores:
        for yearweek in yearweeks:
            partition_dir = temp_dir / f"store={store}" / f"yearweek={yearweek}"
            partition_dir.mkdir(parents=True, exist_ok=True)

            has_null_problem = partition_index in PROBLEMATIC_PARTITION_INDICES

            for file_idx in range(FILES_PER_PARTITION):
                csv_file = partition_dir / f"data_{file_idx:03d}.csv"
                num_rows = random.randint(MIN_ROWS_PER_FILE, MAX_ROWS_PER_FILE)
                null_rows = int(num_rows * NULL_ROWS_PERCENTAGE)

                with open(csv_file, "w") as f:
                    f.write("id;col_a;col_b\n")
                    base_id = total_rows

                    if has_null_problem and file_idx == 0:
                        for i in range(num_rows):
                            f.write(f"{base_id + i};value_{base_id + i};\n")
                    elif has_null_problem and file_idx == 1:
                        for i in range(null_rows):
                            f.write(f"{base_id + i};value_{base_id + i};\n")
                        for i in range(null_rows, num_rows):
                            f.write(
                                f"{base_id + i};value_{base_id + i};STRING_{base_id + i}\n"
                            )
                    else:
                        for i in range(num_rows):
                            f.write(
                                f"{base_id + i};value_{base_id + i};STRING_{base_id + i}\n"
                            )

                total_rows += num_rows

            partition_index += 1

    return temp_dir


def cleanup(temp_dir):
    """Delete dataset directory."""
    for path in temp_dir.rglob("*"):
        if path.is_file():
            path.unlink()
    for path in sorted(temp_dir.rglob("*"), reverse=True):
        if path.is_dir():
            path.rmdir()
    temp_dir.rmdir()


def main():
    random.seed(42)

    temp_dir = create_dataset()

    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("col_b", pa.string()),
            pa.field("col_b", pa.string()), # problematic column
            pa.field("store", pa.string()),
            pa.field("yearweek", pa.int64()),
        ]
    )

    try:
        # Create PyArrow dataset
        dataset = ds.dataset(
            str(temp_dir),
            format=ds.CsvFileFormat(parse_options=csv.ParseOptions(delimiter=";")),
            partitioning="hive",
            schema=schema,
        )
        # Uncomment the next line and the error disappears
        # dataset = dataset.to_table()
        con = duckdb.connect()
        result = con.execute(
            """
            SELECT * FROM dataset
            WHERE store = '005'
              AND yearweek = (SELECT MAX(yearweek) FROM dataset WHERE store = '005')
            """
        ).arrow()

    except Exception as e:
        raise Exception(f"BUG REPRODUCED: {type(e).__name__}\n{e}")
    finally:
        cleanup(temp_dir)


if __name__ == "__main__":
    main()

Running the script produces:

Invalid Input Error: arrow_scan: get_next failed(): Invalid: In CSV column #2: Row #48522: CSV conversion error to null: invalid value 'STRING_27548395'

OS:

macOS 15.6.1

DuckDB Package Version:

1.4.1

Python Version:

3.11.12

Full Name:

Kai Jennissen

Affiliation:

Dohle Handelsgruppe

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

Yes

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration to reproduce the issue?

  • Yes, I have
