Merge pull request #45070 from alindeman/container-heap

Automatic merge from submit-queue

Uses container/heap for DelayingQueue

The current implementation of DelayingQueue doesn't perform well when a large number of items are inserted at random delays: keeping the backing slice sorted means each insert pays for a binary search plus an O(n) shift. The original authors seemed to be aware of this and noted it in a `TODO` comment. This is my attempt at switching the implementation to a priority queue based on `container/heap`.
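
For reference, the `container/heap` pattern amounts to implementing `heap.Interface` on a slice and letting the package keep the earliest-ready item at index 0, so each push and pop costs O(log n). The sketch below is standalone and uses made-up names (`timedItem`, `timedHeap`); it only illustrates the shape of the approach, not the exact code in this PR:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// timedItem is an illustrative stand-in for a queued work item.
type timedItem struct {
	data    string
	readyAt time.Time
}

// timedHeap implements heap.Interface; the item with the smallest readyAt
// is always at index 0.
type timedHeap []timedItem

func (h timedHeap) Len() int           { return len(h) }
func (h timedHeap) Less(i, j int) bool { return h[i].readyAt.Before(h[j].readyAt) }
func (h timedHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *timedHeap) Push(x interface{}) { *h = append(*h, x.(timedItem)) }

func (h *timedHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

func main() {
	now := time.Now()
	h := &timedHeap{}
	heap.Init(h)

	// Pushes may arrive in any order; the heap keeps the earliest readyAt on top.
	heap.Push(h, timedItem{data: "b", readyAt: now.Add(2 * time.Second)})
	heap.Push(h, timedItem{data: "a", readyAt: now.Add(1 * time.Second)})
	heap.Push(h, timedItem{data: "c", readyAt: now.Add(3 * time.Second)})

	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(timedItem).data) // a, b, c
	}
}
```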

Benchmarks from before the change:
```
╰─ go test -bench=. -benchmem | tee /tmp/before.txt
BenchmarkDelayingQueue_AddAfter-8         300000            256824 ns/op             520 B/op          3 allocs/op
PASS
ok      k8s.io/kubernetes/staging/src/k8s.io/client-go/util/workqueue   77.237s
```

After:
```
╰─ go test -bench=. -benchmem | tee /tmp/after.txt
BenchmarkDelayingQueue_AddAfter-8         500000              3519 ns/op             406 B/op          4 allocs/op
PASS
ok      k8s.io/kubernetes/staging/src/k8s.io/client-go/util/workqueue   2.969s
```

Comparison:
```
╰─ benchcmp /tmp/before.txt /tmp/after.txt
benchmark                             old ns/op     new ns/op     delta
BenchmarkDelayingQueue_AddAfter-8     256824        3519          -98.63%

benchmark                             old allocs     new allocs     delta
BenchmarkDelayingQueue_AddAfter-8     3              4              +33.33%

benchmark                             old bytes     new bytes     delta
BenchmarkDelayingQueue_AddAfter-8     520           406           -21.92%
```

I also find the `container/heap`-based code a bit easier to understand. The priority queue implementation follows the example in the `container/heap` documentation.
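
To make the observable behavior concrete, here is a quick usage sketch against the public `workqueue` API (the item name and durations are made up). Adding the same item twice keeps the sooner delay (the new `insert` updates the existing heap entry and calls `heap.Fix` instead of queueing a duplicate), and `Get` blocks until the item becomes ready:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// The second AddAfter for the same item wins because it is sooner;
	// the entry's readyAt is updated in place rather than duplicated.
	q.AddAfter("sync-pod", 5*time.Second)
	q.AddAfter("sync-pod", 50*time.Millisecond)

	start := time.Now()
	item, shutdown := q.Get() // blocks until the item's readyAt has passed
	if !shutdown {
		fmt.Printf("got %v after %v\n", item, time.Since(start)) // ~50ms, not 5s
		q.Done(item)
	}
}
```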

Feedback definitely welcomed. This is one of my first contributions.

```release-note
NONE
```
Commit 9590b94f7c by Kubernetes Submit Queue, 2017-05-15 07:06:03 -07:00, committed by GitHub.
3 changed files with 105 additions and 74 deletions.

staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go

@@ -17,7 +17,7 @@ limitations under the License.
package workqueue
import (
"sort"
"container/heap"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(name),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
@@ -73,12 +72,8 @@ type delayingType struct {
// clock.Tick will leak.
heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
@@ -88,6 +83,55 @@ type delayingType struct {
type waitFor struct {
data t
readyAt time.Time
// index in the priority queue (heap)
index int
}
// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
}
// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
}
// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
// ShutDown gives a way to shut off this queue
@@ -114,7 +158,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
@@ -130,32 +174,35 @@ func (q *delayingType) waitingLoop() {
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}
now := q.clock.Now()
// Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
delete(waitingEntryByData, entry.data)
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
if waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
select {
@@ -170,7 +217,7 @@ func (q *delayingType) waitingLoop() {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
@@ -180,7 +227,7 @@ func (q *delayingType) waitingLoop() {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
@@ -192,55 +239,19 @@ func (q *delayingType) waitingLoop() {
}
}
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}
// if the entry exists and is scheduled for later, go ahead and remove the entry
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner
existing, exists := knownEntries[entry.data]
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}
return
}
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
knownEntries[entry.data] = entry.readyAt
return entries
}
// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})
// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}
return index
heap.Push(q, entry)
knownEntries[entry.data] = entry
}

staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go

@@ -18,6 +18,7 @@ package workqueue
import (
"fmt"
"math/rand"
"reflect"
"testing"
"time"
@@ -214,6 +215,25 @@ func TestCopyShifting(t *testing.T) {
}
}
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().Unix()))
fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")
// Add items
for n := 0; n < b.N; n++ {
data := fmt.Sprintf("%d", n)
q.AddAfter(data, time.Duration(r.Int63n(int64(10*time.Minute))))
}
// Exercise item removal as well
fakeClock.Step(11 * time.Minute)
for n := 0; n < b.N; n++ {
_, _ = q.Get()
}
}
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {

staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go

@@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue