Skip to content

Commit 10d1517

Browse files
authored
Don't propagate cancel signal to the Prometheus rules manager context (#6326)
* Don't propagate cancel signal to the Prometheus rules manager context This change allows the rules that are still executing queries to complete before Cortex is fully shut down. Signed-off-by: Raphael Silva <[email protected]> * Make ruler unit tests run faster Signed-off-by: Raphael Silva <[email protected]> * Prevent tests from failing due to a race condition Use atomic counter to keep track of the successful queries Signed-off-by: Raphael Silva <[email protected]> --------- Signed-off-by: Raphael Silva <[email protected]>
1 parent db4ec12 commit 10d1517

File tree

5 files changed

+146
-7
lines changed

5 files changed

+146
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
3636
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
3737
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
38+
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
3839

3940
## 1.18.1 2024-10-14
4041

pkg/ruler/api_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestRuler_rules(t *testing.T) {
6767
Alerts: []*Alert{},
6868
},
6969
},
70-
Interval: 60,
70+
Interval: 10,
7171
},
7272
},
7373
},
@@ -123,7 +123,7 @@ func TestRuler_rules_special_characters(t *testing.T) {
123123
Alerts: []*Alert{},
124124
},
125125
},
126-
Interval: 60,
126+
Interval: 10,
127127
},
128128
},
129129
},
@@ -178,7 +178,7 @@ func TestRuler_rules_limit(t *testing.T) {
178178
Alerts: []*Alert{},
179179
},
180180
},
181-
Interval: 60,
181+
Interval: 10,
182182
},
183183
},
184184
},
@@ -342,7 +342,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {
342342

343343
router.ServeHTTP(w, req)
344344
require.Equal(t, http.StatusOK, w.Code)
345-
require.Equal(t, "name: group1\ninterval: 1m\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())
345+
require.Equal(t, "name: group1\ninterval: 10s\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())
346346

347347
// Delete namespace1
348348
req = requestFor(t, http.MethodDelete, "https://localhost:8080/api/v1/rules/namespace1", nil, "user1")

pkg/ruler/compat.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,15 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
341341
queryFunc = metricsQueryFunc
342342
}
343343

344+
// We let the Prometheus rules manager control the context so that there is a chance
345+
// for graceful shutdown of rules that are still in execution even in case the cortex context is canceled.
346+
prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID)
347+
344348
return rules.NewManager(&rules.ManagerOptions{
345349
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
346350
Queryable: q,
347351
QueryFunc: queryFunc,
348-
Context: user.InjectOrgID(ctx, userID),
352+
Context: prometheusContext,
349353
ExternalURL: cfg.ExternalURL.URL,
350354
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
351355
Logger: log.With(logger, "user", userID),

pkg/ruler/ruler_test.go

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,55 @@ func (e emptyQuerier) Select(ctx context.Context, sortSeries bool, hints *storag
138138
return storage.EmptySeriesSet()
139139
}
140140

141+
func fixedQueryable(querier storage.Querier) storage.Queryable {
142+
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
143+
return querier, nil
144+
})
145+
}
146+
147+
type blockingQuerier struct {
148+
queryStarted chan struct{}
149+
queryFinished chan struct{}
150+
queryBlocker chan struct{}
151+
successfulQueries *atomic.Int64
152+
}
153+
154+
func (s *blockingQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
155+
return nil, nil, nil
156+
}
157+
158+
func (s *blockingQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
159+
return nil, nil, nil
160+
}
161+
162+
func (s *blockingQuerier) Close() error {
163+
return nil
164+
}
165+
166+
func (s *blockingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (returnSeries storage.SeriesSet) {
167+
select {
168+
case <-s.queryStarted:
169+
default:
170+
close(s.queryStarted)
171+
}
172+
173+
select {
174+
case <-ctx.Done():
175+
returnSeries = storage.ErrSeriesSet(ctx.Err())
176+
case <-s.queryBlocker:
177+
s.successfulQueries.Add(1)
178+
returnSeries = storage.EmptySeriesSet()
179+
}
180+
181+
select {
182+
case <-s.queryFinished:
183+
default:
184+
close(s.queryFinished)
185+
}
186+
187+
return returnSeries
188+
}
189+
141190
func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc {
142191
if querierTestConfig != nil {
143192
// disable active query tracking for test
@@ -158,10 +207,15 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg
158207
func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer) {
159208
tracker := promql.NewActiveQueryTracker(t.TempDir(), 20, log.NewNopLogger())
160209

210+
timeout := time.Minute * 2
211+
212+
if querierTestConfig != nil && querierTestConfig.Cfg.Timeout != 0 {
213+
timeout = querierTestConfig.Cfg.Timeout
214+
}
161215
engine := promql.NewEngine(promql.EngineOpts{
162216
MaxSamples: 1e6,
163217
ActiveQueryTracker: tracker,
164-
Timeout: 2 * time.Minute,
218+
Timeout: timeout,
165219
})
166220

167221
// Mock the pusher
@@ -322,6 +376,86 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
322376
`), "prometheus_notifications_dropped_total"))
323377
}
324378

379+
func TestRuler_TestShutdown(t *testing.T) {
380+
tests := []struct {
381+
name string
382+
shutdownFn func(*blockingQuerier, *Ruler)
383+
}{
384+
{
385+
name: "successful query after shutdown",
386+
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
387+
// Wait query to start
388+
<-querier.queryStarted
389+
390+
// The following call cancels the context of the ruler service.
391+
ruler.StopAsync()
392+
393+
// Simulate the completion of the query
394+
close(querier.queryBlocker)
395+
396+
// Wait query to finish
397+
<-querier.queryFinished
398+
399+
require.GreaterOrEqual(t, querier.successfulQueries.Load(), int64(1), "query failed to complete successfully failed to complete")
400+
},
401+
},
402+
{
403+
name: "query timeout while shutdown",
404+
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
405+
// Wait query to start
406+
<-querier.queryStarted
407+
408+
// The following call cancels the context of the ruler service.
409+
ruler.StopAsync()
410+
411+
// Wait query to finish
412+
<-querier.queryFinished
413+
414+
require.Equal(t, querier.successfulQueries.Load(), int64(0), "query should not be succesfull")
415+
},
416+
},
417+
}
418+
419+
for _, test := range tests {
420+
t.Run(test.name, func(t *testing.T) {
421+
store := newMockRuleStore(mockRules, nil)
422+
cfg := defaultRulerConfig(t)
423+
mockQuerier := &blockingQuerier{
424+
queryBlocker: make(chan struct{}),
425+
queryStarted: make(chan struct{}),
426+
queryFinished: make(chan struct{}),
427+
successfulQueries: atomic.NewInt64(0),
428+
}
429+
sleepQueriable := fixedQueryable(mockQuerier)
430+
431+
d := &querier.MockDistributor{}
432+
433+
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
434+
&client.QueryStreamResponse{
435+
Chunkseries: []client.TimeSeriesChunk{},
436+
}, nil)
437+
d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.")
438+
439+
r := newTestRuler(t, cfg, store, &querier.TestConfig{
440+
Distributor: d,
441+
Stores: []querier.QueryableWithFilter{
442+
querier.UseAlwaysQueryable(sleepQueriable),
443+
},
444+
Cfg: querier.Config{Timeout: time.Second * 1},
445+
})
446+
447+
test.shutdownFn(mockQuerier, r)
448+
449+
err := r.AwaitTerminated(context.Background())
450+
require.NoError(t, err)
451+
452+
e := r.FailureCase()
453+
require.NoError(t, e)
454+
})
455+
}
456+
457+
}
458+
325459
func TestRuler_Rules(t *testing.T) {
326460
store := newMockRuleStore(mockRules, nil)
327461
cfg := defaultRulerConfig(t)

pkg/ruler/store_mock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type mockRuleStore struct {
1919

2020
var (
2121
delim = "/"
22-
interval, _ = time.ParseDuration("1m")
22+
interval, _ = time.ParseDuration("10s")
2323
mockRulesNamespaces = map[string]rulespb.RuleGroupList{
2424
"user1": {
2525
&rulespb.RuleGroupDesc{

0 commit comments

Comments
 (0)