diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index 57c77e4b..e33a6c69 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -141,7 +141,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), + waitingForAddCh: make(chan *waitFor[T], 1000), metrics: newRetryMetrics(name, provider), } @@ -165,15 +165,15 @@ type delayingType[T comparable] struct { heartbeat clock.Ticker // waitingForAddCh is a buffered channel that feeds waitingForAdd - waitingForAddCh chan *waitFor + waitingForAddCh chan *waitFor[T] // metrics counts the number of retries metrics retryMetrics } // waitFor holds the data to add and the time it should be added -type waitFor struct { - data t +type waitFor[T any] struct { + data T readyAt time.Time // index in the priority queue (heap) index int @@ -187,15 +187,15 @@ type waitFor struct { // it has been removed from the queue and placed at index Len()-1 by // container/heap. Push adds an item at index Len(), and container/heap // percolates it into the correct location. -type waitForPriorityQueue []*waitFor +type waitForPriorityQueue[T any] []*waitFor[T] -func (pq waitForPriorityQueue) Len() int { +func (pq waitForPriorityQueue[T]) Len() int { return len(pq) } -func (pq waitForPriorityQueue) Less(i, j int) bool { +func (pq waitForPriorityQueue[T]) Less(i, j int) bool { return pq[i].readyAt.Before(pq[j].readyAt) } -func (pq waitForPriorityQueue) Swap(i, j int) { +func (pq waitForPriorityQueue[T]) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j @@ -203,16 +203,16 @@ func (pq waitForPriorityQueue) Swap(i, j int) { // Push adds an item to the queue. Push should not be called directly; instead, // use `heap.Push`. -func (pq *waitForPriorityQueue) Push(x interface{}) { +func (pq *waitForPriorityQueue[T]) Push(x interface{}) { n := len(*pq) - item := x.(*waitFor) + item := x.(*waitFor[T]) item.index = n *pq = append(*pq, item) } // Pop removes an item from the queue. Pop should not be called directly; // instead, use `heap.Pop`. -func (pq *waitForPriorityQueue) Pop() interface{} { +func (pq *waitForPriorityQueue[T]) Pop() interface{} { n := len(*pq) item := (*pq)[n-1] item.index = -1 @@ -222,7 +222,7 @@ func (pq *waitForPriorityQueue) Pop() interface{} { // Peek returns the item at the beginning of the queue, without removing the // item or otherwise mutating the queue. It is safe to call directly. -func (pq waitForPriorityQueue) Peek() interface{} { +func (pq waitForPriorityQueue[T]) Peek() interface{} { return pq[0] } @@ -254,7 +254,7 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) { select { case <-q.stopCh: // unblock if ShutDown() is called - case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}: } } @@ -273,10 +273,10 @@ func (q *delayingType[T]) waitingLoop() { // Make a timer that expires when the item at the head of the waiting queue is ready var nextReadyAtTimer clock.Timer - waitingForQueue := &waitForPriorityQueue{} + waitingForQueue := &waitForPriorityQueue[T]{} heap.Init(waitingForQueue) - waitingEntryByData := map[t]*waitFor{} + waitingEntryByData := map[T]*waitFor[T]{} for { if q.TypedInterface.ShuttingDown() { @@ -287,13 +287,13 @@ func (q *delayingType[T]) waitingLoop() { // Add ready entries for waitingForQueue.Len() > 0 { - entry := waitingForQueue.Peek().(*waitFor) + entry := waitingForQueue.Peek().(*waitFor[T]) if entry.readyAt.After(now) { break } - entry = heap.Pop(waitingForQueue).(*waitFor) - q.Add(entry.data.(T)) + entry = heap.Pop(waitingForQueue).(*waitFor[T]) + q.Add(entry.data) delete(waitingEntryByData, entry.data) } @@ -303,7 +303,7 @@ func (q *delayingType[T]) waitingLoop() { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } - entry := waitingForQueue.Peek().(*waitFor) + entry := waitingForQueue.Peek().(*waitFor[T]) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() } @@ -322,7 +322,7 @@ func (q *delayingType[T]) waitingLoop() { if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { - q.Add(waitEntry.data.(T)) + q.Add(waitEntry.data) } drained := false @@ -332,7 +332,7 @@ func (q *delayingType[T]) waitingLoop() { if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { - q.Add(waitEntry.data.(T)) + q.Add(waitEntry.data) } default: drained = true @@ -343,7 +343,7 @@ func (q *delayingType[T]) waitingLoop() { } // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue -func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { +func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) { // if the entry already exists, update the time only if it would cause the item to be queued sooner existing, exists := knownEntries[entry.data] if exists { diff --git a/util/workqueue/delaying_queue_test.go b/util/workqueue/delaying_queue_test.go index 5eac320b..3c50709b 100644 --- a/util/workqueue/delaying_queue_test.go +++ b/util/workqueue/delaying_queue_test.go @@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) + q := NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[string]{Clock: fakeClock}) // Add items for n := 0; n < b.N; n++ { diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 226d9f9d..3cec1768 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -212,7 +212,6 @@ type Typed[t comparable] struct { } type empty struct{} -type t interface{} type set[t comparable] map[t]empty func (s set[t]) has(item t) bool { diff --git a/util/workqueue/queue_test.go b/util/workqueue/queue_test.go index 7ede527f..e6f84fae 100644 --- a/util/workqueue/queue_test.go +++ b/util/workqueue/queue_test.go @@ -467,6 +467,7 @@ func BenchmarkQueue(b *testing.B) { for idx := range keys { keys[idx] = fmt.Sprintf("key-%d", idx) } + b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{}) diff --git a/util/workqueue/rate_limiting_queue_test.go b/util/workqueue/rate_limiting_queue_test.go index 3c55f829..9bf22c4b 100644 --- a/util/workqueue/rate_limiting_queue_test.go +++ b/util/workqueue/rate_limiting_queue_test.go @@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) { clock: fakeClock, heartbeat: fakeClock.NewTicker(maxWait), stopCh: make(chan struct{}), - waitingForAddCh: make(chan *waitFor, 1000), + waitingForAddCh: make(chan *waitFor[any], 1000), metrics: newRetryMetrics("", nil), } queue.TypedDelayingInterface = delayingQueue