diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 5fef16740f3..68098c5b895 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -18,7 +18,6 @@ package workqueue import ( "sort" - "sync" "time" "k8s.io/kubernetes/pkg/util" @@ -40,38 +39,35 @@ func NewDelayingQueue() DelayingInterface { func newDelayingQueue(clock util.Clock) DelayingInterface { ret := &delayingType{ - Type: New(), - clock: clock, - waitingCond: sync.NewCond(&sync.Mutex{}), + Interface: New(), + clock: clock, + heartbeat: time.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan waitFor, 1000), } go ret.waitingLoop() - go ret.heartbeat() return ret } -// delayingType wraps a Type and provides delayed re-enquing +// delayingType wraps an Interface and provides delayed re-enquing type delayingType struct { - *Type + Interface // clock tracks time for delayed firing clock util.Clock + // stopCh lets us signal a shutdown to the waiting loop + stopCh chan struct{} + + // heartbeat ensures we wait no more than maxWait before firing + heartbeat <-chan time.Time + // waitingForAdd is an ordered slice of items to be added to the contained work queue waitingForAdd []waitFor - // waitingLock synchronizes access to waitingForAdd - waitingLock sync.Mutex - // waitingCond is used to notify the adding go func that it needs to check for items to add - waitingCond *sync.Cond - - // nextCheckTime is used to decide whether to add a notification timer. If the requested time - // is beyond the time we're already waiting for, we don't add a new timer thread - nextCheckTime *time.Time - // nextCheckLock serializes access to the notification time - nextCheckLock sync.Mutex - // nextCheckCancel is a channel to close to cancel the notification - nextCheckCancel chan struct{} + // waitingForAddCh is a buffered channel that feeds waitingForAdd + waitingForAddCh chan waitFor } // waitFor holds the data to add and the time it should be added @@ -82,25 +78,28 @@ type waitFor struct { // ShutDown gives a way to shut off this queue func (q *delayingType) ShutDown() { - q.Type.ShutDown() - q.waitingCond.Broadcast() + q.Interface.ShutDown() + close(q.stopCh) } +// AddAfter adds the given item to the work queue after the given delay func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { - q.waitingLock.Lock() - defer q.waitingLock.Unlock() - waitEntry := waitFor{data: item, readyAt: q.clock.Now().Add(duration)} + // don't add if we're already shutting down + if q.ShuttingDown() { + return + } - insertionIndex := sort.Search(len(q.waitingForAdd), func(i int) bool { - return waitEntry.readyAt.Before(q.waitingForAdd[i].readyAt) - }) + // immediately add things with no delay + if duration <= 0 { + q.Add(item) + return + } - tail := q.waitingForAdd[insertionIndex:] - q.waitingForAdd = append(make([]waitFor, 0, len(q.waitingForAdd)+1), q.waitingForAdd[:insertionIndex]...) - q.waitingForAdd = append(q.waitingForAdd, waitEntry) - q.waitingForAdd = append(q.waitingForAdd, tail...) - - q.notifyAt(waitEntry.readyAt) + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + } } // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. @@ -112,115 +111,83 @@ const maxWait = 10 * time.Second func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + for { - if q.shuttingDown { + if q.Interface.ShuttingDown() { + // discard waiting entries + q.waitingForAdd = nil return } - func() { - q.waitingCond.L.Lock() - defer q.waitingCond.L.Unlock() - q.waitingCond.Wait() + now := q.clock.Now() - if q.shuttingDown { - return + // Add ready entries + readyEntries := 0 + for _, entry := range q.waitingForAdd { + if entry.readyAt.After(now) { + break } - - q.waitingLock.Lock() - defer q.waitingLock.Unlock() - - nextReadyCheck := time.Time{} - itemsAdded := 0 - - for _, queuedItem := range q.waitingForAdd { - nextReadyCheck = queuedItem.readyAt - if queuedItem.readyAt.After(q.clock.Now()) { - break - } - q.Type.Add(queuedItem.data) - itemsAdded++ - } - - switch itemsAdded { - case 0: - // no change - case len(q.waitingForAdd): - // consumed everything - q.waitingForAdd = make([]waitFor, 0, len(q.waitingForAdd)) - - default: - // consumed some - q.waitingForAdd = q.waitingForAdd[itemsAdded:] - - if len(q.waitingForAdd) > 0 { - q.notifyAt(nextReadyCheck) - } - } - }() - } -} - -// heartbeat forces a check every maxWait seconds -func (q *delayingType) heartbeat() { - defer utilruntime.HandleCrash() - - for { - if q.shuttingDown { - return + q.Add(entry.data) + readyEntries++ } + q.waitingForAdd = q.waitingForAdd[readyEntries:] - ch := q.clock.After(maxWait) - <-ch - q.waitingCond.Broadcast() - } -} - -// clearNextCheckTimeIf resets the nextCheckTime if it matches the expected value to ensure that the subsequent notification will take effect. -func (q *delayingType) clearNextCheckTimeIf(nextReadyCheck time.Time) { - q.nextCheckLock.Lock() - defer q.nextCheckLock.Unlock() - - if q.nextCheckTime != nil && *q.nextCheckTime == nextReadyCheck { - q.nextCheckTime = nil - } -} - -// notifyAt: if the requested nextReadyCheck is sooner than the current check, then a new go func is -// spawned to notify the condition that the waitingLoop is waiting for after the time is up. The previous go func -// is cancelled -func (q *delayingType) notifyAt(nextReadyCheck time.Time) { - q.nextCheckLock.Lock() - defer q.nextCheckLock.Unlock() - - now := q.clock.Now() - if (q.nextCheckTime != nil && (nextReadyCheck.After(*q.nextCheckTime) || nextReadyCheck == *q.nextCheckTime)) || nextReadyCheck.Before(now) { - return - } - - duration := nextReadyCheck.Sub(now) - q.nextCheckTime = &nextReadyCheck - ch := q.clock.After(duration) - - newCancel := make(chan struct{}) - oldCancel := q.nextCheckCancel - // always cancel the old notifier - if oldCancel != nil { - close(oldCancel) - } - q.nextCheckCancel = newCancel - - go func() { - defer utilruntime.HandleCrash() + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if len(q.waitingForAdd) > 0 { + nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) + } select { - case <-ch: - // we only have one of these go funcs active at a time. If we hit our timer, then clear - // the check time so that the next add will win - q.clearNextCheckTimeIf(nextReadyCheck) - q.waitingCond.Broadcast() + case <-q.stopCh: + return - case <-newCancel: - // do nothing, cancelled + case <-q.heartbeat: + // continue the loop, which will add ready items + + case <-nextReadyAt: + // continue the loop, which will add ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + default: + drained = true + } + } } - }() + } +} + +// inserts the given entry into the sorted entries list +// same semantics as append()... the given slice may be modified, +// and the returned value should be used +func insert(entries []waitFor, entry waitFor) []waitFor { + insertionIndex := sort.Search(len(entries), func(i int) bool { + return entry.readyAt.Before(entries[i].readyAt) + }) + + // grow by 1 + entries = append(entries, waitFor{}) + // shift items from the insertion point to the end + copy(entries[insertionIndex+1:], entries[insertionIndex:]) + // insert the record + entries[insertionIndex] = entry + + return entries } diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go index d0b2fcc1484..cc2eb165c6d 100644 --- a/pkg/util/workqueue/delaying_queue_test.go +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -33,6 +33,9 @@ func TestSimpleQueue(t *testing.T) { first := "foo" q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } if q.Len() != 0 { t.Errorf("should not have added") @@ -40,14 +43,7 @@ func TestSimpleQueue(t *testing.T) { fakeClock.Step(60 * time.Millisecond) - err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { + if err := waitForAdded(q, 1); err != nil { t.Errorf("should have added") } item, _ := q.Get() @@ -56,7 +52,7 @@ func TestSimpleQueue(t *testing.T) { // step past the next heartbeat fakeClock.Step(10 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { if q.Len() > 0 { return false, fmt.Errorf("added to queue") } @@ -82,21 +78,18 @@ func TestAddTwoFireEarly(t *testing.T) { q.AddAfter(first, 1*time.Second) q.AddAfter(second, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } if q.Len() != 0 { t.Errorf("should not have added") } fakeClock.Step(60 * time.Millisecond) - err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ := q.Get() if !reflect.DeepEqual(item, second) { @@ -106,15 +99,8 @@ func TestAddTwoFireEarly(t *testing.T) { q.AddAfter(third, 2*time.Second) fakeClock.Step(1 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() if !reflect.DeepEqual(item, first) { @@ -122,15 +108,8 @@ func TestAddTwoFireEarly(t *testing.T) { } fakeClock.Step(2 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() if !reflect.DeepEqual(item, third) { @@ -138,3 +117,61 @@ func TestAddTwoFireEarly(t *testing.T) { } } + +func TestCopyShifting(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 500*time.Millisecond) + q.AddAfter(third, 250*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(2 * time.Second) + + if err := waitForAdded(q, 3); err != nil { + t.Fatalf("unexpected err: %v", err) + } + actualFirst, _ := q.Get() + if !reflect.DeepEqual(actualFirst, third) { + t.Errorf("expected %v, got %v", third, actualFirst) + } + actualSecond, _ := q.Get() + if !reflect.DeepEqual(actualSecond, second) { + t.Errorf("expected %v, got %v", second, actualSecond) + } + actualThird, _ := q.Get() + if !reflect.DeepEqual(actualThird, first) { + t.Errorf("expected %v, got %v", first, actualThird) + } +} + +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func waitForWaitingQueueToFill(q DelayingInterface) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if len(q.(*delayingType).waitingForAddCh) == 0 { + return true, nil + } + + return false, nil + }) +} diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index cbdaac12519..40f2ba2fa7a 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -26,6 +26,7 @@ type Interface interface { Get() (item interface{}, shutdown bool) Done(item interface{}) ShutDown() + ShuttingDown() bool } // New constructs a new workqueue (see the package comment). @@ -143,3 +144,10 @@ func (q *Type) ShutDown() { q.shuttingDown = true q.cond.Broadcast() } + +func (q *Type) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +}