Skip to content

Commit 95dcf06

Browse files
craig[bot]aadityasondhi
craig[bot]
and aadityasondhi committed
Merge #109446
109446: kvadmission: fix handling of non-elastic work with flow control r=irfansharif a=aadityasondhi Previously, when we were skipping flow control for regular work, we were still encoding raft messages with AC encoding. This would enqueue essentially no-op AC work items below raft, increasing the possibility of OOM. This is because regular work would not be paced by below-raft admission rates, yet it would still consume memory in below-raft work queues. This change skips encoding raft messages with AC encoding for cases where we bypass flow control. Additionally, we still subject such work to above-raft IO admission control, if enabled. Informs #104154. Release note: None Co-authored-by: Aaditya Sondhi <[email protected]>
2 parents deebc56 + 2734eda commit 95dcf06

File tree

9 files changed

+92
-57
lines changed

9 files changed

+92
-57
lines changed

pkg/kv/kvserver/flow_control_replica_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,9 @@ func newMockFlowHandle(
413413

414414
func (m *mockFlowHandle) Admit(
415415
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
416-
) error {
416+
) (bool, error) {
417417
m.t.Fatal("unimplemented")
418-
return nil
418+
return false, nil
419419
}
420420

421421
func (m *mockFlowHandle) DeductTokensFor(

pkg/kv/kvserver/kvadmission/kvadmission.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -310,29 +310,41 @@ func (n *controllerImpl) AdmitKVWork(
310310
// to continue even when throttling since there are often significant
311311
// number of tokens available.
312312
if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
313-
if !bypassAdmission &&
314-
kvflowcontrol.Enabled.Get(&n.settings.SV) &&
315-
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) {
313+
var admitted bool
314+
attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV) &&
315+
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings)
316+
if attemptFlowControl && !bypassAdmission {
316317
kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID)
317318
if !found {
318319
return Handle{}, nil
319320
}
320-
if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil {
321+
var err error
322+
admitted, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime))
323+
if err != nil {
321324
return Handle{}, err
325+
} else if admitted {
326+
// NB: It's possible for us to be waiting for available flow tokens
327+
// for a different set of streams than the ones we'll eventually
328+
// deduct tokens from, if the range experiences a split between now
329+
// and the point of deduction. That's ok, there's no strong
330+
// synchronization needed between these two points.
331+
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
332+
AdmissionPriority: int32(admissionInfo.Priority),
333+
AdmissionCreateTime: admissionInfo.CreateTime,
334+
AdmissionOriginNode: n.nodeID.Get(),
335+
}
322336
}
323-
// NB: It's possible for us to be waiting for available flow tokens
324-
// for a different set of streams than the ones we'll eventually
325-
// deduct tokens from, if the range experiences a split between now
326-
// and the point of deduction. That's ok, there's no strong
327-
// synchronization needed between these two points.
328-
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
329-
AdmissionPriority: int32(admissionInfo.Priority),
330-
AdmissionCreateTime: admissionInfo.CreateTime,
331-
AdmissionOriginNode: n.nodeID.Get(),
332-
}
333-
} else {
337+
338+
}
339+
// If flow control is disabled or if work bypasses flow control, we still
340+
// subject it to above-raft, leaseholder-only IO admission control.
341+
if !attemptFlowControl || !admitted {
334342
storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
335343
if storeAdmissionQ != nil {
344+
// NB: Even though we would know here we're bypassing admission (via
345+
// `bypassAdmission`), we still have to explicitly invoke `.Admit()`.
346+
// We do it for correct token accounting (i.e. we deduct tokens without
347+
// blocking).
336348
storeWorkHandle, err := storeAdmissionQ.Admit(
337349
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
338350
if err != nil {

pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ const (
6060
// virtually enqueued in below-raft admission queues and dequeued in
6161
// priority order, but only empty elastic flow token buckets above-raft will
6262
// block further elastic traffic from being admitted.
63-
//
64-
// TODO(irfansharif): We're potentially risking OOMs doing all this tracking
65-
// for regular work, without coalescing state. With a bit of plumbing, for
66-
// requests that bypass flow control we could fallback to using the non-AC
67-
// raft encodings and avoid the potential OOMs. Address this as part of
68-
// #95563.
6963
ApplyToElastic ModeT = iota
7064
// ApplyToAll uses flow control for both elastic and regular traffic,
7165
// i.e. all work will wait for flow tokens to be available.
@@ -117,11 +111,12 @@ type Tokens int64
117111
// Controller provides flow control for replication traffic in KV, held at the
118112
// node-level.
119113
type Controller interface {
120-
// Admit seeks admission to replicate data, regardless of size, for work
121-
// with the given priority, create-time, and over the given stream. This
122-
// blocks until there are flow tokens available or the stream disconnects,
123-
// subject to context cancellation.
124-
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
114+
// Admit seeks admission to replicate data, regardless of size, for work with
115+
// the given priority, create-time, and over the given stream. This blocks
116+
// until there are flow tokens available or the stream disconnects, subject to
117+
// context cancellation. This returns true if the request was admitted through
118+
// flow control. Ignore the first return value if err != nil.
119+
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (admitted bool, _ error)
125120
// DeductTokens deducts (without blocking) flow tokens for replicating work
126121
// with given priority over the given stream. Requests are expected to
127122
// have been Admit()-ed first.
@@ -158,10 +153,11 @@ type Controller interface {
158153
// given priority, takes log position into account -- see
159154
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
160155
type Handle interface {
161-
// Admit seeks admission to replicate data, regardless of size, for work
162-
// with the given priority and create-time. This blocks until there are
163-
// flow tokens available for all connected streams.
164-
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
156+
// Admit seeks admission to replicate data, regardless of size, for work with
157+
// the given priority and create-time. This blocks until there are flow tokens
158+
// available for all connected streams. This returns true if the request was
159+
// admitted through flow control. Ignore the first return value if err != nil.
160+
Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error)
165161
// DeductTokensFor deducts (without blocking) flow tokens for replicating
166162
// work with given priority along connected streams. The deduction is
167163
// tracked with respect to the specific raft log position it's expecting it

pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (c *Controller) Admit(
136136
pri admissionpb.WorkPriority,
137137
_ time.Time,
138138
connection kvflowcontrol.ConnectedStream,
139-
) error {
139+
) (bool, error) {
140140
class := admissionpb.WorkClassFromPri(pri)
141141
c.metrics.onWaiting(class)
142142

@@ -148,12 +148,11 @@ func (c *Controller) Admit(
148148
c.mu.Unlock()
149149

150150
tokens := b.tokens(class)
151-
if tokens > 0 ||
152-
// In addition to letting requests through when there are tokens
153-
// being available, we'll also let them through if we're not
154-
// applying flow control to their specific work class.
155-
c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass {
156-
151+
// In addition to letting requests through when there are tokens
152+
// being available, we'll also let them through if we're not
153+
// applying flow control to their specific work class.
154+
bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass
155+
if tokens > 0 || bypass {
157156
if log.ExpensiveLogEnabled(ctx, 2) {
158157
log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
159158
pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode())
@@ -179,7 +178,10 @@ func (c *Controller) Admit(
179178

180179
b.signal() // signal a waiter, if any
181180
c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart))
182-
return nil
181+
if bypass {
182+
return false, nil
183+
}
184+
return true, nil
183185
}
184186

185187
if !logged && log.ExpensiveLogEnabled(ctx, 2) {
@@ -192,12 +194,12 @@ func (c *Controller) Admit(
192194
case <-b.wait(): // wait for a signal
193195
case <-connection.Disconnected():
194196
c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart))
195-
return nil
197+
return true, nil
196198
case <-ctx.Done():
197199
if ctx.Err() != nil {
198200
c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart))
199201
}
200-
return ctx.Err()
202+
return false, ctx.Err()
201203
}
202204
}
203205

pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ func TestInspectController(t *testing.T) {
196196

197197
// Set up a single connected stream, s1/t1, and ensure it shows up in the
198198
// Inspect() state.
199-
require.NoError(t, controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1)))
199+
admitted, err := controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1))
200+
require.NoError(t, err)
201+
require.True(t, admitted)
200202
require.Len(t, controller.Inspect(ctx), 1)
201203
require.Equal(t, controller.Inspect(ctx)[0],
202204
makeInspectStream(1, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))
@@ -211,7 +213,9 @@ func TestInspectController(t *testing.T) {
211213

212214
// Connect another stream, s1/s2, and ensure it shows up in the Inspect()
213215
// state.
214-
require.NoError(t, controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2)))
216+
admitted, err = controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2))
217+
require.NoError(t, err)
218+
require.True(t, admitted)
215219
require.Len(t, controller.Inspect(ctx), 2)
216220
require.Equal(t, controller.Inspect(ctx)[1],
217221
makeInspectStream(2, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ func New(
7979
var _ kvflowcontrol.Handle = &Handle{}
8080

8181
// Admit is part of the kvflowcontrol.Handle interface.
82-
func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error {
82+
func (h *Handle) Admit(
83+
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
84+
) (bool, error) {
8385
if h == nil {
8486
// TODO(irfansharif): This can happen if we're proposing immediately on
8587
// a newly split off RHS that doesn't know it's a leader yet (so we
@@ -92,14 +94,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
9294
// As for cluster settings that disable flow control entirely or only
9395
// for regular traffic, that can be dealt with at the caller by not
9496
// calling .Admit() and ensuring we use the right raft entry encodings.
95-
return nil
97+
return false, nil
9698
}
9799

98100
h.mu.Lock()
99101
if h.mu.closed {
100102
h.mu.Unlock()
101103
log.Errorf(ctx, "operating on a closed handle")
102-
return nil
104+
return false, nil
103105
}
104106

105107
// NB: We're using a copy-on-write scheme elsewhere to maintain this slice
@@ -115,15 +117,20 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
115117
h.metrics.onWaiting(class)
116118
tstart := h.clock.PhysicalTime()
117119

120+
// NB: We track whether the last stream was subject to flow control; this
121+
// helps us decide later if we should be deducting tokens for this work.
122+
var admitted bool
118123
for _, c := range connections {
119-
if err := h.controller.Admit(ctx, pri, ct, c); err != nil {
124+
var err error
125+
admitted, err = h.controller.Admit(ctx, pri, ct, c)
126+
if err != nil {
120127
h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart))
121-
return err
128+
return false, err
122129
}
123130
}
124131

125132
h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart))
126-
return nil
133+
return admitted, nil
127134
}
128135

129136
// DeductTokensFor is part of the kvflowcontrol.Handle interface.

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ func TestHandleAdmit(t *testing.T) {
110110
// the goroutine is blocked.
111111
admitCh := make(chan struct{})
112112
go func() {
113-
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
113+
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
114+
require.NoError(t, err)
115+
require.True(t, admitted)
114116
close(admitCh)
115117
}()
116118

@@ -189,16 +191,26 @@ func TestFlowControlMode(t *testing.T) {
189191
handle.ConnectStream(ctx, pos(0), stream)
190192
handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))
191193

194+
mode := tc.mode // copy to avoid nogo error
195+
192196
// Invoke .Admit() for {regular,elastic} work in a separate
193197
// goroutines, and test below whether the goroutines are blocked.
194198
regularAdmitCh := make(chan struct{})
195199
elasticAdmitCh := make(chan struct{})
196200
go func() {
197-
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
201+
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
202+
require.NoError(t, err)
203+
if mode == kvflowcontrol.ApplyToElastic {
204+
require.False(t, admitted)
205+
} else {
206+
require.True(t, admitted)
207+
}
198208
close(regularAdmitCh)
199209
}()
200210
go func() {
201-
require.NoError(t, handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}))
211+
admitted, err := handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{})
212+
require.NoError(t, err)
213+
require.True(t, admitted)
202214
close(elasticAdmitCh)
203215
}()
204216

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ type Noop struct{}
2626
var _ kvflowcontrol.Handle = Noop{}
2727

2828
// Admit is part of the kvflowcontrol.Handle interface.
29-
func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error {
30-
return nil
29+
func (n Noop) Admit(
30+
ctx context.Context, priority admissionpb.WorkPriority, time time.Time,
31+
) (bool, error) {
32+
return false, nil
3133
}
3234

3335
// DeductTokensFor is part of the kvflowcontrol.Handle interface.

pkg/kv/kvserver/replica_proposal_buf_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{}
11661166

11671167
func (t *testFlowTokenHandle) Admit(
11681168
ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time,
1169-
) error {
1170-
return nil
1169+
) (bool, error) {
1170+
return false, nil
11711171
}
11721172

11731173
func (t *testFlowTokenHandle) DeductTokensFor(

0 commit comments

Comments
 (0)