mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-31 06:41:52 +00:00
Uses container/heap
Kubernetes-commit: dad64459a9b8c56d5bca681e37c3d0f44921d472
This commit is contained in:
parent
fe9ac9f9c0
commit
f851632ada
@ -17,7 +17,7 @@ limitations under the License.
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"container/heap"
|
||||
"time"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||
|
||||
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
|
||||
ret := &delayingType{
|
||||
Interface: NewNamed(name),
|
||||
clock: clock,
|
||||
heartbeat: clock.Tick(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingTimeByEntry: map[t]time.Time{},
|
||||
waitingForAddCh: make(chan waitFor, 1000),
|
||||
metrics: newRetryMetrics(name),
|
||||
Interface: NewNamed(name),
|
||||
clock: clock,
|
||||
heartbeat: clock.Tick(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan *waitFor, 1000),
|
||||
metrics: newRetryMetrics(name),
|
||||
}
|
||||
|
||||
go ret.waitingLoop()
|
||||
@ -73,12 +72,8 @@ type delayingType struct {
|
||||
// clock.Tick will leak.
|
||||
heartbeat <-chan time.Time
|
||||
|
||||
// waitingForAdd is an ordered slice of items to be added to the contained work queue
|
||||
waitingForAdd []waitFor
|
||||
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
|
||||
waitingTimeByEntry map[t]time.Time
|
||||
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
||||
waitingForAddCh chan waitFor
|
||||
waitingForAddCh chan *waitFor
|
||||
|
||||
// metrics counts the number of retries
|
||||
metrics retryMetrics
|
||||
@ -88,6 +83,55 @@ type delayingType struct {
|
||||
type waitFor struct {
|
||||
data t
|
||||
readyAt time.Time
|
||||
// index in the priority queue (heap)
|
||||
index int
|
||||
}
|
||||
|
||||
// waitForPriorityQueue implements a priority queue for waitFor items.
|
||||
//
|
||||
// waitForPriorityQueue implements heap.Interface. The item occuring next in
|
||||
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
|
||||
// Peek returns this minimum item at index 0. Pop returns the minimum item after
|
||||
// it has been removed from the queue and placed at index Len()-1 by
|
||||
// container/heap. Push adds an item at index Len(), and container/heap
|
||||
// percolates it into the correct location.
|
||||
type waitForPriorityQueue []*waitFor
|
||||
|
||||
func (pq waitForPriorityQueue) Len() int {
|
||||
return len(pq)
|
||||
}
|
||||
func (pq waitForPriorityQueue) Less(i, j int) bool {
|
||||
return pq[i].readyAt.Before(pq[j].readyAt)
|
||||
}
|
||||
func (pq waitForPriorityQueue) Swap(i, j int) {
|
||||
pq[i], pq[j] = pq[j], pq[i]
|
||||
pq[i].index = i
|
||||
pq[j].index = j
|
||||
}
|
||||
|
||||
// Push adds an item to the queue. Push should not be called directly; instead,
|
||||
// use `heap.Push`.
|
||||
func (pq *waitForPriorityQueue) Push(x interface{}) {
|
||||
n := len(*pq)
|
||||
item := x.(*waitFor)
|
||||
item.index = n
|
||||
*pq = append(*pq, item)
|
||||
}
|
||||
|
||||
// Pop removes an item from the queue. Pop should not be called directly;
|
||||
// instead, use `heap.Pop`.
|
||||
func (pq *waitForPriorityQueue) Pop() interface{} {
|
||||
n := len(*pq)
|
||||
item := (*pq)[n-1]
|
||||
item.index = -1
|
||||
*pq = (*pq)[0:(n - 1)]
|
||||
return item
|
||||
}
|
||||
|
||||
// Peek returns the item at the beginning of the queue, without removing the
|
||||
// item or otherwise mutating the queue. It is safe to call directly.
|
||||
func (pq waitForPriorityQueue) Peek() interface{} {
|
||||
return pq[0]
|
||||
}
|
||||
|
||||
// ShutDown gives a way to shut off this queue
|
||||
@ -114,7 +158,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
||||
select {
|
||||
case <-q.stopCh:
|
||||
// unblock if ShutDown() is called
|
||||
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
|
||||
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
|
||||
}
|
||||
}
|
||||
|
||||
@ -130,32 +174,35 @@ func (q *delayingType) waitingLoop() {
|
||||
// Make a placeholder channel to use when there are no items in our list
|
||||
never := make(<-chan time.Time)
|
||||
|
||||
waitingForQueue := &waitForPriorityQueue{}
|
||||
heap.Init(waitingForQueue)
|
||||
|
||||
waitingEntryByData := map[t]*waitFor{}
|
||||
|
||||
for {
|
||||
if q.Interface.ShuttingDown() {
|
||||
// discard waiting entries
|
||||
q.waitingForAdd = nil
|
||||
q.waitingTimeByEntry = nil
|
||||
return
|
||||
}
|
||||
|
||||
now := q.clock.Now()
|
||||
|
||||
// Add ready entries
|
||||
readyEntries := 0
|
||||
for _, entry := range q.waitingForAdd {
|
||||
for waitingForQueue.Len() > 0 {
|
||||
entry := waitingForQueue.Peek().(*waitFor)
|
||||
if entry.readyAt.After(now) {
|
||||
break
|
||||
}
|
||||
|
||||
entry = heap.Pop(waitingForQueue).(*waitFor)
|
||||
q.Add(entry.data)
|
||||
delete(q.waitingTimeByEntry, entry.data)
|
||||
readyEntries++
|
||||
delete(waitingEntryByData, entry.data)
|
||||
}
|
||||
q.waitingForAdd = q.waitingForAdd[readyEntries:]
|
||||
|
||||
// Set up a wait for the first item's readyAt (if one exists)
|
||||
nextReadyAt := never
|
||||
if len(q.waitingForAdd) > 0 {
|
||||
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
|
||||
if waitingForQueue.Len() > 0 {
|
||||
entry := waitingForQueue.Peek().(*waitFor)
|
||||
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
|
||||
}
|
||||
|
||||
select {
|
||||
@ -170,7 +217,7 @@ func (q *delayingType) waitingLoop() {
|
||||
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
@ -180,7 +227,7 @@ func (q *delayingType) waitingLoop() {
|
||||
select {
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
@ -192,55 +239,19 @@ func (q *delayingType) waitingLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// inserts the given entry into the sorted entries list
|
||||
// same semantics as append()... the given slice may be modified,
|
||||
// and the returned value should be used
|
||||
//
|
||||
// TODO: This should probably be converted to use container/heap to improve
|
||||
// running time for a large number of items.
|
||||
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
|
||||
// if the entry is already in our retry list and the existing time is before the new one, just skip it
|
||||
existingTime, exists := knownEntries[entry.data]
|
||||
if exists && existingTime.Before(entry.readyAt) {
|
||||
return entries
|
||||
}
|
||||
|
||||
// if the entry exists and is scheduled for later, go ahead and remove the entry
|
||||
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
|
||||
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
|
||||
// if the entry already exists, update the time only if it would cause the item to be queued sooner
|
||||
existing, exists := knownEntries[entry.data]
|
||||
if exists {
|
||||
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
|
||||
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
|
||||
if existing.readyAt.After(entry.readyAt) {
|
||||
existing.readyAt = entry.readyAt
|
||||
heap.Fix(q, existing.index)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
insertionIndex := sort.Search(len(entries), func(i int) bool {
|
||||
return entry.readyAt.Before(entries[i].readyAt)
|
||||
})
|
||||
|
||||
// grow by 1
|
||||
entries = append(entries, waitFor{})
|
||||
// shift items from the insertion point to the end
|
||||
copy(entries[insertionIndex+1:], entries[insertionIndex:])
|
||||
// insert the record
|
||||
entries[insertionIndex] = entry
|
||||
|
||||
knownEntries[entry.data] = entry.readyAt
|
||||
|
||||
return entries
|
||||
}
|
||||
|
||||
// findEntryIndex returns the index for an existing entry
|
||||
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
|
||||
index := sort.Search(len(entries), func(i int) bool {
|
||||
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
|
||||
})
|
||||
|
||||
// we know this is the earliest possible index, but there could be multiple with the same time
|
||||
// iterate from here to find the dupe
|
||||
for ; index < len(entries); index++ {
|
||||
if entries[index].data == data {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return index
|
||||
heap.Push(q, entry)
|
||||
knownEntries[entry.data] = entry
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
|
||||
clock: fakeClock,
|
||||
heartbeat: fakeClock.Tick(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan waitFor, 1000),
|
||||
waitingForAddCh: make(chan *waitFor, 1000),
|
||||
metrics: newRetryMetrics(""),
|
||||
}
|
||||
queue.DelayingInterface = delayingQueue
|
||||
|
Loading…
Reference in New Issue
Block a user