Merge pull request #127944 from antoninbas/use-generics-for-delaying-queue-waitFor

Use generics for waitFor (delaying workqueue) in client-go

Kubernetes-commit: f7fef0384ec1c666e6b68cae83a3b3c6d95aa7b0
This commit is contained in:
Kubernetes Publisher 2024-10-09 09:42:23 +01:00
commit cae730524e
5 changed files with 25 additions and 25 deletions

View File

@ -141,7 +141,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T],
clock: clock, clock: clock,
heartbeat: clock.NewTicker(maxWait), heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor[T], 1000),
metrics: newRetryMetrics(name, provider), metrics: newRetryMetrics(name, provider),
} }
@ -165,15 +165,15 @@ type delayingType[T comparable] struct {
heartbeat clock.Ticker heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd // waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor waitingForAddCh chan *waitFor[T]
// metrics counts the number of retries // metrics counts the number of retries
metrics retryMetrics metrics retryMetrics
} }
// waitFor holds the data to add and the time it should be added // waitFor holds the data to add and the time it should be added
type waitFor struct { type waitFor[T any] struct {
data t data T
readyAt time.Time readyAt time.Time
// index in the priority queue (heap) // index in the priority queue (heap)
index int index int
@ -187,15 +187,15 @@ type waitFor struct {
// it has been removed from the queue and placed at index Len()-1 by // it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap // container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location. // percolates it into the correct location.
type waitForPriorityQueue []*waitFor type waitForPriorityQueue[T any] []*waitFor[T]
func (pq waitForPriorityQueue) Len() int { func (pq waitForPriorityQueue[T]) Len() int {
return len(pq) return len(pq)
} }
func (pq waitForPriorityQueue) Less(i, j int) bool { func (pq waitForPriorityQueue[T]) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt) return pq[i].readyAt.Before(pq[j].readyAt)
} }
func (pq waitForPriorityQueue) Swap(i, j int) { func (pq waitForPriorityQueue[T]) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i] pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i pq[i].index = i
pq[j].index = j pq[j].index = j
@ -203,16 +203,16 @@ func (pq waitForPriorityQueue) Swap(i, j int) {
// Push adds an item to the queue. Push should not be called directly; instead, // Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`. // use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) { func (pq *waitForPriorityQueue[T]) Push(x interface{}) {
n := len(*pq) n := len(*pq)
item := x.(*waitFor) item := x.(*waitFor[T])
item.index = n item.index = n
*pq = append(*pq, item) *pq = append(*pq, item)
} }
// Pop removes an item from the queue. Pop should not be called directly; // Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`. // instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} { func (pq *waitForPriorityQueue[T]) Pop() interface{} {
n := len(*pq) n := len(*pq)
item := (*pq)[n-1] item := (*pq)[n-1]
item.index = -1 item.index = -1
@ -222,7 +222,7 @@ func (pq *waitForPriorityQueue) Pop() interface{} {
// Peek returns the item at the beginning of the queue, without removing the // Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly. // item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} { func (pq waitForPriorityQueue[T]) Peek() interface{} {
return pq[0] return pq[0]
} }
@ -254,7 +254,7 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
select { select {
case <-q.stopCh: case <-q.stopCh:
// unblock if ShutDown() is called // unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}:
} }
} }
@ -273,10 +273,10 @@ func (q *delayingType[T]) waitingLoop() {
// Make a timer that expires when the item at the head of the waiting queue is ready // Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{} waitingForQueue := &waitForPriorityQueue[T]{}
heap.Init(waitingForQueue) heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{} waitingEntryByData := map[T]*waitFor[T]{}
for { for {
if q.TypedInterface.ShuttingDown() { if q.TypedInterface.ShuttingDown() {
@ -287,13 +287,13 @@ func (q *delayingType[T]) waitingLoop() {
// Add ready entries // Add ready entries
for waitingForQueue.Len() > 0 { for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor) entry := waitingForQueue.Peek().(*waitFor[T])
if entry.readyAt.After(now) { if entry.readyAt.After(now) {
break break
} }
entry = heap.Pop(waitingForQueue).(*waitFor) entry = heap.Pop(waitingForQueue).(*waitFor[T])
q.Add(entry.data.(T)) q.Add(entry.data)
delete(waitingEntryByData, entry.data) delete(waitingEntryByData, entry.data)
} }
@ -303,7 +303,7 @@ func (q *delayingType[T]) waitingLoop() {
if nextReadyAtTimer != nil { if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop() nextReadyAtTimer.Stop()
} }
entry := waitingForQueue.Peek().(*waitFor) entry := waitingForQueue.Peek().(*waitFor[T])
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C() nextReadyAt = nextReadyAtTimer.C()
} }
@ -322,7 +322,7 @@ func (q *delayingType[T]) waitingLoop() {
if waitEntry.readyAt.After(q.clock.Now()) { if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry) insert(waitingForQueue, waitingEntryByData, waitEntry)
} else { } else {
q.Add(waitEntry.data.(T)) q.Add(waitEntry.data)
} }
drained := false drained := false
@ -332,7 +332,7 @@ func (q *delayingType[T]) waitingLoop() {
if waitEntry.readyAt.After(q.clock.Now()) { if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry) insert(waitingForQueue, waitingEntryByData, waitEntry)
} else { } else {
q.Add(waitEntry.data.(T)) q.Add(waitEntry.data)
} }
default: default:
drained = true drained = true
@ -343,7 +343,7 @@ func (q *delayingType[T]) waitingLoop() {
} }
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner // if the entry already exists, update the time only if it would cause the item to be queued sooner
existing, exists := knownEntries[entry.data] existing, exists := knownEntries[entry.data]
if exists { if exists {

View File

@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) {
func BenchmarkDelayingQueue_AddAfter(b *testing.B) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) q := NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[string]{Clock: fakeClock})
// Add items // Add items
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {

View File

@ -212,7 +212,6 @@ type Typed[t comparable] struct {
} }
type empty struct{} type empty struct{}
type t interface{}
type set[t comparable] map[t]empty type set[t comparable] map[t]empty
func (s set[t]) has(item t) bool { func (s set[t]) has(item t) bool {

View File

@ -467,6 +467,7 @@ func BenchmarkQueue(b *testing.B) {
for idx := range keys { for idx := range keys {
keys[idx] = fmt.Sprintf("key-%d", idx) keys[idx] = fmt.Sprintf("key-%d", idx)
} }
b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
b.StopTimer() b.StopTimer()
q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{}) q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{})

View File

@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
clock: fakeClock, clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait), heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor[any], 1000),
metrics: newRetryMetrics("", nil), metrics: newRetryMetrics("", nil),
} }
queue.TypedDelayingInterface = delayingQueue queue.TypedDelayingInterface = delayingQueue