Skip to content

Commit 6e24092

Browse files
authored
Merge pull request #3368 from fossedihelm/priority-queue-fix-lock
🐛 priority queue: properly sync the `waiter` manipulation
2 parents c1e0fbc + bef0907 commit 6e24092

File tree

2 files changed

+61
-5
lines changed

2 files changed

+61
-5
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct {
124124
get chan item[T]
125125

126126
// waiters is the number of routines blocked in Get, we use it to determine
127-
// if we can push items.
128-
waiters atomic.Int64
127+
// if we can push items. Every manipulation has to be protected with the lock.
128+
waiters int64
129129

130130
// Configurable for testing
131131
now func() time.Time
@@ -269,15 +269,15 @@ func (w *priorityqueue[T]) spin() {
269269
}
270270
}
271271

272-
if w.waiters.Load() == 0 {
272+
if w.waiters == 0 {
273273
// Have to keep iterating here to ensure we update metrics
274274
// for further items that became ready and set nextReady.
275275
return true
276276
}
277277

278278
w.metrics.get(item.Key, item.Priority)
279279
w.locked.Insert(item.Key)
280-
w.waiters.Add(-1)
280+
w.waiters--
281281
delete(w.items, item.Key)
282282
toDelete = append(toDelete, item)
283283
w.becameReady.Delete(item.Key)
@@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
316316
return zero, 0, true
317317
}
318318

319-
w.waiters.Add(1)
319+
w.lock.Lock()
320+
w.waiters++
321+
w.lock.Unlock()
320322

321323
w.notifyItemOrWaiterAdded()
322324

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,60 @@ func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvid
441441
}
442442
}
443443

444+
func TestHighPriorityItemsAreReturnedBeforeLowPriorityItemMultipleTimes(t *testing.T) {
445+
t.Parallel()
446+
synctest.Test(t, func(t *testing.T) {
447+
g := NewWithT(t)
448+
449+
q, metrics := newQueue()
450+
defer q.ShutDown()
451+
452+
const itemsPerPriority = 1000
453+
lowPriority := 0
454+
lowMiddlePriority := 5
455+
middlePriority := 10
456+
upperMiddlePriority := 15
457+
highPriority := 20
458+
for i := range itemsPerPriority {
459+
q.AddWithOpts(AddOpts{Priority: &highPriority}, fmt.Sprintf("high-%d", i))
460+
q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority}, fmt.Sprintf("upperMiddle-%d", i))
461+
q.AddWithOpts(AddOpts{Priority: &middlePriority}, fmt.Sprintf("middle-%d", i))
462+
q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority}, fmt.Sprintf("lowMiddle-%d", i))
463+
q.AddWithOpts(AddOpts{Priority: &lowPriority}, fmt.Sprintf("low-%d", i))
464+
}
465+
synctest.Wait()
466+
467+
for range itemsPerPriority {
468+
key, prio, _ := q.GetWithPriority()
469+
g.Expect(prio).To(Equal(highPriority))
470+
g.Expect(key).To(HavePrefix("high-"))
471+
}
472+
for range itemsPerPriority {
473+
key, prio, _ := q.GetWithPriority()
474+
g.Expect(prio).To(Equal(upperMiddlePriority))
475+
g.Expect(key).To(HavePrefix("upperMiddle-"))
476+
}
477+
for range itemsPerPriority {
478+
key, prio, _ := q.GetWithPriority()
479+
g.Expect(prio).To(Equal(middlePriority))
480+
g.Expect(key).To(HavePrefix("middle-"))
481+
}
482+
for range itemsPerPriority {
483+
key, prio, _ := q.GetWithPriority()
484+
g.Expect(prio).To(Equal(lowMiddlePriority))
485+
g.Expect(key).To(HavePrefix("lowMiddle-"))
486+
}
487+
for range itemsPerPriority {
488+
key, prio, _ := q.GetWithPriority()
489+
g.Expect(prio).To(Equal(lowPriority))
490+
g.Expect(key).To(HavePrefix("low-"))
491+
}
492+
g.Expect(metrics.depth["test"]).To(Equal(map[int]int{10: 0, 5: 0, 0: 0, 20: 0, 15: 0}))
493+
g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5))
494+
g.Expect(metrics.retries["test"]).To(Equal(0))
495+
})
496+
}
497+
444498
func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {
445499
metrics := newFakeMetricsProvider()
446500
q := New("test", func(o *Opts[string]) {

0 commit comments

Comments
 (0)