Skip to content

Commit 37692c4

Browse files
committed
Address comments
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent b0260d3 commit 37692c4

File tree

1 file changed

+69
-15
lines changed

1 file changed

+69
-15
lines changed

docs/proposals/distributed-query-execution.md

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,13 @@ slug: "distributed-query-execution"
1414

1515
### Background
1616

17-
Cortex currently implements distributed query execution by rewriting queries into multiple subqueries, handled through middlewares in the query-frontend. These split queries are scheduled via the query-scheduler, evaluated by queriers, and merged back in the query-frontend. This proposal introduces a new distributed query execution model based on Thanos PromQL PromQL engine.
17+
Cortex currently implements distributed query execution by rewriting queries into multiple subqueries, handled through middlewares in the query-frontend. These split queries are scheduled via the query-scheduler, evaluated by queriers, and merged back in the query-frontend. This proposal introduces a new distributed query execution model based on Thanos PromQL engine.
1818

1919
### Terminology
2020

2121
* Logical plan - Parsed PromQL expression represented as a tree of logical operators.
22-
* Physical plan - A tree of physical operators that execute the query. May not have a 1:1 mapping with logical operators.
23-
* Query Plan - The physical plan.
24-
* Optimization - Selecting the most efficient physical plan.
22+
* Query Plan - The logical query plan.
23+
* Optimization - Selecting the most efficient logical plan.
2524
* Planner/Optimizer - Component that performs optimization.
2625
* Fragment - A portion of the query plan.
2726
* Scheduling - Assigning fragments to queriers
@@ -55,8 +54,7 @@ Cortex currently supports only basic optimizations (e.g., time and by/without cl
5554

5655
### Architecture Overview
5756

58-
This proposal introduces a new model based on the volcano-iterator from Thanos PromQL engine:
59-
The new query execution model will be implemented using the volcano-iterator model from the Thanos PromQL query engine. Here is a high level overview of the proposal of implementing query optimization and distributed query execution in Cortex:
57+
This proposal introduces a new query execution model that will be implemented using the volcano-iterator model from the Thanos PromQL query engine. Here is a high level overview of the proposal of implementing query optimization and distributed query execution in Cortex:
6058

6159
* **Query Frontend**: Converts the logical plan into an optimized plan and sends the plan to the query-scheduler.
6260
* **Query Scheduler**: Fragments the plan and assigns plan fragments to queriers.
@@ -101,18 +99,74 @@ These fragments are then **enqueued** into the scheduler’s internal queue syst
10199

102100
#### Assigning query fragments to queriers
103101

104-
Each fragment is assigned to a querier based on resource availability, tenant isolation, and data locality (if applicable). The fragment include the plan, the remote fragment IDs it depends on. After child fragments are assigned to a particular querier, it’s parent fragment is updated with the location of the querier from which to read the results from.
102+
The fragment include the plan and the remote fragment IDs it depends on. Queriers pull fragments from the scheduler. After child fragments are assigned to a particular querier, it’s parent fragment is updated with the location of the querier from which to read the results from.
105103

106104
At execution time, a `Remote` operator acts as a placeholder within the querier's execution engine to **pull results from other queriers** running dependent fragments.
107105

108106
![AssignedFragments](/images/proposals/distributed-execution-assigned-fragments.png)
109107

110108
### Querier
111109

112-
The **Querier** is enhanced to support both **executing** fragments and **serving results** to other queriers. It now implements two interfaces:
110+
The **Querier** is enhanced to support both **executing** query fragments and **serving** results to other queriers. In the distributed query model, a querier may either evaluate a plan fragment or provide intermediate results to other queriers that depend on its output.
113111

114-
* **Executor** – Responsible for executing the root fragment (or any assigned fragment) of the physical query plan.
115-
* **Server** – Serves results for executed fragments to other queriers that depend on them (i.e., acts as a remote data source for `Remote` operators).
112+
To support this, the querier now implements two interfaces:
113+
* **QueryExecutor** – Executes assigned query plan root fragment.
114+
* **QueryServer** – Serves results to other queriers via streaming gRPC endpoints. (i.e., acts as a remote data source for `Remote` operators).
115+
116+
#### QueryExecutor
117+
The QueryExecutor interface enables a querier to execute the root fragment of the distributed query plan.
118+
119+
```
120+
type QueryExecutor interface {
121+
ExecuteQuery(p LogicalPlan) (QueryResult, error)
122+
}
123+
```
124+
125+
#### QueryServer
126+
The QueryServer interface enables a querier to expose query results to other queriers via a gRPC API. This is used when another querier is executing a fragment that depends on the results of this one (i.e., the child in the execution tree).
127+
128+
```
129+
type QueryServer interface {
130+
Enqueue(context.Context, p LogicalPlan) error
131+
Series(SeriesRequest, querierpb.QueryServer_SeriesServer) error
132+
Next(*executionpb.NextRequest, querierpb.QueryServer_NextServer) error
133+
}
134+
```
135+
136+
* **Enqueue** - Schedules the provided logical plan fragment for execution on the querier. Enqueuing a plan on a quereir will block a concurrency until the query finishes, or times out if it exceeds the configured limit. It respects the querier’s concurrency limits and queue capacity.
137+
* **Series** - Streams the metadata (labels) for the series produced by the fragment, allowing parent queriers to understand the shape of incoming data.
138+
* **Next** - Streams the actual step vector results of the fragment, returning time-step-wise computed samples and histograms.
139+
140+
These methods are implemented as streaming gRPC endpoints and used by `Remote` operators to pull data on-demand during execution.
141+
142+
143+
##### gRPC Service Definition
144+
145+
```
146+
service QueryServer {
147+
rpc Series(SeriesRequest) returns (stream OneSeries);
148+
rpc Next(NextRequest) returns (stream StepVectorBatch);
149+
}
150+
151+
message OneSeries {
152+
repeated Label labels = 1;
153+
}
154+
155+
message StepVectorBatch {
156+
repeated StepVector step_vectors = 1;
157+
}
158+
159+
message StepVector {
160+
int64 t = 1;
161+
repeated uint64 sample_IDs = 2;
162+
repeated double samples = 3;
163+
164+
repeated uint64 histogram_IDs = 4;
165+
repeated FloatHistogram histograms = 5;
166+
}
167+
```
168+
169+
Together, the QueryExecutor and QueryServer roles allow each querier to act as both a compute node and a data provider, enabling true parallel and distributed evaluation of PromQL queries across Cortex's infrastructure.
116170

117171
#### Fragment Execution
118172

@@ -122,10 +176,10 @@ Each fragment is evaluated using the **Thanos PromQL engine’s volcano iterator
122176

123177
The querier assigned the **root fragment** also plays the role of **coordinator**. The co-ordinator will
124178

125-
* trigger dependent fragment execution by invoking Next() on the child queriers
126-
* Waits for remote fragment results
127-
* Assembles the final result
128-
* Sends it back to the QueryFrontend
179+
* Trigger dependent fragment execution by invoking Next() on the child queriers
180+
* Wait for remote fragment results
181+
* Assemble the final result
182+
* Notify the QueryFrontend
129183

130184
This keeps the query execution tree distributed and parallelized, but the final result is merged and returned by a single coordinator node for consistency and simplicity.
131185

@@ -135,7 +189,7 @@ Not all types of queries are supported by Thanos PromQL engine. If any plan frag
135189

136190
### Error Handling
137191

138-
If any individual fragment fails or if a querier handling a fragment is restarted during execution, the entire query is considered failed. No retries or partial results are supported in the initial version. The root querier reports the error back to the query frontend, which returns it to the client.
192+
If any individual fragment fails or if a querier handling a fragment is restarted during execution, the entire query is considered failed. No retries or partial results are supported in the initial version. The root querier reports the error back to the QueryFrontend, which returns it to the client.
139193

140194

141195
## Conclusion

0 commit comments

Comments
 (0)