Parallel bounded RANGE-frame window functions without PARTITION BY (draft)#23026
Draft
avantgardnerio wants to merge 12 commits into
…E-frame windows Adds a new physical optimizer rule (run just before EnsureRequirements) that finds BoundedWindowAggExec nodes with a single ORDER BY column, no PARTITION BY, and a RANGE frame, then logs per-partition min/max on the order column via partition_statistics(). No transformation yet — this confirms that the per-input-partition Exact stats are available at the right spot in the pipeline to drive a future range-repartitioning step. Also adds a sqllogictest fixture (parallel_window.slt) with four scrambled parquet files, overlapping seq ranges, so the routing problem is non-trivial and stats remain Exact per partition. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reduces per-input-partition Exact stats into a global (min, max) on the ORDER BY column, then splits that range into target_partitions equal-width buckets and logs the N-1 interior cut points. Int64-only for now to keep the boundary-math API tractable; types we don't handle log a skip message and fall through to today's plan. Still no plan transformation — boundaries are computed and printed only. With the test fixture (min=0, max=99, target_partitions=4) we now log: interior boundaries: [Int64(24), Int64(49), Int64(74)] Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
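The equal-width split described above can be sketched in a few lines. This is a minimal sketch, not the code in the PR: `interior_boundaries` is a hypothetical helper name, and the truncating integer division is an assumption chosen because it reproduces the logged cut points for the fixture.

```rust
/// Returns the n-1 interior cut points that split [min, max] into `n`
/// equal-width buckets. Hypothetical helper, not the PR's function.
fn interior_boundaries(min: i64, max: i64, n: usize) -> Vec<i64> {
    if n < 2 || max <= min {
        return Vec::new(); // nothing to split
    }
    // Widen to i128 so (max - min) * i cannot overflow.
    let (lo, width, n) = (min as i128, max as i128 - min as i128, n as i128);
    // Truncating division is an assumption that matches the logged output.
    (1..n).map(|i| (lo + width * i / n) as i64).collect()
}

fn main() {
    println!("interior boundaries: {:?}", interior_boundaries(0, 99, 4));
}
```

For min=0, max=99 and four buckets this gives 24, 49 and 74, the same cut points as the log line above.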
Reads (halo_preceding, halo_following) from the window frame's start/end
bounds (Int64 only, finite values only). For each output partition, logs
its primary range [b_i, b_{i+1}) and the halo-expanded range
[b_i - halo_p, b_{i+1} + halo_f) — the latter is what the routing layer
will need to actually deliver per partition so the window frames at the
seams compute correctly.
Adds a TODO referencing a future HaloDropExec that sits above the window
per partition and drops rows outside the primary range, so each input row
surfaces in exactly one output partition.
With the test fixture (min=0, max=99, target_partitions=4,
RANGE BETWEEN 5 PRECEDING AND CURRENT ROW) we now log:
bucket 0: primary [0, 24) expanded [-5, 24)
bucket 1: primary [24, 49) expanded [19, 49)
bucket 2: primary [49, 74) expanded [44, 74)
bucket 3: primary [74, 100) expanded [69, 100)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
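The primary and halo-expanded ranges in that log can be reproduced with a small sketch. `BucketRange` and `bucket_ranges` are hypothetical names, and using `max + 1` as the exclusive upper edge of the last bucket is an assumption inferred from the logged `[74, 100)`.

```rust
#[derive(Debug, PartialEq)]
struct BucketRange {
    primary: (i64, i64),  // half-open [lo, hi): rows this bucket owns
    expanded: (i64, i64), // half-open, widened by the halo distances
}

/// Builds one range pair per output bucket from the interior cut points.
fn bucket_ranges(min: i64, max: i64, cuts: &[i64], halo_p: i64, halo_f: i64) -> Vec<BucketRange> {
    // Edges are: min, the interior cuts, then max + 1 (assumed, so max is included).
    let mut edges = Vec::with_capacity(cuts.len() + 2);
    edges.push(min);
    edges.extend_from_slice(cuts);
    edges.push(max.saturating_add(1));
    edges
        .windows(2)
        .map(|w| BucketRange {
            primary: (w[0], w[1]),
            expanded: (w[0].saturating_sub(halo_p), w[1].saturating_add(halo_f)),
        })
        .collect()
}

fn main() {
    // RANGE BETWEEN 5 PRECEDING AND CURRENT ROW: halo_p = 5, halo_f = 0.
    for (i, r) in bucket_ranges(0, 99, &[24, 49, 74], 5, 0).iter().enumerate() {
        println!(
            "bucket {}: primary [{}, {}) expanded [{}, {})",
            i, r.primary.0, r.primary.1, r.expanded.0, r.expanded.1
        );
    }
}
```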
ExecutionPlan gains a default-impl runtime_statistics(partition) returning Pin<Box<dyn Future<Output = Result<Arc<Statistics>>> + Send>>. Default resolves immediately to partition_statistics(Some(partition)); pipeline- breaking operators (e.g. SortExec) will later override to complete the future only after relevant work has run. For the in-memory ExternalSorter path in SortExec, wrap the sorted output stream with an observer that captures first/last value of the leading sort column and logs the per-partition min/max once the stream ends. This is side-effect logging only — the trait override is not wired yet — but confirms the values we need for the upcoming RangeRepartitionExec are computable at runtime and match what plan-time stats reported. Updates parallel_window.slt to actually execute the query (in addition to EXPLAIN) so the sort path runs and the new log fires. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…t path Reshape the runtime hook to match the data: a `LexOrdering` (one or more sort exprs over arbitrary expressions, ASC/DESC, nulls first/last) doesn't align with `Statistics`'s per-column min/max. Replace the previous `runtime_statistics` trait method with `runtime_sort_extremes`, returning `Option<SortExtremes>` with `min`/`max: Vec<ScalarValue>` whose length matches the operator's output ordering. SortExec carries one `Arc<Mutex<Option<SortExtremes>>>` per output partition. The slot is populated by `ExternalSorter::sort_batch_stream` right after `sort_batch_chunked` produces sorted chunks — leading and trailing rows are the lex-smallest and lex-largest endpoints for that chunk, and `merge_chunk_into_slot` folds across chunks (path 3 spill case) using a small `SortOptions`-aware lex_compare. Zero double-sort, zero row iteration on our side. `runtime_sort_extremes` reads the slot synchronously: by the time downstream sees the first output batch, the slot is populated for that partition. Default impl on the trait returns `Ok(None)` so other operators opt in only when they have something to say. RangeRepartitionExec is rewritten as a pass-through that wraps each forwarded stream with a tiny one-shot observer: on the first batch yielded, call `child.runtime_sort_extremes(partition)` and log. Confirms end-to-end wiring; future commits replace the pass-through with real range routing. Adds a TODO in `evaluate_row` to switch from "evaluate over the whole batch, take one row" to a single-row slice/RowConverter once we care about the constant-factor cost. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
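A rough illustration of the chunk-folding idea follows. It borrows the names `lex_compare` and `merge_chunk_into_slot` from the commit message, but the signatures, the `SortOpts` and `Extremes` types, and the use of `Option<i64>` in place of `ScalarValue` are all simplifying assumptions. Here `min` and `max` mean first and last in sort order, so under a descending sort `min` holds the numerically largest key.

```rust
use std::cmp::Ordering;

type Key = Vec<Option<i64>>; // stand-in for Vec<ScalarValue>; None models NULL

#[derive(Clone, Copy)]
struct SortOpts {
    descending: bool,
    nulls_first: bool,
}

#[derive(Clone, Debug, PartialEq)]
struct Extremes {
    min: Key, // first key in sort order
    max: Key, // last key in sort order
}

/// Compares two keys in sort order, honoring direction and null placement per column.
fn lex_compare(a: &[Option<i64>], b: &[Option<i64>], opts: &[SortOpts]) -> Ordering {
    for ((x, y), o) in a.iter().zip(b).zip(opts) {
        let ord = match (x, y) {
            (None, None) => Ordering::Equal,
            (None, Some(_)) => if o.nulls_first { Ordering::Less } else { Ordering::Greater },
            (Some(_), None) => if o.nulls_first { Ordering::Greater } else { Ordering::Less },
            (Some(x), Some(y)) => if o.descending { y.cmp(x) } else { x.cmp(y) },
        };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Folds one sorted chunk's first and last rows into the per-partition slot.
fn merge_chunk_into_slot(slot: &mut Option<Extremes>, first: Key, last: Key, opts: &[SortOpts]) {
    match slot {
        None => *slot = Some(Extremes { min: first, max: last }),
        Some(e) => {
            if lex_compare(&first, &e.min, opts) == Ordering::Less {
                e.min = first;
            }
            if lex_compare(&last, &e.max, opts) == Ordering::Greater {
                e.max = last;
            }
        }
    }
}

fn main() {
    let asc = [SortOpts { descending: false, nulls_first: false }];
    let mut slot = None;
    merge_chunk_into_slot(&mut slot, vec![Some(10)], vec![Some(20)], &asc);
    merge_chunk_into_slot(&mut slot, vec![Some(3)], vec![Some(15)], &asc);
    println!("{:?}", slot);
}
```

Only the first and last row of each sorted chunk are touched, which is the "zero row iteration" property the commit message describes.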
Replace the per-stream LogOnFirstBatch wrapper with a single-instance
coordinator task that:
1. opens child.execute(k) for every input partition k,
2. pulls the first batch from each — enough to drive a pipeline-
breaking child sort to populate its SortExtremes slot,
3. reads child.runtime_sort_extremes(k) for each input,
4. lex-reduces the per-input extremes into one global SortExtremes
using the input's declared output_ordering (so descending /
nulls-first are honored), and logs it,
5. hands each input's (first_batch, remaining_stream) pair to the
corresponding output partition via a tokio::oneshot channel.
Output partition i returns a stream that awaits its handoff and emits
the buffered first batch followed by the remainder — so semantically
this is still a pass-through, but it now demonstrates the K-way fan-in
machinery the real routing impl needs.
Makes `lex_compare` pub in sort.rs so the reducer can reuse the small
SortOptions-aware comparator instead of duplicating it.
With the test fixture the log now reads:
RangeRepartitionExec: coordinator gathered 4 input partitions;
global extremes = Some(SortExtremes {
min: [Int64(0)], max: [Int64(99)], row_count: 100
})
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
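The five coordinator steps can be approximated synchronously. This sketch makes several substitutions that are not in the PR: `std::sync::mpsc` stands in for `tokio::oneshot`, boxed iterators stand in for record batch streams, the extremes are plain ascending `i64` values rather than a lex-reduced `Vec<ScalarValue>`, and `Input`, `coordinate`, `output_stream` and `make_input` are hypothetical names.

```rust
use std::sync::mpsc;

type Batch = Vec<i64>;
type Stream = Box<dyn Iterator<Item = Batch> + Send>;
type Handoff = (Option<Batch>, Stream);

#[derive(Clone, Debug, PartialEq)]
struct Extremes {
    min: i64,
    max: i64,
    row_count: usize,
}

/// Stand-in for one sorted child partition and the extremes its sort recorded.
struct Input {
    stream: Stream,
    extremes: Extremes,
}

/// Pulls the first batch of every input, reduces the extremes, hands off the streams.
fn coordinate(inputs: Vec<Input>) -> (Option<Extremes>, Vec<mpsc::Receiver<Handoff>>) {
    let mut global: Option<Extremes> = None;
    let mut receivers = Vec::new();
    for mut input in inputs {
        // Step 2: the first batch forces a pipeline-breaking child to finish its work.
        let first = input.stream.next();
        // Steps 3 and 4: fold this input's extremes into the global value.
        let e = input.extremes.clone();
        global = Some(match global {
            None => e,
            Some(g) => Extremes {
                min: g.min.min(e.min),
                max: g.max.max(e.max),
                row_count: g.row_count + e.row_count,
            },
        });
        // Step 5: hand (first_batch, remaining_stream) to the matching output.
        let (tx, rx) = mpsc::channel();
        tx.send((first, input.stream)).expect("receiver is still alive");
        receivers.push(rx);
    }
    (global, receivers)
}

/// Output side: emit the buffered first batch, then the rest of the stream.
fn output_stream(rx: mpsc::Receiver<Handoff>) -> impl Iterator<Item = Batch> {
    let (first, rest) = rx.recv().expect("coordinator sent a handoff");
    first.into_iter().chain(rest)
}

/// Builds a sorted partition holding every value v in 0..100 with v % 4 == k.
fn make_input(k: i64) -> Input {
    let rows: Vec<i64> = (0..100).filter(|v| v % 4 == k).collect();
    let extremes = Extremes { min: rows[0], max: rows[rows.len() - 1], row_count: rows.len() };
    let batches: Vec<Batch> = rows.chunks(10).map(|c| c.to_vec()).collect();
    Input { stream: Box::new(batches.into_iter()), extremes }
}

fn main() {
    let (global, receivers) = coordinate((0..4).map(make_input).collect());
    println!("coordinator gathered {} input partitions; global extremes = {:?}", receivers.len(), global);
    let rows: usize = receivers.into_iter().flat_map(output_stream).map(|b| b.len()).sum();
    println!("rows passed through: {}", rows);
}
```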
Boundary math is now consumed at runtime against the global SortExtremes the coordinator gathers — no more plan-time partition_statistics math in the optimizer rule. RangeRepartitionExec::new takes the halo distances from the optimizer rule (extracted from the window frame at plan time) and the coordinator combines them with the runtime global to compute interior cut points and per-bucket primary / expanded ranges. ParallelWindow shrinks to shape detection + halo extraction + plan rewrite. The previous probe_candidate plan-time logging is gone — the identical log lines now come out of RangeRepartitionExec at execution time, where they're computed from actual data rather than from possibly- inexact stats. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Coordinator's hand-off changes from "give output i my input partition i's remaining stream" to "give output i an mpsc receiver, route from every input into the per-bucket channel I own". For each input batch the router computes bucket b's expanded range `[boundaries[b-1] - halo_preceding, boundaries[b] + halo_following)` and sends the matching slice of the batch (via arrow::compute::take_arrays) to bucket b's channel. Bucket-driven loop — N takes per input batch. Routing is Int64-only by design (matches the optimizer rule's gate); non-Int64 leading keys propagate a clear error to every output stream instead of silently producing wrong data. SLT picks up a `count(rolling_sum)` assertion that exposes halo duplication: plain `count(*)` would be statistics-pruned from parquet's row count and never instantiate the window operator, so it wouldn't notice. With routing in place but no halo drop yet, the merged output has 115 rows = 100 + 15 halo duplicates (5 per boundary × 3 interior boundaries). HaloDropExec is the next commit and will bring this back to 100. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
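The bucket-driven routing loop and the 115-row count can be checked with a small sketch. It returns row indices per bucket, which is the input a `take`-style kernel would need; `route_batch` is a hypothetical name, and passing the outer edges (min and max + 1) alongside the interior boundaries is an assumption made to keep the loop uniform.

```rust
/// For each bucket, returns the indices of `batch` rows that fall inside its
/// halo-expanded range [edges[b] - halo_p, edges[b + 1] + halo_f).
fn route_batch(batch: &[i64], edges: &[i64], halo_p: i64, halo_f: i64) -> Vec<Vec<usize>> {
    edges
        .windows(2)
        .map(|w| {
            let lo = w[0].saturating_sub(halo_p);
            let hi = w[1].saturating_add(halo_f);
            batch
                .iter()
                .enumerate()
                .filter(|(_, v)| **v >= lo && **v < hi)
                .map(|(i, _)| i)
                .collect()
        })
        .collect()
}

fn main() {
    let batch: Vec<i64> = (0..100).collect();
    // Edges for the fixture: min, three interior boundaries, max + 1.
    let routed = route_batch(&batch, &[0, 24, 49, 74, 100], 5, 0);
    let total: usize = routed.iter().map(|r| r.len()).sum();
    println!("rows after routing: {}", total);
}
```

With a preceding halo of 5 and no following halo, each of the three interior boundaries contributes 5 duplicated rows, so 100 input rows become 115 routed rows.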
Move ParallelWindow rule ahead of EnsureRequirements so it owns the distribution decision instead of surgically undoing what EnsureRequirements inserts. - BoundedWindowAggExec gains a `parallel_aware: bool` and a with_parallel_aware() builder. When set, required_input_distribution() returns UnspecifiedDistribution instead of SinglePartition, so EnsureRequirements stops wrapping us in an SPM and collapsing back to one partition. - RangeRepartitionExec now takes a LexOrdering, declares required_input_ordering (Hard) on it, and maintains_input_order=true. This anchors the pipeline-breaking SortExec beneath us instead of letting EnsureRequirements push it above. - ParallelWindow builds BWAG(parallel_aware) -> RangeRepartitionExec directly; EnsureRequirements plants the per-partition SortExec on its next pass. Removed the descend-and-wrap helper, no longer needed. - SLT EXPLAIN updated to match the new (correct) plan shape. Outer SPM in the EXPLAIN is just the user-visible `ORDER BY ... LIMIT`, not BWAG distribution. count(rolling_sum)=115 still holds; the halo duplication is now real per-partition output rather than corrupted single-partition slide. HaloDropExec follow-up brings it to 100.
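The effect of the `parallel_aware` flag on the distribution requirement can be shown with stand-in types. `WindowNode` and this two-variant `Distribution` enum are simplified assumptions for illustration only; they are not the DataFusion definitions, and only the no-PARTITION BY case is modeled.

```rust
/// Simplified stand-in for the distribution requirement an operator declares.
#[derive(Debug, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

/// Stand-in for a window operator with no PARTITION BY.
#[derive(Default)]
struct WindowNode {
    parallel_aware: bool,
}

impl WindowNode {
    /// Builder-style setter, mirroring the with_parallel_aware() idea.
    fn with_parallel_aware(mut self, parallel_aware: bool) -> Self {
        self.parallel_aware = parallel_aware;
        self
    }

    /// Without the flag the node demands a single partition, which is what
    /// makes a distribution-enforcing rule insert a merge beneath it.
    fn required_input_distribution(&self) -> Distribution {
        if self.parallel_aware {
            Distribution::UnspecifiedDistribution
        } else {
            Distribution::SinglePartition
        }
    }
}

fn main() {
    let serial = WindowNode::default();
    let parallel = WindowNode::default().with_parallel_aware(true);
    println!("serial:   {:?}", serial.required_input_distribution());
    println!("parallel: {:?}", parallel.required_input_distribution());
}
```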
`SortExtremes` reads as "data extremes observed by a sort", which doesn't cover what `RangeRepartitionExec` does when it implements the trait method: it returns each output partition's *intended primary range* (narrower than the data the partition will actually carry, by the halo distance), so `HaloDropExec` upstream can read each bucket's bound without threading a side-channel. That's a different interpretation of "extremes" — *intended* rather than *observed* — and the old name reads as a lie at that call site. Rename: - struct: SortExtremes → PartitionExtremes - trait method: runtime_sort_extremes → runtime_partition_extremes - internal slot aliases in sorts/sort.rs follow Type doc at `execution_plan.rs:97-127` documents both interpretations so future consumers don't assume observed-only.
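The opt-in shape of the trait method and the "intended range" interpretation can be sketched as follows. The `Plan` trait, the `String` error type, the `i64` keys and the half-open upper bound are assumptions for the sake of a self-contained example; only the names `PartitionExtremes` and `runtime_partition_extremes` come from the commit message.

```rust
#[derive(Clone, Debug, PartialEq)]
struct PartitionExtremes {
    min: Vec<i64>, // stand-in for Vec<ScalarValue>
    max: Vec<i64>, // assumed exclusive upper bound in this sketch
}

/// Stand-in for the plan trait: operators opt in by overriding the default.
trait Plan {
    fn runtime_partition_extremes(&self, _partition: usize) -> Result<Option<PartitionExtremes>, String> {
        Ok(None) // default: nothing to report
    }
}

/// An operator that keeps the default and so reports nothing.
struct Passthrough;
impl Plan for Passthrough {}

/// Reports each output partition's intended primary range, not observed data.
struct RangeRepartition {
    edges: Vec<i64>, // min, interior boundaries, max + 1
}

impl Plan for RangeRepartition {
    fn runtime_partition_extremes(&self, partition: usize) -> Result<Option<PartitionExtremes>, String> {
        match (self.edges.get(partition), self.edges.get(partition + 1)) {
            (Some(lo), Some(hi)) => Ok(Some(PartitionExtremes { min: vec![*lo], max: vec![*hi] })),
            _ => Err(format!("partition {partition} out of range")),
        }
    }
}

fn main() {
    let rre = RangeRepartition { edges: vec![0, 24, 49, 74, 100] };
    println!("{:?}", Passthrough.runtime_partition_extremes(0));
    println!("{:?}", rre.runtime_partition_extremes(1));
}
```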
Which issue does this PR close?
None, yet.
Rationale for this change
This is a draft PR as a PoC up for architecture feedback, not approval.
At Coralogix, roughly 80% of the observed slow/failed/OOM queries are window queries with an ORDER BY, frame bounds, and no PARTITION BY. These move all the data to one core of one node and are unlikely ever to succeed this way. This PR demonstrates what I believe is a relatively simple and straightforward way to parallelize these queries.
What changes are included in this PR?
- Exploiting an observed fact within DataFusion. Although it is generally a streaming engine, pipeline-breaking operators like sorts load every row into memory/disk before they emit a single result. This is analogous to stages in Ballista. So this PR exploits that by adding a run-time `PartitionExtremes` method to physical plan nodes to expose the min/max values of their partition, if known. SortExec can easily know this once it has seen all the data and before it emits its first batch.
- `RangeRepartitionExec`. I know this is duplicate work ATM, and before a PR like this one is ever merged, I would unify this operator with `PartitionExec::Range` to align directionally with upstream. But for reasons of expedience it is a new operator in this PR. Rather than trying to add dynamic numbers of output partitions, it uses `PartitionExtremes` at run time to calculate the ranges of values that should go into each partition, including the new concept of Halo Rows (also under investigation in this PR). The output partitions from RRE can overlap, and if a row belongs in multiple output partitions, it gets duplicated. This gives BoundedWindowAgg (BWAG) what it needs to correctly calculate totals, and the halo rows will be dropped later.
- `HaloDropExec`. A new operator that checks rows against what should be in `PartitionExtremes`, dropping any rows that are outside ownership of the partition. This is how we remove halo rows later.
- `ParallelWindow` optimizer rule. This detects candidate BWAGs and inserts the appropriate RRE & HaloDrop.

Target physical plan shape (from the included SLT):
flowchart TB
    subgraph Before["Before ParallelWindow"]
        direction TB
        DS1[DataSource<br/>4 partitions] --> S1[SortExec<br/>preserve_partitioning]
        S1 --> SPM1[SortPreservingMergeExec<br/><b>N → 1</b>]
        SPM1 --> BWAG1["BoundedWindowAggExec<br/><b>SinglePartition</b><br/>(serial)"]
        BWAG1 --> Out1[Projection + SortLimit]
    end
    subgraph After["After ParallelWindow"]
        direction TB
        DS2[DataSource<br/>4 partitions] --> S2[SortExec<br/>preserve_partitioning]
        S2 --> RRE["RangeRepartitionExec<br/>K-way coordinator<br/>+ halo duplication"]
        RRE --> BWAG2["BoundedWindowAggExec<br/><b>parallel_aware</b><br/>N partitions"]
        BWAG2 --> HDE["HaloDropExec<br/>per-partition filter<br/>reads runtime_partition_extremes"]
        HDE --> SPM2[SortPreservingMergeExec<br/>N → 1, fetch=5]
        SPM2 --> Out2[Projection]
    end
    style BWAG1 fill:#f9b,stroke:#a00
    style BWAG2 fill:#bf9,stroke:#0a0
    style RRE fill:#bf9,stroke:#0a0
    style HDE fill:#bf9,stroke:#0a0

Are these changes tested?
`sqllogictest/test_files/parallel_window.slt` is the fixture:
- four scrambled parquet files with overlapping `seq` ranges (so routing actually has to move rows around),
- a `count(rolling_sum)` assertion that catches halo duplication (`count(rolling_sum) = 100`, not 115).

I have not yet added a feature flag + equality SLT comparing parallel vs. serial outputs row-for-row. That's the next baby step and is what will catch slide-accumulator boundary bugs that the current count assertion can't see. Posting now to drive the design conversation before investing in that loop.
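What a row-for-row parallel vs. serial comparison would check can be illustrated outside the engine. This is a toy model, not the planned SLT: rows are `(seq, value)` pairs, `rolling_sum` is a naive RANGE BETWEEN n PRECEDING AND CURRENT ROW sum, and `parallel_rolling_sum` is a hypothetical function that routes with a halo, windows each bucket, then drops rows outside the bucket's primary range.

```rust
type Row = (i64, i64); // (seq, value), sorted by seq

/// Naive serial reference: for each row, sum values whose seq lies in
/// [seq - preceding, seq]. Quadratic, kept simple on purpose.
fn rolling_sum(rows: &[Row], preceding: i64) -> Vec<Row> {
    rows.iter()
        .map(|&(seq, _)| {
            let sum: i64 = rows
                .iter()
                .filter(|&&(s, _)| s >= seq - preceding && s <= seq)
                .map(|&(_, v)| v)
                .sum();
            (seq, sum)
        })
        .collect()
}

/// Parallel shape: route with a preceding halo, window per bucket, drop the halo.
fn parallel_rolling_sum(rows: &[Row], edges: &[i64], preceding: i64) -> Vec<Row> {
    let mut out = Vec::new();
    for w in edges.windows(2) {
        let (lo, hi) = (w[0], w[1]);
        // Routing: the bucket receives its primary rows plus the halo below `lo`.
        let bucket: Vec<Row> = rows
            .iter()
            .copied()
            .filter(|&(s, _)| s >= lo - preceding && s < hi)
            .collect();
        // Window, then halo drop: keep only rows this bucket owns.
        out.extend(
            rolling_sum(&bucket, preceding)
                .into_iter()
                .filter(|&(s, _)| s >= lo && s < hi),
        );
    }
    out
}

fn main() {
    let rows: Vec<Row> = (0..100).map(|s| (s, s * 2)).collect();
    let serial = rolling_sum(&rows, 5);
    let parallel = parallel_rolling_sum(&rows, &[0, 24, 49, 74, 100], 5);
    println!("row-for-row equal: {}", serial == parallel);
}
```

A count assertion passes as long as exactly one copy of each row survives. The equality check additionally fails when a halo is too narrow, because rows just above a boundary would then see a truncated frame.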
Are there any user-facing changes?
- New operators `RangeRepartitionExec`, `HaloDropExec`.
- `ExecutionPlan::runtime_partition_extremes` with a default `Ok(None)`; existing implementations are unaffected.
- `PartitionExtremes` re-export from `physical-plan`.

Known scope cuts (PoC)
- The `runtime_partition_extremes` type is generic (`Vec<ScalarValue>`); the coordinator's bucketing arithmetic is what's Int64-only. `DataType`-dispatch is the permanent architecture here, not sample-based bucketing.
- Single ORDER BY column (`LexOrdering` len 1). Multi-key extremes are in the type from day one; coordinator + halo math are not.
- … windows. A session config `optimizer.enable_parallel_window` (default off) is the next commit.
- … Next commit after the flag.
- … reducer; lifting it scheduler-side is a follow-up (~300-500 LoC).
Coexistence with #22395
`Partitioning::Range` + `RangePartitioning { ordering, split_points }` have already merged from @gabotechs. The execution slot in `RepartitionExec` for `Partitioning::Range` is still `not_impl_err!`. This PR keeps `RangeRepartitionExec` as a separate operator for now because it does something `RepartitionExec` is unlikely to absorb (halo duplication). Once the design is settled here I expect to thread the existing `RangePartitioning` shape through where the boundaries live, rather than carry a parallel `Vec<ScalarValue>` representation forever.

Draft status
This is up for architecture review, not approval. CI will be red —
clippy / fmt / unrelated tests are not yet tended. I will not push
forward to "green" until reviewers agree the shape is roughly right.