
Stream RecordBatches in chunks instead of single batch #20

@jayendra13

Description

Summary

ZarrExec currently materializes the entire query result as a single RecordBatch before returning it to DataFusion. For large Cartesian products (e.g., 365 time steps × 100 lat × 100 lon = 3.65M rows), this means the full dataset must fit in memory at once. The execution should yield multiple smaller batches to enable streaming and reduce peak memory.

Current Behavior

All three execution paths in src/physical_plan/zarr_exec.rs follow the same pattern:

// zarr_exec.rs:305 (remote), :379 (virtualizarr), :445 (virtualizarr+adapter)
let batches: Vec<RecordBatch> = result_stream.try_collect().await?;
Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok)))

The local sync path (execute_local, line ~326) reads everything into a single RecordBatch via read_zarr() and wraps it in a one-element stream.

This means:

  1. Memory: Peak memory = full result size (no streaming/backpressure)
  2. Latency: First row is only available after ALL rows are read
  3. LIMIT inefficiency: Even with limit pushdown, the batch is fully materialized before slicing

Proposed Change

Chunk the Cartesian product expansion into batches of ~8,192 rows (a rough sketch follows the list below):

  1. In read_zarr() / read_zarr_async(): Instead of building one giant RecordBatch, yield batches by iterating over row ranges [0..8192), [8192..16384), ...
  2. In ZarrExec::execute(): Return the chunked stream directly without .try_collect()
  3. Coordinate keys: Compute per-batch using the existing row-major formula (the formula in create_coord_dictionary_typed already takes total_rows — change it to accept a row range)
  4. Data variables: Slice the nD array per batch rather than loading the full flattened column
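
A minimal, self-contained sketch of the chunked expansion. The name cartesian_batches and the plain Int64 index columns are hypothetical; the real change would route these row-major indices through the existing coordinate dictionaries and slice the data variables for the same row range:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use futures::{stream, Stream};

const BATCH_SIZE: usize = 8_192;

/// Expand the Cartesian product of the coordinate dimensions in fixed-size
/// row ranges instead of one giant batch. `dims` pairs a dimension name with
/// its length (e.g. [("time", 365), ("lat", 100), ("lon", 100)]); each batch
/// covers rows [offset, offset + len) of the row-major expansion.
fn cartesian_batches(
    dims: Vec<(&'static str, usize)>,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let schema = Arc::new(Schema::new(
        dims.iter()
            .map(|(name, _)| Field::new(*name, DataType::Int64, false))
            .collect::<Vec<_>>(),
    ));
    let total_rows: usize = dims.iter().map(|(_, len)| *len).product();

    stream::iter((0..total_rows).step_by(BATCH_SIZE).map(move |offset| {
        let len = BATCH_SIZE.min(total_rows - offset);

        // Row-major decomposition, computed only for this row range:
        // for row r and dims [d0, d1, d2], i2 = r % d2, i1 = (r / d2) % d1, i0 = r / (d1 * d2).
        let mut columns: Vec<(usize, ArrayRef)> = Vec::with_capacity(dims.len());
        let mut stride = 1usize;
        for (axis, &(_, dim)) in dims.iter().enumerate().rev() {
            let idx: Int64Array = (offset..offset + len)
                .map(|row| Some(((row / stride) % dim) as i64))
                .collect();
            columns.push((axis, Arc::new(idx) as ArrayRef));
            stride *= dim;
        }
        columns.sort_by_key(|(axis, _)| *axis); // restore original dimension order
        RecordBatch::try_new(schema.clone(), columns.into_iter().map(|(_, c)| c).collect())
    }))
}

With dims = [("time", 365), ("lat", 100), ("lon", 100)] this produces ceil(3,650,000 / 8,192) = 446 independent batches, generated lazily as DataFusion polls the stream.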

Batch size considerations

  • 8,192 rows is a common DataFusion default (the batch_size config; see the sketch after this list)
  • Arrow's memory allocator works well with predictable batch sizes
  • Smaller batches enable DataFusion's pipeline execution (hash joins, aggregations start before full scan)
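
Rather than hard-coding 8,192, the size could be taken from DataFusion's batch_size setting, which is reachable from the TaskContext passed to ExecutionPlan::execute. A small sketch of just that lookup (import paths may vary slightly across DataFusion versions):

use std::sync::Arc;
use datafusion::execution::TaskContext;

// Inside ZarrExec::execute(&self, partition, context), the session's configured
// batch size (default 8192) can replace a hard-coded constant.
fn batch_size_from(context: &Arc<TaskContext>) -> usize {
    context.session_config().batch_size()
}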

Impact

  • Memory: O(batch_size) instead of O(total_rows) for the Cartesian product
  • Latency: First batch available after reading ~8K rows worth of data
  • LIMIT: LIMIT 10 on a 3.65M row result now reads ≤8,192 rows instead of 3.65M
  • DataFusion integration: Enables proper pipeline execution and backpressure

Files to Modify

  • src/reader/zarr_reader.rs — read_zarr() and read_zarr_async(): return impl Stream<Item = Result<RecordBatch>> instead of a single batch
  • src/reader/coord.rs — create_coord_dictionary_typed(): accept a row offset + batch length
  • src/physical_plan/zarr_exec.rs — Remove the .try_collect() pattern and pass the stream through directly (sketched below)
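
A hedged sketch of the zarr_exec.rs side (names approximate, not the repo's exact code): once the reader yields a stream of batches, it can be wrapped in a RecordBatchStreamAdapter and returned directly, with no try_collect():

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::Stream;

// Instead of collecting the reader's stream into Vec<RecordBatch>, adapt it
// and hand it straight to DataFusion so batches flow through one at a time.
// `batches` stands in for whatever the chunked read_zarr_async() yields.
fn passthrough(
    schema: SchemaRef,
    batches: impl Stream<Item = Result<RecordBatch>> + Send + 'static,
) -> SendableRecordBatchStream {
    Box::pin(RecordBatchStreamAdapter::new(schema, batches))
}

DataFusion then polls the adapter batch by batch, which is what lets LIMIT and pipelined operators stop the scan early.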

Motivation

Inspired by Vortex's ChunkedArray which enables streaming output with constant memory. DataFusion's execution model is designed for streaming batches — the current single-batch approach defeats pipeline execution, backpressure, and early termination.
