Skip to content

Commit b0260d3

Browse files
committed
Proposal for distributed query execution
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent c9fa217 commit b0260d3

7 files changed

+143
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
---
2+
title: "Distributed Query Execution"
3+
linkTitle: "Distributed Query Execution"
4+
weight: 1
5+
slug: "distributed-query-execution"
6+
---
7+
8+
- Author: [Harry John](https://github.com/harry671003)
9+
- Date: June 2025
10+
- Status: Proposed
11+
12+
13+
## Overview
14+
15+
### Background
16+
17+
Cortex currently implements distributed query execution by rewriting queries into multiple subqueries, handled through middlewares in the query-frontend. These split queries are scheduled via the query-scheduler, evaluated by queriers, and merged back in the query-frontend. This proposal introduces a new distributed query execution model based on Thanos PromQL PromQL engine.
18+
19+
### Terminology
20+
21+
* Logical plan - Parsed PromQL expression represented as a tree of logical operators.
22+
* Physical plan - A tree of physical operators that execute the query. May not have a 1:1 mapping with logical operators.
23+
* Query Plan - The physical plan.
24+
* Optimization - Selecting the most efficient physical plan.
25+
* Planner/Optimizer - Component that performs optimization.
26+
* Fragment - A portion of the query plan.
27+
* Scheduling - Assigning fragments to queriers
28+
29+
## Problem
30+
31+
#### Scalability
32+
33+
The current model struggles with high-cardinality queries scanning millions of series. Ingesters are not well-partitioned, and store-gateways don’t leverage partitioning during query execution. As a result, queriers often pull all data to a single node, creating bottlenecks.
34+
35+
#### Inefficient Merging
36+
37+
Merging results in the query-frontend—which isn’t shuffle-sharded — makes it a single point of failure. Poison queries can OOM all query-frontends, impacting availability.
38+
39+
#### Limited Query Rewriting
40+
41+
Today, queries are split via string manipulation. For example:
42+
43+
```
44+
sum by (pod) (rate(http_req_total[5m])) over 2d
45+
=> MERGE(sum by (pod) (rate(http_req_total{shard="1"}[5m])), sum by (pod) (rate(http_req_total{shard="2"}[5m])))
46+
```
47+
48+
This is fragile and not scalable. Modern databases instead create a physical plan and split it into executable fragments assigned across nodes.
49+
50+
#### Lack of Optimizations
51+
52+
Cortex currently supports only basic optimizations (e.g., time and by/without clause-based splitting). Mature DBMS systems explore multiple execution strategies and pick the optimal one — something currently missing in Cortex.
53+
54+
## Solution
55+
56+
### Architecture Overview
57+
58+
This proposal introduces a new model based on the volcano-iterator from Thanos PromQL engine:
59+
The new query execution model will be implemented using the volcano-iterator model from the Thanos PromQL query engine. Here is a high level overview of the proposal of implementing query optimization and distributed query execution in Cortex:
60+
61+
* **Query Frontend**: Converts the logical plan into an optimized plan and sends the plan to the query-scheduler.
62+
* **Query Scheduler**: Fragments the plan and assigns plan fragments to queriers.
63+
* **Queriers**: Pull fragments, execute them, and return results. The root-fragment querier acts as coordinator, merging final results before notifying the frontend.
64+
65+
![Architecture](/images/proposals/distributed-execution-arch.png)
66+
67+
### Query Frontend
68+
69+
The **Query Frontend** continues to act as the entry point for `query_range` and `query` APIs. The existing API layer is reused, but a new **distributed query engine** is introduced under the hood. This engine is responsible for generating and optimizing the query plan, and coordinating with the query scheduler.
70+
71+
#### Logical Plan
72+
73+
When a query request is received, the PromQL expression is first parsed into a **logical query plan**, represented as a tree of logical operators. This logical plan is then passed through a series of **optimizers** that aim to transform the query into a more efficient structure. Consider the PromQL query `sum(rate(req_total[5m]))` to be run over 2 days. The query is parsed into the following basic logical plan.
74+
75+
![LogicalPlan](/images/proposals/distributed-execution-logical-plan.png)
76+
77+
#### Distributed Optimizer
78+
79+
A special optimizer, the **Distributed Optimizer**, analyzes the logical plan to determine where and how the query can be split and executed in parallel. This is done by inserting **`Remote` logical nodes** at appropriate points in the plan tree.
80+
These `Remote` nodes act as placeholders indicating plan fragments that can be offloaded to remote queriers. This enables sharding both over time and across data partitions.
81+
82+
![OptimizedLogicalPlan](/images/proposals/distributed-execution-optimized-plan.png)
83+
84+
### Query Scheduler
85+
86+
The **Query Scheduler** receives the optimized logical plan, including the `Remote` nodes introduced by the optimizer. It is responsible for **fragmenting** the plan and orchestrating its execution across multiple queriers.
87+
88+
#### Fragmentation
89+
90+
Fragmentation involves splitting the query plan into **multiple fragments**, where each fragment is a sub-tree of the overall plan that can be evaluated independently. Fragments are created by cutting the plan at `Remote` nodes. In the below diagram the coloured tiles mark the same fragment.
91+
92+
![PlanFragments](/images/proposals/distributed-execution-plan-fragments.png)
93+
94+
#### Enqueuing fragments
95+
96+
Fragmentation respects dependencies: **child fragments are enqueued before parent fragments**, ensuring data is available before dependent operations are scheduled.
97+
These fragments are then **enqueued** into the scheduler’s internal queue system, ready to be picked up by queriers.
98+
99+
![SchedulerQueue](/images/proposals/distributed-execution-scheduler-queue.png)
100+
101+
102+
#### Assigning query fragments to queriers
103+
104+
Each fragment is assigned to a querier based on resource availability, tenant isolation, and data locality (if applicable). The fragment include the plan, the remote fragment IDs it depends on. After child fragments are assigned to a particular querier, it’s parent fragment is updated with the location of the querier from which to read the results from.
105+
106+
At execution time, a `Remote` operator acts as a placeholder within the querier's execution engine to **pull results from other queriers** running dependent fragments.
107+
108+
![AssignedFragments](/images/proposals/distributed-execution-assigned-fragments.png)
109+
110+
### Querier
111+
112+
The **Querier** is enhanced to support both **executing** fragments and **serving results** to other queriers. It now implements two interfaces:
113+
114+
* **Executor** – Responsible for executing the root fragment (or any assigned fragment) of the physical query plan.
115+
* **Server** – Serves results for executed fragments to other queriers that depend on them (i.e., acts as a remote data source for `Remote` operators).
116+
117+
#### Fragment Execution
118+
119+
Each fragment is evaluated using the **Thanos PromQL engine’s volcano iterator model**, where operators are chained and lazily pulled for evaluation. If a fragment includes a `RemoteOperator`, it fetches the intermediate result from the peer querier serving the dependent fragment. Leaf operators such as `MatrixSelector` and `VectorSelector` read data from storage layer.
120+
121+
#### Co-ordinated Execution
122+
123+
The querier assigned the **root fragment** also plays the role of **coordinator**. The co-ordinator will
124+
125+
* trigger dependent fragment execution by invoking Next() on the child queriers
126+
* Waits for remote fragment results
127+
* Assembles the final result
128+
* Sends it back to the QueryFrontend
129+
130+
This keeps the query execution tree distributed and parallelized, but the final result is merged and returned by a single coordinator node for consistency and simplicity.
131+
132+
#### Fallback
133+
134+
Not all types of queries are supported by Thanos PromQL engine. If any plan fragment is not supported by Thanos engine, the plan is converted back into a query string and evaluated by the Prometheus PromQL engine.
135+
136+
### Error Handling
137+
138+
If any individual fragment fails or if a querier handling a fragment is restarted during execution, the entire query is considered failed. No retries or partial results are supported in the initial version. The root querier reports the error back to the query frontend, which returns it to the client.
139+
140+
141+
## Conclusion
142+
143+
This proposal lays the groundwork for a robust and scalable distributed query execution in Cortex. By leveraging Thanos PromQL's iterator engine and explicit fragmentation, it avoids string-based rewriting and unlocks deep query optimization and better fault isolation. Future iterations can explore support for retries, cost-based optimization, and fine-grained resource scheduling.
Loading
Loading
Loading
Loading
Loading
Loading

0 commit comments

Comments
 (0)