make delayed workqueue use channels with single writer

This commit is contained in:
Jordan Liggitt 2016-04-06 09:23:12 -04:00 committed by deads2k
parent d12a4d6d5a
commit 290d970282
3 changed files with 179 additions and 167 deletions

View File

@ -18,7 +18,6 @@ package workqueue
import ( import (
"sort" "sort"
"sync"
"time" "time"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
@ -40,38 +39,35 @@ func NewDelayingQueue() DelayingInterface {
func newDelayingQueue(clock util.Clock) DelayingInterface { func newDelayingQueue(clock util.Clock) DelayingInterface {
ret := &delayingType{ ret := &delayingType{
Type: New(), Interface: New(),
clock: clock, clock: clock,
waitingCond: sync.NewCond(&sync.Mutex{}), heartbeat: time.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
} }
go ret.waitingLoop() go ret.waitingLoop()
go ret.heartbeat()
return ret return ret
} }
// delayingType wraps a Type and provides delayed re-enquing // delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct { type delayingType struct {
*Type Interface
// clock tracks time for delayed firing // clock tracks time for delayed firing
clock util.Clock clock util.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue // waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor waitingForAdd []waitFor
// waitingLock synchronizes access to waitingForAdd // waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingLock sync.Mutex waitingForAddCh chan waitFor
// waitingCond is used to notify the adding go func that it needs to check for items to add
waitingCond *sync.Cond
// nextCheckTime is used to decide whether to add a notification timer. If the requested time
// is beyond the time we're already waiting for, we don't add a new timer thread
nextCheckTime *time.Time
// nextCheckLock serializes access to the notification time
nextCheckLock sync.Mutex
// nextCheckCancel is a channel to close to cancel the notification
nextCheckCancel chan struct{}
} }
// waitFor holds the data to add and the time it should be added // waitFor holds the data to add and the time it should be added
@ -82,25 +78,28 @@ type waitFor struct {
// ShutDown gives a way to shut off this queue // ShutDown gives a way to shut off this queue
func (q *delayingType) ShutDown() { func (q *delayingType) ShutDown() {
q.Type.ShutDown() q.Interface.ShutDown()
q.waitingCond.Broadcast() close(q.stopCh)
} }
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
q.waitingLock.Lock() // don't add if we're already shutting down
defer q.waitingLock.Unlock() if q.ShuttingDown() {
waitEntry := waitFor{data: item, readyAt: q.clock.Now().Add(duration)} return
}
insertionIndex := sort.Search(len(q.waitingForAdd), func(i int) bool { // immediately add things with no delay
return waitEntry.readyAt.Before(q.waitingForAdd[i].readyAt) if duration <= 0 {
}) q.Add(item)
return
}
tail := q.waitingForAdd[insertionIndex:] select {
q.waitingForAdd = append(make([]waitFor, 0, len(q.waitingForAdd)+1), q.waitingForAdd[:insertionIndex]...) case <-q.stopCh:
q.waitingForAdd = append(q.waitingForAdd, waitEntry) // unblock if ShutDown() is called
q.waitingForAdd = append(q.waitingForAdd, tail...) case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
q.notifyAt(waitEntry.readyAt)
} }
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
@ -112,115 +111,83 @@ const maxWait = 10 * time.Second
func (q *delayingType) waitingLoop() { func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
for { // Make a placeholder channel to use when there are no items in our list
if q.shuttingDown { never := make(<-chan time.Time)
return
}
func() {
q.waitingCond.L.Lock()
defer q.waitingCond.L.Unlock()
q.waitingCond.Wait()
if q.shuttingDown {
return
}
q.waitingLock.Lock()
defer q.waitingLock.Unlock()
nextReadyCheck := time.Time{}
itemsAdded := 0
for _, queuedItem := range q.waitingForAdd {
nextReadyCheck = queuedItem.readyAt
if queuedItem.readyAt.After(q.clock.Now()) {
break
}
q.Type.Add(queuedItem.data)
itemsAdded++
}
switch itemsAdded {
case 0:
// no change
case len(q.waitingForAdd):
// consumed everything
q.waitingForAdd = make([]waitFor, 0, len(q.waitingForAdd))
default:
// consumed some
q.waitingForAdd = q.waitingForAdd[itemsAdded:]
if len(q.waitingForAdd) > 0 {
q.notifyAt(nextReadyCheck)
}
}
}()
}
}
// heartbeat forces a check every maxWait seconds
func (q *delayingType) heartbeat() {
defer utilruntime.HandleCrash()
for { for {
if q.shuttingDown { if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
return return
} }
ch := q.clock.After(maxWait)
<-ch
q.waitingCond.Broadcast()
}
}
// clearNextCheckTimeIf resets the nextCheckTime if it matches the expected value to ensure that the subsequent notification will take effect.
func (q *delayingType) clearNextCheckTimeIf(nextReadyCheck time.Time) {
q.nextCheckLock.Lock()
defer q.nextCheckLock.Unlock()
if q.nextCheckTime != nil && *q.nextCheckTime == nextReadyCheck {
q.nextCheckTime = nil
}
}
// notifyAt: if the requested nextReadyCheck is sooner than the current check, then a new go func is
// spawned to notify the condition that the waitingLoop is waiting for after the time is up. The previous go func
// is cancelled
func (q *delayingType) notifyAt(nextReadyCheck time.Time) {
q.nextCheckLock.Lock()
defer q.nextCheckLock.Unlock()
now := q.clock.Now() now := q.clock.Now()
if (q.nextCheckTime != nil && (nextReadyCheck.After(*q.nextCheckTime) || nextReadyCheck == *q.nextCheckTime)) || nextReadyCheck.Before(now) {
return // Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
if entry.readyAt.After(now) {
break
} }
q.Add(entry.data)
duration := nextReadyCheck.Sub(now) readyEntries++
q.nextCheckTime = &nextReadyCheck
ch := q.clock.After(duration)
newCancel := make(chan struct{})
oldCancel := q.nextCheckCancel
// always cancel the old notifier
if oldCancel != nil {
close(oldCancel)
} }
q.nextCheckCancel = newCancel q.waitingForAdd = q.waitingForAdd[readyEntries:]
go func() { // Set up a wait for the first item's readyAt (if one exists)
defer utilruntime.HandleCrash() nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
}
select { select {
case <-ch: case <-q.stopCh:
// we only have one of these go funcs active at a time. If we hit our timer, then clear return
// the check time so that the next add will win
q.clearNextCheckTimeIf(nextReadyCheck)
q.waitingCond.Broadcast()
case <-newCancel: case <-q.heartbeat:
// do nothing, cancelled // continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
} }
}() }
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
func insert(entries []waitFor, entry waitFor) []waitFor {
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
return entries
} }

