Skip to content

Commit 2810022

Browse files
craig[bot]Yevgeniy Miretskiy
craig[bot]
and
Yevgeniy Miretskiy
committed
Merge #110283
110283: kvcoord: Fix observability data race r=miretskiy a=miretskiy Fix data race in management of `InCatchup` range information. Epic: None Release note: None Co-authored-by: Yevgeniy Miretskiy <[email protected]>
2 parents 839783e + 005cd0c commit 2810022

File tree

2 files changed

+13
-17
lines changed

2 files changed

+13
-17
lines changed

pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,9 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
243243
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
244244
streamID := atomic.AddInt64(&m.seqID, 1)
245245

246-
{
247-
// Before starting single rangefeed, acquire catchup scan quota.
248-
catchupRes, err := acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics)
249-
if err != nil {
250-
return err
251-
}
252-
s.catchupRes = catchupRes
246+
// Before starting single rangefeed, acquire catchup scan quota.
247+
if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
248+
return err
253249
}
254250

255251
// Start a retry loop for sending the batch to the range.

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,9 @@ func (ds *DistSender) RangeFeedSpans(
256256
}
257257
// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
258258
// This quota acquisition paces the rate of new goroutine creation.
259-
catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics)
260-
if err != nil {
261-
active.release()
259+
if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
262260
return err
263261
}
264-
active.catchupRes = catchupRes
265262
if log.V(1) {
266263
log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquired in %s)",
267264
span, sri.startAfter, timeutil.Since(acquireStart))
@@ -518,7 +515,6 @@ func newActiveRangeFeed(
518515
Span: span,
519516
StartAfter: startAfter,
520517
CreatedTime: timeutil.Now(),
521-
InCatchup: true,
522518
},
523519
}
524520

@@ -703,24 +699,28 @@ func (a catchupAlloc) Release() {
703699
a()
704700
}
705701

706-
func acquireCatchupScanQuota(
702+
func (a *activeRangeFeed) acquireCatchupScanQuota(
707703
ctx context.Context,
708704
sv *settings.Values,
709705
catchupSem *limit.ConcurrentRequestLimiter,
710706
metrics *DistSenderRangeFeedMetrics,
711-
) (catchupAlloc, error) {
707+
) error {
712708
// Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
713709
// opportunity to update semaphore limit.
714710
catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
715711
res, err := catchupSem.Begin(ctx)
716712
if err != nil {
717-
return nil, err
713+
return err
718714
}
719715
metrics.RangefeedCatchupRanges.Inc(1)
720-
return func() {
716+
a.catchupRes = func() {
721717
metrics.RangefeedCatchupRanges.Dec(1)
722718
res.Release()
723-
}, nil
719+
}
720+
a.Lock()
721+
defer a.Unlock()
722+
a.InCatchup = true
723+
return nil
724724
}
725725

726726
// nweTransportForRange returns Transport for the specified range descriptor.

0 commit comments

Comments
 (0)