Skip to content

Commit a2055f0

Browse files
authored
Fix querier panic when marshaling QueryResultRequest (issue: #6599) (#6601)
Signed-off-by: SungJin1212 <[email protected]>
1 parent c1c4079 commit a2055f0

File tree

5 files changed

+218
-4
lines changed

5 files changed

+218
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591
2525
* [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609
2626
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
27+
* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
2728

2829
## 1.19.0 2025-02-27
2930

pkg/querier/stats/stats.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ func IsEnabled(ctx context.Context) bool {
4949
return FromContext(ctx) != nil
5050
}
5151

52+
func (s *QueryStats) Copy() *QueryStats {
53+
if s == nil {
54+
return nil
55+
}
56+
57+
copied := &QueryStats{}
58+
copied.Merge(s)
59+
return copied
60+
}
61+
5262
// AddWallTime adds some time to the counter.
5363
func (s *QueryStats) AddWallTime(t time.Duration) {
5464
if s == nil {

pkg/querier/stats/stats_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@ import (
88
"github.com/stretchr/testify/assert"
99
)
1010

11+
func TestStats_Copy(t *testing.T) {
12+
t.Run("stats is nil", func(t *testing.T) {
13+
var stats *QueryStats
14+
copied := stats.Copy()
15+
assert.Nil(t, copied)
16+
})
17+
t.Run("stats is not nil", func(t *testing.T) {
18+
stats, _ := ContextWithEmptyStats(context.Background())
19+
stats.AddWallTime(time.Second)
20+
copied := stats.Copy()
21+
22+
// value should be the same
23+
assert.Equal(t, time.Second, copied.LoadWallTime())
24+
p1, p2 := &copied, &stats
25+
26+
// address should be different
27+
assert.False(t, p1 == p2)
28+
})
29+
}
30+
1131
func TestStats_WallTime(t *testing.T) {
1232
t.Run("add and load wall time", func(t *testing.T) {
1333
stats, _ := ContextWithEmptyStats(context.Background())

pkg/querier/worker/scheduler_processor.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
3939
querierID: cfg.QuerierID,
4040
grpcConfig: cfg.GRPCClientConfig,
4141
targetHeaders: cfg.TargetHeaders,
42+
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
43+
return schedulerpb.NewSchedulerForQuerierClient(conn)
44+
},
4245
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
4346
Name: "cortex_querier_query_frontend_request_duration_seconds",
4447
Help: "Time spend doing requests to frontend.",
@@ -72,12 +75,13 @@ type schedulerProcessor struct {
7275
frontendPool *client.Pool
7376
frontendClientRequestDuration *prometheus.HistogramVec
7477

75-
targetHeaders []string
78+
targetHeaders []string
79+
schedulerClientFactory func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient
7680
}
7781

7882
// notifyShutdown implements processor.
7983
func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
80-
client := schedulerpb.NewSchedulerForQuerierClient(conn)
84+
client := sp.schedulerClientFactory(conn)
8185

8286
req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID}
8387
if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil {
@@ -87,7 +91,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli
8791
}
8892

8993
func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
90-
schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
94+
schedulerClient := sp.schedulerClientFactory(conn)
9195

9296
backoff := backoff.New(ctx, processorBackoffConfig)
9397
for backoff.Ongoing() {
@@ -211,11 +215,15 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,
211215

212216
c, err := sp.frontendPool.GetClientFor(frontendAddress)
213217
if err == nil {
218+
// To prevent querier panic, the panic could happen when the go-routines not-exited
219+
// yet in `fetchSeriesFromStores` are increment query-stats while progressing
220+
// (*QueryResultRequest).MarshalToSizedBuffer under the same query-stat objects are used.
221+
copiedStats := stats.Copy()
214222
// Response is empty and uninteresting.
215223
_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
216224
QueryID: queryID,
217225
HttpResponse: response,
218-
Stats: stats,
226+
Stats: copiedStats,
219227
})
220228
}
221229
if err != nil {
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"net"
6+
"testing"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/stretchr/testify/mock"
11+
"github.com/stretchr/testify/require"
12+
"github.com/weaveworks/common/httpgrpc"
13+
"go.uber.org/atomic"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/metadata"
16+
17+
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
18+
"github.com/cortexproject/cortex/pkg/querier/stats"
19+
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
20+
)
21+
22+
// mock querier request handler
23+
type mockRequestHandler struct {
24+
mock.Mock
25+
}
26+
27+
func (m *mockRequestHandler) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
28+
args := m.Called(ctx, req)
29+
return args.Get(0).(*httpgrpc.HTTPResponse), args.Error(1)
30+
}
31+
32+
type mockFrontendForQuerierServer struct {
33+
mock.Mock
34+
}
35+
36+
func (m *mockFrontendForQuerierServer) QueryResult(_ context.Context, _ *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
37+
return &frontendv2pb.QueryResultResponse{}, nil
38+
}
39+
40+
type mockSchedulerForQuerierClient struct {
41+
mock.Mock
42+
}
43+
44+
func (m *mockSchedulerForQuerierClient) QuerierLoop(ctx context.Context, opts ...grpc.CallOption) (schedulerpb.SchedulerForQuerier_QuerierLoopClient, error) {
45+
args := m.Called(ctx, opts)
46+
return args.Get(0).(schedulerpb.SchedulerForQuerier_QuerierLoopClient), args.Error(1)
47+
}
48+
49+
func (m *mockSchedulerForQuerierClient) NotifyQuerierShutdown(ctx context.Context, in *schedulerpb.NotifyQuerierShutdownRequest, opts ...grpc.CallOption) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
50+
args := m.Called(ctx, in, opts)
51+
return args.Get(0).(*schedulerpb.NotifyQuerierShutdownResponse), args.Error(1)
52+
}
53+
54+
// mock SchedulerForQuerier_QuerierLoopClient
55+
type mockQuerierLoopClient struct {
56+
ctx context.Context
57+
mock.Mock
58+
}
59+
60+
func (m *mockQuerierLoopClient) Send(msg *schedulerpb.QuerierToScheduler) error {
61+
args := m.Called(msg)
62+
return args.Error(0)
63+
}
64+
65+
func (m *mockQuerierLoopClient) Recv() (*schedulerpb.SchedulerToQuerier, error) {
66+
args := m.Called()
67+
68+
if fn, ok := args.Get(0).(func() (*schedulerpb.SchedulerToQuerier, error)); ok {
69+
return fn()
70+
}
71+
72+
return args.Get(0).(*schedulerpb.SchedulerToQuerier), args.Error(1)
73+
}
74+
75+
func (m *mockQuerierLoopClient) Header() (metadata.MD, error) {
76+
args := m.Called()
77+
return args.Get(0).(metadata.MD), args.Error(1)
78+
}
79+
80+
func (m *mockQuerierLoopClient) Trailer() metadata.MD {
81+
args := m.Called()
82+
return args.Get(0).(metadata.MD)
83+
}
84+
85+
func (m *mockQuerierLoopClient) CloseSend() error {
86+
args := m.Called()
87+
return args.Error(0)
88+
}
89+
90+
func (m *mockQuerierLoopClient) Context() context.Context {
91+
args := m.Called()
92+
return args.Get(0).(context.Context)
93+
}
94+
95+
func (m *mockQuerierLoopClient) SendMsg(msg interface{}) error {
96+
args := m.Called(msg)
97+
return args.Error(0)
98+
}
99+
100+
func (m *mockQuerierLoopClient) RecvMsg(msg interface{}) error {
101+
args := m.Called(msg)
102+
return args.Error(0)
103+
}
104+
105+
// To show https://github.com/cortexproject/cortex/issues/6599 issue has been resolved
106+
func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
107+
cfg := Config{}
108+
frontendAddress := ":50001"
109+
userID := "user-1"
110+
recvCount := 20000
111+
112+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
113+
defer cancel()
114+
115+
recvCall := atomic.Uint32{}
116+
117+
// mocking query scheduler
118+
querierLoopClient := &mockQuerierLoopClient{}
119+
querierLoopClient.ctx = ctx
120+
querierLoopClient.On("Send", mock.Anything).Return(nil)
121+
querierLoopClient.On("Context").Return(querierLoopClient.ctx)
122+
querierLoopClient.On("Recv").Return(func() (*schedulerpb.SchedulerToQuerier, error) {
123+
recvCall.Add(1)
124+
if recvCall.Load() <= uint32(recvCount) {
125+
return &schedulerpb.SchedulerToQuerier{
126+
QueryID: 1,
127+
HttpRequest: &httpgrpc.HTTPRequest{},
128+
FrontendAddress: frontendAddress,
129+
UserID: userID,
130+
StatsEnabled: true,
131+
}, nil
132+
} else {
133+
<-querierLoopClient.ctx.Done()
134+
return nil, context.Canceled
135+
}
136+
137+
})
138+
139+
requestHandler := &mockRequestHandler{}
140+
requestHandler.On("Handle", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
141+
stat := stats.FromContext(args.Get(0).(context.Context))
142+
143+
// imitate add query-stat at fetchSeriesFromStores
144+
go stat.AddFetchedChunkBytes(10)
145+
}).Return(&httpgrpc.HTTPResponse{}, nil)
146+
147+
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
148+
schedulerClient := &mockSchedulerForQuerierClient{}
149+
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)
150+
151+
sp.schedulerClientFactory = func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
152+
return schedulerClient
153+
}
154+
155+
// frontendForQuerierServer
156+
grpcServer := grpc.NewServer()
157+
server := &mockFrontendForQuerierServer{}
158+
frontendv2pb.RegisterFrontendForQuerierServer(grpcServer, server)
159+
160+
lis, err := net.Listen("tcp", frontendAddress)
161+
require.NoError(t, err)
162+
stopChan := make(chan struct{})
163+
go func() {
164+
defer close(stopChan)
165+
if err := grpcServer.Serve(lis); err != nil {
166+
return
167+
}
168+
}()
169+
defer func() {
170+
grpcServer.GracefulStop()
171+
<-stopChan // Wait util stop complete
172+
}()
173+
174+
sp.processQueriesOnSingleStream(ctx, nil, lis.Addr().String())
175+
}

0 commit comments

Comments
 (0)