Uses container/heap

This commit is contained in:
Andy Lindeman 2017-04-27 23:26:37 -04:00
parent 21ca6c498a
commit dad64459a9
No known key found for this signature in database
GPG Key ID: DF5317422B93F579
2 changed files with 85 additions and 74 deletions

View File

@ -17,7 +17,7 @@ limitations under the License.
package workqueue package workqueue
import ( import (
"sort" "container/heap"
"time" "time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{ ret := &delayingType{
Interface: NewNamed(name), Interface: NewNamed(name),
clock: clock, clock: clock,
heartbeat: clock.Tick(maxWait), heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{}, waitingForAddCh: make(chan *waitFor, 1000),
waitingForAddCh: make(chan waitFor, 1000), metrics: newRetryMetrics(name),
metrics: newRetryMetrics(name),
} }
go ret.waitingLoop() go ret.waitingLoop()
@ -73,12 +72,8 @@ type delayingType struct {
// clock.Tick will leak. // clock.Tick will leak.
heartbeat <-chan time.Time heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd // waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor waitingForAddCh chan *waitFor
// metrics counts the number of retries // metrics counts the number of retries
metrics retryMetrics metrics retryMetrics
@ -88,6 +83,55 @@ type delayingType struct {
type waitFor struct { type waitFor struct {
data t data t
readyAt time.Time readyAt time.Time
// index in the priority queue (heap)
index int
}
// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occuring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
}
// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
}
// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
} }
// ShutDown gives a way to shut off this queue // ShutDown gives a way to shut off this queue
@ -114,7 +158,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
select { select {
case <-q.stopCh: case <-q.stopCh:
// unblock if ShutDown() is called // unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
} }
} }
@ -130,32 +174,35 @@ func (q *delayingType) waitingLoop() {
// Make a placeholder channel to use when there are no items in our list // Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time) never := make(<-chan time.Time)
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for { for {
if q.Interface.ShuttingDown() { if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return return
} }
now := q.clock.Now() now := q.clock.Now()
// Add ready entries // Add ready entries
readyEntries := 0 for waitingForQueue.Len() > 0 {
for _, entry := range q.waitingForAdd { entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) { if entry.readyAt.After(now) {
break break
} }
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data) q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data) delete(waitingEntryByData, entry.data)
readyEntries++
} }
q.waitingForAdd = q.waitingForAdd[readyEntries:]
// Set up a wait for the first item's readyAt (if one exists) // Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never nextReadyAt := never
if len(q.waitingForAdd) > 0 { if waitingForQueue.Len() > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
} }
select { select {
@ -170,7 +217,7 @@ func (q *delayingType) waitingLoop() {
case waitEntry := <-q.waitingForAddCh: case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) { if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) insert(waitingForQueue, waitingEntryByData, waitEntry)
} else { } else {
q.Add(waitEntry.data) q.Add(waitEntry.data)
} }
@ -180,7 +227,7 @@ func (q *delayingType) waitingLoop() {
select { select {
case waitEntry := <-q.waitingForAddCh: case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) { if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) insert(waitingForQueue, waitingEntryByData, waitEntry)
} else { } else {
q.Add(waitEntry.data) q.Add(waitEntry.data)
} }
@ -192,55 +239,19 @@ func (q *delayingType) waitingLoop() {
} }
} }
// inserts the given entry into the sorted entries list // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
// same semantics as append()... the given slice may be modified, func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// and the returned value should be used // if the entry already exists, update the time only if it would cause the item to be queued sooner
// existing, exists := knownEntries[entry.data]
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}
// if the entry exists and is scheduled for later, go ahead and remove the entry
if exists { if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) { if existing.readyAt.After(entry.readyAt) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...) existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
} }
return
} }
insertionIndex := sort.Search(len(entries), func(i int) bool { heap.Push(q, entry)
return entry.readyAt.Before(entries[i].readyAt) knownEntries[entry.data] = entry
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
knownEntries[entry.data] = entry.readyAt
return entries
}
// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})
// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}
return index
} }

View File

@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
clock: fakeClock, clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait), heartbeat: fakeClock.Tick(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""), metrics: newRetryMetrics(""),
} }
queue.DelayingInterface = delayingQueue queue.DelayingInterface = delayingQueue