View File

@ -33,6 +33,9 @@ func TestSimpleQueue(t *testing.T) {
first := "foo" first := "foo"
q.AddAfter(first, 50*time.Millisecond) q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 { if q.Len() != 0 {
t.Errorf("should not have added") t.Errorf("should not have added")
@ -40,14 +43,7 @@ func TestSimpleQueue(t *testing.T) {
fakeClock.Step(60 * time.Millisecond) fakeClock.Step(60 * time.Millisecond)
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { if err := waitForAdded(q, 1); err != nil {
if q.Len() == 1 {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("should have added") t.Errorf("should have added")
} }
item, _ := q.Get() item, _ := q.Get()
@ -56,7 +52,7 @@ func TestSimpleQueue(t *testing.T) {
// step past the next heartbeat // step past the next heartbeat
fakeClock.Step(10 * time.Second) fakeClock.Step(10 * time.Second)
err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() > 0 { if q.Len() > 0 {
return false, fmt.Errorf("added to queue") return false, fmt.Errorf("added to queue")
} }
@ -82,21 +78,18 @@ func TestAddTwoFireEarly(t *testing.T) {
q.AddAfter(first, 1*time.Second) q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 50*time.Millisecond) q.AddAfter(second, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 { if q.Len() != 0 {
t.Errorf("should not have added") t.Errorf("should not have added")
} }
fakeClock.Step(60 * time.Millisecond) fakeClock.Step(60 * time.Millisecond)
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() == 1 {
return true, nil
}
return false, nil if err := waitForAdded(q, 1); err != nil {
}) t.Fatalf("unexpected err: %v", err)
if err != nil {
t.Fatalf("should have added")
} }
item, _ := q.Get() item, _ := q.Get()
if !reflect.DeepEqual(item, second) { if !reflect.DeepEqual(item, second) {
@ -106,15 +99,8 @@ func TestAddTwoFireEarly(t *testing.T) {
q.AddAfter(third, 2*time.Second) q.AddAfter(third, 2*time.Second)
fakeClock.Step(1 * time.Second) fakeClock.Step(1 * time.Second)
err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { if err := waitForAdded(q, 1); err != nil {
if q.Len() == 1 { t.Fatalf("unexpected err: %v", err)
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("should have added")
} }
item, _ = q.Get() item, _ = q.Get()
if !reflect.DeepEqual(item, first) { if !reflect.DeepEqual(item, first) {
@ -122,15 +108,8 @@ func TestAddTwoFireEarly(t *testing.T) {
} }
fakeClock.Step(2 * time.Second) fakeClock.Step(2 * time.Second)
err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { if err := waitForAdded(q, 1); err != nil {
if q.Len() == 1 { t.Fatalf("unexpected err: %v", err)
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("should have added")
} }
item, _ = q.Get() item, _ = q.Get()
if !reflect.DeepEqual(item, third) { if !reflect.DeepEqual(item, third) {
@ -138,3 +117,61 @@ func TestAddTwoFireEarly(t *testing.T) {
} }
} }
func TestCopyShifting(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
first := "foo"
second := "bar"
third := "baz"
q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 500*time.Millisecond)
q.AddAfter(third, 250*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(q, 3); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actualFirst, _ := q.Get()
if !reflect.DeepEqual(actualFirst, third) {
t.Errorf("expected %v, got %v", third, actualFirst)
}
actualSecond, _ := q.Get()
if !reflect.DeepEqual(actualSecond, second) {
t.Errorf("expected %v, got %v", second, actualSecond)
}
actualThird, _ := q.Get()
if !reflect.DeepEqual(actualThird, first) {
t.Errorf("expected %v, got %v", first, actualThird)
}
}
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}
return false, nil
})
}
func waitForWaitingQueueToFill(q DelayingInterface) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if len(q.(*delayingType).waitingForAddCh) == 0 {
return true, nil
}
return false, nil
})
}

View File

@ -26,6 +26,7 @@ type Interface interface {
Get() (item interface{}, shutdown bool) Get() (item interface{}, shutdown bool)
Done(item interface{}) Done(item interface{})
ShutDown() ShutDown()
ShuttingDown() bool
} }
// New constructs a new workqueue (see the package comment). // New constructs a new workqueue (see the package comment).
@ -143,3 +144,10 @@ func (q *Type) ShutDown() {
q.shuttingDown = true q.shuttingDown = true
q.cond.Broadcast() q.cond.Broadcast()
} }
func (q *Type) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}