|
|
|
@@ -141,7 +141,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T],
|
|
|
|
|
clock: clock,
|
|
|
|
|
heartbeat: clock.NewTicker(maxWait),
|
|
|
|
|
stopCh: make(chan struct{}),
|
|
|
|
|
waitingForAddCh: make(chan *waitFor, 1000),
|
|
|
|
|
waitingForAddCh: make(chan *waitFor[T], 1000),
|
|
|
|
|
metrics: newRetryMetrics(name, provider),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -165,15 +165,15 @@ type delayingType[T comparable] struct {
|
|
|
|
|
heartbeat clock.Ticker
|
|
|
|
|
|
|
|
|
|
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
|
|
|
|
waitingForAddCh chan *waitFor
|
|
|
|
|
waitingForAddCh chan *waitFor[T]
|
|
|
|
|
|
|
|
|
|
// metrics counts the number of retries
|
|
|
|
|
metrics retryMetrics
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// waitFor holds the data to add and the time it should be added
|
|
|
|
|
type waitFor struct {
|
|
|
|
|
data t
|
|
|
|
|
type waitFor[T any] struct {
|
|
|
|
|
data T
|
|
|
|
|
readyAt time.Time
|
|
|
|
|
// index in the priority queue (heap)
|
|
|
|
|
index int
|
|
|
|
@@ -187,15 +187,15 @@ type waitFor struct {
|
|
|
|
|
// it has been removed from the queue and placed at index Len()-1 by
|
|
|
|
|
// container/heap. Push adds an item at index Len(), and container/heap
|
|
|
|
|
// percolates it into the correct location.
|
|
|
|
|
type waitForPriorityQueue []*waitFor
|
|
|
|
|
type waitForPriorityQueue[T any] []*waitFor[T]
|
|
|
|
|
|
|
|
|
|
func (pq waitForPriorityQueue) Len() int {
|
|
|
|
|
func (pq waitForPriorityQueue[T]) Len() int {
|
|
|
|
|
return len(pq)
|
|
|
|
|
}
|
|
|
|
|
func (pq waitForPriorityQueue) Less(i, j int) bool {
|
|
|
|
|
func (pq waitForPriorityQueue[T]) Less(i, j int) bool {
|
|
|
|
|
return pq[i].readyAt.Before(pq[j].readyAt)
|
|
|
|
|
}
|
|
|
|
|
func (pq waitForPriorityQueue) Swap(i, j int) {
|
|
|
|
|
func (pq waitForPriorityQueue[T]) Swap(i, j int) {
|
|
|
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
|
|
|
pq[i].index = i
|
|
|
|
|
pq[j].index = j
|
|
|
|
@@ -203,16 +203,16 @@ func (pq waitForPriorityQueue) Swap(i, j int) {
|
|
|
|
|
|
|
|
|
|
// Push adds an item to the queue. Push should not be called directly; instead,
|
|
|
|
|
// use `heap.Push`.
|
|
|
|
|
func (pq *waitForPriorityQueue) Push(x interface{}) {
|
|
|
|
|
func (pq *waitForPriorityQueue[T]) Push(x interface{}) {
|
|
|
|
|
n := len(*pq)
|
|
|
|
|
item := x.(*waitFor)
|
|
|
|
|
item := x.(*waitFor[T])
|
|
|
|
|
item.index = n
|
|
|
|
|
*pq = append(*pq, item)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pop removes an item from the queue. Pop should not be called directly;
|
|
|
|
|
// instead, use `heap.Pop`.
|
|
|
|
|
func (pq *waitForPriorityQueue) Pop() interface{} {
|
|
|
|
|
func (pq *waitForPriorityQueue[T]) Pop() interface{} {
|
|
|
|
|
n := len(*pq)
|
|
|
|
|
item := (*pq)[n-1]
|
|
|
|
|
item.index = -1
|
|
|
|
@@ -222,7 +222,7 @@ func (pq *waitForPriorityQueue) Pop() interface{} {
|
|
|
|
|
|
|
|
|
|
// Peek returns the item at the beginning of the queue, without removing the
|
|
|
|
|
// item or otherwise mutating the queue. It is safe to call directly.
|
|
|
|
|
func (pq waitForPriorityQueue) Peek() interface{} {
|
|
|
|
|
func (pq waitForPriorityQueue[T]) Peek() interface{} {
|
|
|
|
|
return pq[0]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -254,7 +254,7 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
|
|
|
|
|
select {
|
|
|
|
|
case <-q.stopCh:
|
|
|
|
|
// unblock if ShutDown() is called
|
|
|
|
|
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
|
|
|
|
|
case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -273,10 +273,10 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
// Make a timer that expires when the item at the head of the waiting queue is ready
|
|
|
|
|
var nextReadyAtTimer clock.Timer
|
|
|
|
|
|
|
|
|
|
waitingForQueue := &waitForPriorityQueue{}
|
|
|
|
|
waitingForQueue := &waitForPriorityQueue[T]{}
|
|
|
|
|
heap.Init(waitingForQueue)
|
|
|
|
|
|
|
|
|
|
waitingEntryByData := map[t]*waitFor{}
|
|
|
|
|
waitingEntryByData := map[T]*waitFor[T]{}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
if q.TypedInterface.ShuttingDown() {
|
|
|
|
@@ -287,13 +287,13 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
|
|
|
|
|
// Add ready entries
|
|
|
|
|
for waitingForQueue.Len() > 0 {
|
|
|
|
|
entry := waitingForQueue.Peek().(*waitFor)
|
|
|
|
|
entry := waitingForQueue.Peek().(*waitFor[T])
|
|
|
|
|
if entry.readyAt.After(now) {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
entry = heap.Pop(waitingForQueue).(*waitFor)
|
|
|
|
|
q.Add(entry.data.(T))
|
|
|
|
|
entry = heap.Pop(waitingForQueue).(*waitFor[T])
|
|
|
|
|
q.Add(entry.data)
|
|
|
|
|
delete(waitingEntryByData, entry.data)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -303,7 +303,7 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
if nextReadyAtTimer != nil {
|
|
|
|
|
nextReadyAtTimer.Stop()
|
|
|
|
|
}
|
|
|
|
|
entry := waitingForQueue.Peek().(*waitFor)
|
|
|
|
|
entry := waitingForQueue.Peek().(*waitFor[T])
|
|
|
|
|
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
|
|
|
|
|
nextReadyAt = nextReadyAtTimer.C()
|
|
|
|
|
}
|
|
|
|
@@ -322,7 +322,7 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
if waitEntry.readyAt.After(q.clock.Now()) {
|
|
|
|
|
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
|
|
|
|
} else {
|
|
|
|
|
q.Add(waitEntry.data.(T))
|
|
|
|
|
q.Add(waitEntry.data)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
drained := false
|
|
|
|
@@ -332,7 +332,7 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
if waitEntry.readyAt.After(q.clock.Now()) {
|
|
|
|
|
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
|
|
|
|
} else {
|
|
|
|
|
q.Add(waitEntry.data.(T))
|
|
|
|
|
q.Add(waitEntry.data)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
drained = true
|
|
|
|
@@ -343,7 +343,7 @@ func (q *delayingType[T]) waitingLoop() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
|
|
|
|
|
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
|
|
|
|
|
func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) {
|
|
|
|
|
// if the entry already exists, update the time only if it would cause the item to be queued sooner
|
|
|
|
|
existing, exists := knownEntries[entry.data]
|
|
|
|
|
if exists {
|
|
|
|
|