diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 1bffccb95c8..f24c8815592 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -39,11 +39,12 @@ func NewDelayingQueue() DelayingInterface { func newDelayingQueue(clock util.Clock) DelayingInterface { ret := &delayingType{ - Interface: New(), - clock: clock, - heartbeat: clock.Tick(maxWait), - stopCh: make(chan struct{}), - waitingForAddCh: make(chan waitFor, 1000), + Interface: New(), + clock: clock, + heartbeat: clock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingTimeByEntry: map[t]time.Time{}, + waitingForAddCh: make(chan waitFor, 1000), } go ret.waitingLoop() @@ -66,6 +67,8 @@ type delayingType struct { // waitingForAdd is an ordered slice of items to be added to the contained work queue waitingForAdd []waitFor + // waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes + waitingTimeByEntry map[t]time.Time // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan waitFor } @@ -118,6 +121,7 @@ func (q *delayingType) waitingLoop() { if q.Interface.ShuttingDown() { // discard waiting entries q.waitingForAdd = nil + q.waitingTimeByEntry = nil return } @@ -130,6 +134,7 @@ func (q *delayingType) waitingLoop() { break } q.Add(entry.data) + delete(q.waitingTimeByEntry, entry.data) readyEntries++ } q.waitingForAdd = q.waitingForAdd[readyEntries:] @@ -152,7 +157,7 @@ func (q *delayingType) waitingLoop() { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { - q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) } else { q.Add(waitEntry.data) } @@ -162,7 +167,7 @@ func (q *delayingType) waitingLoop() { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { - q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) } else { q.Add(waitEntry.data) } @@ -177,7 +182,20 @@ func (q *delayingType) waitingLoop() { // inserts the given entry into the sorted entries list // same semantics as append()... the given slice may be modified, // and the returned value should be used -func insert(entries []waitFor, entry waitFor) []waitFor { +func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor { + // if the entry is already in our retry list and the existing time is before the new one, just skip it + existingTime, exists := knownEntries[entry.data] + if exists && existingTime.Before(entry.readyAt) { + return entries + } + + // if the entry exists and is scheduled for later, go ahead and remove the entry + if exists { + if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) { + entries = append(entries[:existingIndex], entries[existingIndex+1:]...) + } + } + insertionIndex := sort.Search(len(entries), func(i int) bool { return entry.readyAt.Before(entries[i].readyAt) }) @@ -189,5 +207,24 @@ func insert(entries []waitFor, entry waitFor) []waitFor { // insert the record entries[insertionIndex] = entry + knownEntries[entry.data] = entry.readyAt + return entries } + +// findEntryIndex returns the index for an existing entry +func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int { + index := sort.Search(len(entries), func(i int) bool { + return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt + }) + + // we know this is the earliest possible index, but there could be multiple with the same time + // iterate from here to find the dupe + for ; index < len(entries); index++ { + if entries[index].data == data { + break + } + } + + return index +} diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go index 696a8bc2418..e2a21dadf3b 100644 --- a/pkg/util/workqueue/delaying_queue_test.go +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -43,7 +43,7 @@ func TestSimpleQueue(t *testing.T) { fakeClock.Step(60 * time.Millisecond) - if err := waitForAdded(t, q, 1); err != nil { + if err := waitForAdded(q, 1); err != nil { t.Errorf("should have added") } item, _ := q.Get() @@ -68,6 +68,65 @@ func TestSimpleQueue(t *testing.T) { } } +func TestDeduping(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + q.AddAfter(first, 70*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 0 { + t.Errorf("should not have added") + } + + // step past the first block, we should receive now + fakeClock.Step(60 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } + + // test again, but this time the earlier should override + q.AddAfter(first, 50*time.Millisecond) + q.AddAfter(first, 30*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(40 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ = q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + func TestAddTwoFireEarly(t *testing.T) { fakeClock := util.NewFakeClock(time.Now()) q := newDelayingQueue(fakeClock) @@ -88,7 +147,7 @@ func TestAddTwoFireEarly(t *testing.T) { fakeClock.Step(60 * time.Millisecond) - if err := waitForAdded(t, q, 1); err != nil { + if err := waitForAdded(q, 1); err != nil { t.Fatalf("unexpected err: %v", err) } item, _ := q.Get() @@ -99,7 +158,7 @@ func TestAddTwoFireEarly(t *testing.T) { q.AddAfter(third, 2*time.Second) fakeClock.Step(1 * time.Second) - if err := waitForAdded(t, q, 1); err != nil { + if err := waitForAdded(q, 1); err != nil { t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() @@ -108,7 +167,7 @@ func TestAddTwoFireEarly(t *testing.T) { } fakeClock.Step(2 * time.Second) - if err := waitForAdded(t, q, 1); err != nil { + if err := waitForAdded(q, 1); err != nil { t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() @@ -139,7 +198,7 @@ func TestCopyShifting(t *testing.T) { fakeClock.Step(2 * time.Second) - if err := waitForAdded(t, q, 3); err != nil { + if err := waitForAdded(q, 3); err != nil { t.Fatalf("unexpected err: %v", err) } actualFirst, _ := q.Get() @@ -156,19 +215,14 @@ func TestCopyShifting(t *testing.T) { } } -func waitForAdded(t *testing.T, q DelayingInterface, depth int) error { - err := wait.Poll(1*time.Millisecond, 20*time.Second, func() (done bool, err error) { +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { if q.Len() == depth { return true, nil } return false, nil }) - - if err != nil { - t.Logf("failed: len=%v, everything=%#v", q.Len(), q) - } - return err } func waitForWaitingQueueToFill(q DelayingInterface) error {