diff --git a/pkg/util/clock.go b/pkg/util/clock.go index ac2d738d628..474cbb68d11 100644 --- a/pkg/util/clock.go +++ b/pkg/util/clock.go @@ -28,6 +28,7 @@ type Clock interface { Since(time.Time) time.Duration After(d time.Duration) <-chan time.Time Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time } var ( @@ -54,6 +55,10 @@ func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } +func (RealClock) Tick(d time.Duration) <-chan time.Time { + return time.Tick(d) +} + func (RealClock) Sleep(d time.Duration) { time.Sleep(d) } @@ -68,8 +73,10 @@ type FakeClock struct { } type fakeClockWaiter struct { - targetTime time.Time - destChan chan<- time.Time + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan<- time.Time } func NewFakeClock(t time.Time) *FakeClock { @@ -105,7 +112,22 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time { return ch } -// Move clock by Duration, notify anyone that's called After +func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return ch +} + +// Move clock by Duration, notify anyone that's called After or Tick func (f *FakeClock) Step(d time.Duration) { f.lock.Lock() defer f.lock.Unlock() @@ -126,7 +148,23 @@ func (f *FakeClock) setTimeLocked(t time.Time) { for i := range f.waiters { w := &f.waiters[i] if !w.targetTime.After(t) { - w.destChan <- t + + if w.skipIfBlocked { + select { + case w.destChan <- t: + default: + } + } else { + w.destChan <- t + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, *w) + } + } else { newWaiters = append(newWaiters, f.waiters[i]) } @@ -169,6 +207,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement After") } +// Unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement Tick") +} + func (*IntervalClock) Sleep(d time.Duration) { panic("IntervalClock doesn't implement Sleep") } diff --git a/pkg/util/clock_test.go b/pkg/util/clock_test.go index 8205541e8b1..ee60fcb0d23 100644 --- a/pkg/util/clock_test.go +++ b/pkg/util/clock_test.go @@ -104,3 +104,81 @@ func TestFakeAfter(t *testing.T) { t.Errorf("unexpected non-channel read") } } + +func TestFakeTick(t *testing.T) { + tc := NewFakeClock(time.Now()) + if tc.HasWaiters() { + t.Errorf("unexpected waiter?") + } + oneSec := tc.Tick(time.Second) + if !tc.HasWaiters() { + t.Errorf("unexpected lack of waiter?") + } + + oneOhOneSec := tc.Tick(time.Second + time.Millisecond) + twoSec := tc.Tick(2 * time.Second) + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(999 * time.Millisecond) // t=.999 + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(time.Millisecond) // t=1.000 + select { + case <-oneSec: + // Expected! + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + tc.Step(time.Millisecond) // t=1.001 + select { + case <-oneSec: + // should not double-trigger! + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + // Expected! + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + + tc.Step(time.Second) // t=2.001 + tc.Step(time.Second) // t=3.001 + tc.Step(time.Second) // t=4.001 + tc.Step(time.Second) // t=5.001 + + // The one second ticker should not accumulate ticks + accumulatedTicks := 0 + drained := false + for !drained { + select { + case <-oneSec: + accumulatedTicks++ + default: + drained = true + } + } + if accumulatedTicks != 1 { + t.Errorf("unexpected number of accumulated ticks: %d", accumulatedTicks) + } +} diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go new file mode 100644 index 00000000000..429d15c4a1e --- /dev/null +++ b/pkg/util/workqueue/delaying_queue.go @@ -0,0 +1,193 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sort" + "time" + + "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" +) + +// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to +// requeue items after failures without ending up in a hot-loop. +type DelayingInterface interface { + Interface + // AddAfter adds an item to the workqueue after the indicated duration has passed + AddAfter(item interface{}, duration time.Duration) +} + +// NewDelayingQueue constructs a new workqueue with delayed queuing ability +func NewDelayingQueue() DelayingInterface { + return newDelayingQueue(util.RealClock{}) +} + +func newDelayingQueue(clock util.Clock) DelayingInterface { + ret := &delayingType{ + Interface: New(), + clock: clock, + heartbeat: clock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan waitFor, 1000), + } + + go ret.waitingLoop() + + return ret +} + +// delayingType wraps an Interface and provides delayed re-enquing +type delayingType struct { + Interface + + // clock tracks time for delayed firing + clock util.Clock + + // stopCh lets us signal a shutdown to the waiting loop + stopCh chan struct{} + + // heartbeat ensures we wait no more than maxWait before firing + heartbeat <-chan time.Time + + // waitingForAdd is an ordered slice of items to be added to the contained work queue + waitingForAdd []waitFor + // waitingForAddCh is a buffered channel that feeds waitingForAdd + waitingForAddCh chan waitFor +} + +// waitFor holds the data to add and the time it should be added +type waitFor struct { + data t + readyAt time.Time +} + +// ShutDown gives a way to shut off this queue +func (q *delayingType) ShutDown() { + q.Interface.ShutDown() + close(q.stopCh) +} + +// AddAfter adds the given item to the work queue after the given delay +func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { + // don't add if we're already shutting down + if q.ShuttingDown() { + return + } + + // immediately add things with no delay + if duration <= 0 { + q.Add(item) + return + } + + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + } +} + +// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. +// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an +// expired item sitting for more than 10 seconds. +const maxWait = 10 * time.Second + +// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. +func (q *delayingType) waitingLoop() { + defer utilruntime.HandleCrash() + + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + + for { + if q.Interface.ShuttingDown() { + // discard waiting entries + q.waitingForAdd = nil + return + } + + now := q.clock.Now() + + // Add ready entries + readyEntries := 0 + for _, entry := range q.waitingForAdd { + if entry.readyAt.After(now) { + break + } + q.Add(entry.data) + readyEntries++ + } + q.waitingForAdd = q.waitingForAdd[readyEntries:] + + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if len(q.waitingForAdd) > 0 { + nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) + } + + select { + case <-q.stopCh: + return + + case <-q.heartbeat: + // continue the loop, which will add ready items + + case <-nextReadyAt: + // continue the loop, which will add ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + default: + drained = true + } + } + } + } +} + +// inserts the given entry into the sorted entries list +// same semantics as append()... the given slice may be modified, +// and the returned value should be used +func insert(entries []waitFor, entry waitFor) []waitFor { + insertionIndex := sort.Search(len(entries), func(i int) bool { + return entry.readyAt.Before(entries[i].readyAt) + }) + + // grow by 1 + entries = append(entries, waitFor{}) + // shift items from the insertion point to the end + copy(entries[insertionIndex+1:], entries[insertionIndex:]) + // insert the record + entries[insertionIndex] = entry + + return entries +} diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go new file mode 100644 index 00000000000..cc2eb165c6d --- /dev/null +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -0,0 +1,177 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestSimpleQueue(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the next heartbeat + fakeClock.Step(10 * time.Second) + + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() > 0 { + return false, fmt.Errorf("added to queue") + } + + return false, nil + }) + if err != wait.ErrWaitTimeout { + t.Errorf("expected timeout, got: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestAddTwoFireEarly(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ := q.Get() + if !reflect.DeepEqual(item, second) { + t.Errorf("expected %v, got %v", second, item) + } + + q.AddAfter(third, 2*time.Second) + + fakeClock.Step(1 * time.Second) + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, first) { + t.Errorf("expected %v, got %v", first, item) + } + + fakeClock.Step(2 * time.Second) + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, third) { + t.Errorf("expected %v, got %v", third, item) + } + +} + +func TestCopyShifting(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 500*time.Millisecond) + q.AddAfter(third, 250*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(2 * time.Second) + + if err := waitForAdded(q, 3); err != nil { + t.Fatalf("unexpected err: %v", err) + } + actualFirst, _ := q.Get() + if !reflect.DeepEqual(actualFirst, third) { + t.Errorf("expected %v, got %v", third, actualFirst) + } + actualSecond, _ := q.Get() + if !reflect.DeepEqual(actualSecond, second) { + t.Errorf("expected %v, got %v", second, actualSecond) + } + actualThird, _ := q.Get() + if !reflect.DeepEqual(actualThird, first) { + t.Errorf("expected %v, got %v", first, actualThird) + } +} + +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func waitForWaitingQueueToFill(q DelayingInterface) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if len(q.(*delayingType).waitingForAddCh) == 0 { + return true, nil + } + + return false, nil + }) +} diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index 4d5239040dc..40f2ba2fa7a 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -20,6 +20,15 @@ import ( "sync" ) +type Interface interface { + Add(item interface{}) + Len() int + Get() (item interface{}, shutdown bool) + Done(item interface{}) + ShutDown() + ShuttingDown() bool +} + // New constructs a new workqueue (see the package comment). func New() *Type { return &Type{ @@ -135,3 +144,10 @@ func (q *Type) ShutDown() { q.shuttingDown = true q.cond.Broadcast() } + +func (q *Type) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +}