Skip to content

Commit 2b8e3a5

Browse files
HagarMeirtock-ibm
authored andcommitted
address review comments
Signed-off-by: Hagar Meir <[email protected]>
1 parent 3d2cb37 commit 2b8e3a5

File tree

3 files changed

+24
-2
lines changed

3 files changed

+24
-2
lines changed

node/batcher/batcher_builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func createMemPool(b *Batcher, config *node_config.BatcherNodeConfig) MemPool {
127127
BatchMaxSize: config.BatchMaxSize,
128128
BatchMaxSizeBytes: config.BatchMaxBytes,
129129
RequestMaxBytes: config.RequestMaxBytes,
130+
BatchTimeout: config.BatchCreationTimeout,
130131
SubmitTimeout: config.SubmitTimeout,
131132
FirstStrikeThreshold: config.FirstStrikeThreshold,
132133
SecondStrikeThreshold: config.SecondStrikeThreshold,

request/pool.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type PoolOptions struct {
6262
BatchMaxSize uint32
6363
BatchMaxSizeBytes uint32
6464
RequestMaxBytes uint64
65+
BatchTimeout time.Duration
6566
SubmitTimeout time.Duration
6667
FirstStrikeThreshold time.Duration
6768
SecondStrikeThreshold time.Duration
@@ -91,6 +92,17 @@ func (rp *Pool) start() {
9192
rp.pending.Start()
9293
}
9394

95+
func (rp *Pool) addRandomnessToFirstStrike() time.Duration {
96+
if rp.options.BatchTimeout.Milliseconds() == 0 {
97+
return time.Duration(0) // no randomness
98+
}
99+
return time.Duration(randRange(int(2*rp.options.BatchTimeout.Milliseconds()), int(-2*rp.options.BatchTimeout.Milliseconds()))) * time.Millisecond
100+
}
101+
102+
func randRange(max, min int) int {
103+
return rand.Intn(max-min) + min
104+
}
105+
94106
func (rp *Pool) createPendingStore() *PendingStore {
95107
return &PendingStore{
96108
Inspector: rp.inspector,
@@ -100,12 +112,12 @@ func (rp *Pool) createPendingStore() *PendingStore {
100112
StartTime: time.Now(),
101113
Logger: rp.logger,
102114
SecondStrikeThreshold: rp.options.SecondStrikeThreshold,
103-
FirstStrikeThreshold: rp.options.FirstStrikeThreshold,
115+
FirstStrikeThreshold: rp.options.FirstStrikeThreshold + rp.addRandomnessToFirstStrike(),
104116
OnDelete: func(key string) {
105117
rp.semaphore.Release(1)
106118
atomic.AddInt64(&rp.size, -1)
107119
},
108-
Epoch: time.Duration(rand.Intn(50))*time.Millisecond + rp.options.FirstStrikeThreshold/10,
120+
Epoch: rp.options.FirstStrikeThreshold / 10,
109121
FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout,
110122
SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout,
111123
}

request/pool_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ func TestRestartPool(t *testing.T) {
146146
SubmitTimeout: time.Second * 10,
147147
}, &striker{})
148148

149+
assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)
150+
149151
pool.Restart(true)
150152

151153
count := 100
@@ -214,6 +216,8 @@ func TestBasicBatching(t *testing.T) {
214216
SubmitTimeout: time.Second * 10,
215217
}, &striker{})
216218

219+
assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)
220+
217221
pool.Restart(true)
218222

219223
ctx, cancel1 := context.WithTimeout(context.Background(), time.Second)
@@ -253,8 +257,13 @@ func TestBasicBatching(t *testing.T) {
253257
RequestMaxBytes: 100 * 1024,
254258
AutoRemoveTimeout: time.Second * 10,
255259
SubmitTimeout: time.Second * 10,
260+
BatchTimeout: time.Second,
256261
}, &striker{})
257262

263+
t.Logf("First strike with random is %f seconds\n", pool.pending.FirstStrikeThreshold.Seconds())
264+
assert.GreaterOrEqual(t, 7*time.Second, pool.pending.FirstStrikeThreshold)
265+
assert.LessOrEqual(t, 3*time.Second, pool.pending.FirstStrikeThreshold)
266+
258267
pool.Restart(true)
259268

260269
assert.NoError(t, pool.Submit(byteReq4))

0 commit comments

Comments
 (0)