From 21ca6c498aec2879fd7b9796f55743e5f14d5a80 Mon Sep 17 00:00:00 2001 From: Andy Lindeman Date: Thu, 27 Apr 2017 23:26:05 -0400 Subject: [PATCH 1/2] Adds benchmark test --- .../util/workqueue/delaying_queue_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go index 78c92ad4dad..4c61939fa7e 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go @@ -18,6 +18,7 @@ package workqueue import ( "fmt" + "math/rand" "reflect" "testing" "time" @@ -214,6 +215,25 @@ func TestCopyShifting(t *testing.T) { } } +func BenchmarkDelayingQueue_AddAfter(b *testing.B) { + r := rand.New(rand.NewSource(time.Now().Unix())) + + fakeClock := clock.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock, "") + + // Add items + for n := 0; n < b.N; n++ { + data := fmt.Sprintf("%d", n) + q.AddAfter(data, time.Duration(r.Int63n(int64(10*time.Minute)))) + } + + // Exercise item removal as well + fakeClock.Step(11 * time.Minute) + for n := 0; n < b.N; n++ { + _, _ = q.Get() + } +} + func waitForAdded(q DelayingInterface, depth int) error { return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { if q.Len() == depth { From dad64459a9b8c56d5bca681e37c3d0f44921d472 Mon Sep 17 00:00:00 2001 From: Andy Lindeman Date: Thu, 27 Apr 2017 23:26:37 -0400 Subject: [PATCH 2/2] Uses container/heap --- .../util/workqueue/delaying_queue.go | 157 ++++++++++-------- .../workqueue/rate_limitting_queue_test.go | 2 +- 2 files changed, 85 insertions(+), 74 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index 593ad9ad415..660c6572c22 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -17,7 +17,7 @@ limitations under the License. package workqueue import ( - "sort" + "container/heap" "time" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface { func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ - Interface: NewNamed(name), - clock: clock, - heartbeat: clock.Tick(maxWait), - stopCh: make(chan struct{}), - waitingTimeByEntry: map[t]time.Time{}, - waitingForAddCh: make(chan waitFor, 1000), - metrics: newRetryMetrics(name), + Interface: NewNamed(name), + clock: clock, + heartbeat: clock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + metrics: newRetryMetrics(name), } go ret.waitingLoop() @@ -73,12 +72,8 @@ type delayingType struct { // clock.Tick will leak. heartbeat <-chan time.Time - // waitingForAdd is an ordered slice of items to be added to the contained work queue - waitingForAdd []waitFor - // waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes - waitingTimeByEntry map[t]time.Time // waitingForAddCh is a buffered channel that feeds waitingForAdd - waitingForAddCh chan waitFor + waitingForAddCh chan *waitFor // metrics counts the number of retries metrics retryMetrics @@ -88,6 +83,55 @@ type delayingType struct { type waitFor struct { data t readyAt time.Time + // index in the priority queue (heap) + index int +} + +// waitForPriorityQueue implements a priority queue for waitFor items. +// +// waitForPriorityQueue implements heap.Interface. The item occuring next in +// time (i.e., the item with the smallest readyAt) is at the root (index 0). +// Peek returns this minimum item at index 0. Pop returns the minimum item after +// it has been removed from the queue and placed at index Len()-1 by +// container/heap. Push adds an item at index Len(), and container/heap +// percolates it into the correct location. +type waitForPriorityQueue []*waitFor + +func (pq waitForPriorityQueue) Len() int { + return len(pq) +} +func (pq waitForPriorityQueue) Less(i, j int) bool { + return pq[i].readyAt.Before(pq[j].readyAt) +} +func (pq waitForPriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +// Push adds an item to the queue. Push should not be called directly; instead, +// use `heap.Push`. +func (pq *waitForPriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*waitFor) + item.index = n + *pq = append(*pq, item) +} + +// Pop removes an item from the queue. Pop should not be called directly; +// instead, use `heap.Pop`. +func (pq *waitForPriorityQueue) Pop() interface{} { + n := len(*pq) + item := (*pq)[n-1] + item.index = -1 + *pq = (*pq)[0:(n - 1)] + return item +} + +// Peek returns the item at the beginning of the queue, without removing the +// item or otherwise mutating the queue. It is safe to call directly. +func (pq waitForPriorityQueue) Peek() interface{} { + return pq[0] } // ShutDown gives a way to shut off this queue @@ -114,7 +158,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { select { case <-q.stopCh: // unblock if ShutDown() is called - case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } } @@ -130,32 +174,35 @@ func (q *delayingType) waitingLoop() { // Make a placeholder channel to use when there are no items in our list never := make(<-chan time.Time) + waitingForQueue := &waitForPriorityQueue{} + heap.Init(waitingForQueue) + + waitingEntryByData := map[t]*waitFor{} + for { if q.Interface.ShuttingDown() { - // discard waiting entries - q.waitingForAdd = nil - q.waitingTimeByEntry = nil return } now := q.clock.Now() // Add ready entries - readyEntries := 0 - for _, entry := range q.waitingForAdd { + for waitingForQueue.Len() > 0 { + entry := waitingForQueue.Peek().(*waitFor) if entry.readyAt.After(now) { break } + + entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) - delete(q.waitingTimeByEntry, entry.data) - readyEntries++ + delete(waitingEntryByData, entry.data) } - q.waitingForAdd = q.waitingForAdd[readyEntries:] // Set up a wait for the first item's readyAt (if one exists) nextReadyAt := never - if len(q.waitingForAdd) > 0 { - nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) + if waitingForQueue.Len() > 0 { + entry := waitingForQueue.Peek().(*waitFor) + nextReadyAt = q.clock.After(entry.readyAt.Sub(now)) } select { @@ -170,7 +217,7 @@ func (q *delayingType) waitingLoop() { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { - q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) + insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } @@ -180,7 +227,7 @@ func (q *delayingType) waitingLoop() { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { - q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) + insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } @@ -192,55 +239,19 @@ func (q *delayingType) waitingLoop() { } } -// inserts the given entry into the sorted entries list -// same semantics as append()... the given slice may be modified, -// and the returned value should be used -// -// TODO: This should probably be converted to use container/heap to improve -// running time for a large number of items. -func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor { - // if the entry is already in our retry list and the existing time is before the new one, just skip it - existingTime, exists := knownEntries[entry.data] - if exists && existingTime.Before(entry.readyAt) { - return entries - } - - // if the entry exists and is scheduled for later, go ahead and remove the entry +// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue +func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { + // if the entry already exists, update the time only if it would cause the item to be queued sooner + existing, exists := knownEntries[entry.data] if exists { - if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) { - entries = append(entries[:existingIndex], entries[existingIndex+1:]...) + if existing.readyAt.After(entry.readyAt) { + existing.readyAt = entry.readyAt + heap.Fix(q, existing.index) } + + return } - insertionIndex := sort.Search(len(entries), func(i int) bool { - return entry.readyAt.Before(entries[i].readyAt) - }) - - // grow by 1 - entries = append(entries, waitFor{}) - // shift items from the insertion point to the end - copy(entries[insertionIndex+1:], entries[insertionIndex:]) - // insert the record - entries[insertionIndex] = entry - - knownEntries[entry.data] = entry.readyAt - - return entries -} - -// findEntryIndex returns the index for an existing entry -func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int { - index := sort.Search(len(entries), func(i int) bool { - return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt - }) - - // we know this is the earliest possible index, but there could be multiple with the same time - // iterate from here to find the dupe - for ; index < len(entries); index++ { - if entries[index].data == data { - break - } - } - - return index + heap.Push(q, entry) + knownEntries[entry.data] = entry } diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go index 47100689d10..f6b2c0f8e72 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go @@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) { clock: fakeClock, heartbeat: fakeClock.Tick(maxWait), stopCh: make(chan struct{}), - waitingForAddCh: make(chan waitFor, 1000), + waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(""), } queue.DelayingInterface = delayingQueue