From d12a4d6d5a2a638ffc728af2ff0ba5d512d8cbaa Mon Sep 17 00:00:00 2001 From: deads2k Date: Thu, 24 Mar 2016 14:53:28 -0400 Subject: [PATCH 1/3] add a delayed queueing option to the workqueue --- pkg/util/workqueue/delaying_queue.go | 226 ++++++++++++++++++++++ pkg/util/workqueue/delaying_queue_test.go | 140 ++++++++++++++ pkg/util/workqueue/queue.go | 8 + 3 files changed, 374 insertions(+) create mode 100644 pkg/util/workqueue/delaying_queue.go create mode 100644 pkg/util/workqueue/delaying_queue_test.go diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go new file mode 100644 index 00000000000..5fef16740f3 --- /dev/null +++ b/pkg/util/workqueue/delaying_queue.go @@ -0,0 +1,226 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sort" + "sync" + "time" + + "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" +) + +// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to +// requeue items after failures without ending up in a hot-loop. +type DelayingInterface interface { + Interface + // AddAfter adds an item to the workqueue after the indicated duration has passed + AddAfter(item interface{}, duration time.Duration) +} + +// NewDelayingQueue constructs a new workqueue with delayed queuing ability +func NewDelayingQueue() DelayingInterface { + return newDelayingQueue(util.RealClock{}) +} + +func newDelayingQueue(clock util.Clock) DelayingInterface { + ret := &delayingType{ + Type: New(), + clock: clock, + waitingCond: sync.NewCond(&sync.Mutex{}), + } + + go ret.waitingLoop() + go ret.heartbeat() + + return ret +} + +// delayingType wraps a Type and provides delayed re-enquing +type delayingType struct { + *Type + + // clock tracks time for delayed firing + clock util.Clock + + // waitingForAdd is an ordered slice of items to be added to the contained work queue + waitingForAdd []waitFor + // waitingLock synchronizes access to waitingForAdd + waitingLock sync.Mutex + // waitingCond is used to notify the adding go func that it needs to check for items to add + waitingCond *sync.Cond + + // nextCheckTime is used to decide whether to add a notification timer. If the requested time + // is beyond the time we're already waiting for, we don't add a new timer thread + nextCheckTime *time.Time + // nextCheckLock serializes access to the notification time + nextCheckLock sync.Mutex + // nextCheckCancel is a channel to close to cancel the notification + nextCheckCancel chan struct{} +} + +// waitFor holds the data to add and the time it should be added +type waitFor struct { + data t + readyAt time.Time +} + +// ShutDown gives a way to shut off this queue +func (q *delayingType) ShutDown() { + q.Type.ShutDown() + q.waitingCond.Broadcast() +} + +func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { + q.waitingLock.Lock() + defer q.waitingLock.Unlock() + waitEntry := waitFor{data: item, readyAt: q.clock.Now().Add(duration)} + + insertionIndex := sort.Search(len(q.waitingForAdd), func(i int) bool { + return waitEntry.readyAt.Before(q.waitingForAdd[i].readyAt) + }) + + tail := q.waitingForAdd[insertionIndex:] + q.waitingForAdd = append(make([]waitFor, 0, len(q.waitingForAdd)+1), q.waitingForAdd[:insertionIndex]...) + q.waitingForAdd = append(q.waitingForAdd, waitEntry) + q.waitingForAdd = append(q.waitingForAdd, tail...) + + q.notifyAt(waitEntry.readyAt) +} + +// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. +// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an +// expired item sitting for more than 10 seconds. +const maxWait = 10 * time.Second + +// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. +func (q *delayingType) waitingLoop() { + defer utilruntime.HandleCrash() + + for { + if q.shuttingDown { + return + } + + func() { + q.waitingCond.L.Lock() + defer q.waitingCond.L.Unlock() + q.waitingCond.Wait() + + if q.shuttingDown { + return + } + + q.waitingLock.Lock() + defer q.waitingLock.Unlock() + + nextReadyCheck := time.Time{} + itemsAdded := 0 + + for _, queuedItem := range q.waitingForAdd { + nextReadyCheck = queuedItem.readyAt + if queuedItem.readyAt.After(q.clock.Now()) { + break + } + q.Type.Add(queuedItem.data) + itemsAdded++ + } + + switch itemsAdded { + case 0: + // no change + case len(q.waitingForAdd): + // consumed everything + q.waitingForAdd = make([]waitFor, 0, len(q.waitingForAdd)) + + default: + // consumed some + q.waitingForAdd = q.waitingForAdd[itemsAdded:] + + if len(q.waitingForAdd) > 0 { + q.notifyAt(nextReadyCheck) + } + } + }() + } +} + +// heartbeat forces a check every maxWait seconds +func (q *delayingType) heartbeat() { + defer utilruntime.HandleCrash() + + for { + if q.shuttingDown { + return + } + + ch := q.clock.After(maxWait) + <-ch + q.waitingCond.Broadcast() + } +} + +// clearNextCheckTimeIf resets the nextCheckTime if it matches the expected value to ensure that the subsequent notification will take effect. +func (q *delayingType) clearNextCheckTimeIf(nextReadyCheck time.Time) { + q.nextCheckLock.Lock() + defer q.nextCheckLock.Unlock() + + if q.nextCheckTime != nil && *q.nextCheckTime == nextReadyCheck { + q.nextCheckTime = nil + } +} + +// notifyAt: if the requested nextReadyCheck is sooner than the current check, then a new go func is +// spawned to notify the condition that the waitingLoop is waiting for after the time is up. The previous go func +// is cancelled +func (q *delayingType) notifyAt(nextReadyCheck time.Time) { + q.nextCheckLock.Lock() + defer q.nextCheckLock.Unlock() + + now := q.clock.Now() + if (q.nextCheckTime != nil && (nextReadyCheck.After(*q.nextCheckTime) || nextReadyCheck == *q.nextCheckTime)) || nextReadyCheck.Before(now) { + return + } + + duration := nextReadyCheck.Sub(now) + q.nextCheckTime = &nextReadyCheck + ch := q.clock.After(duration) + + newCancel := make(chan struct{}) + oldCancel := q.nextCheckCancel + // always cancel the old notifier + if oldCancel != nil { + close(oldCancel) + } + q.nextCheckCancel = newCancel + + go func() { + defer utilruntime.HandleCrash() + + select { + case <-ch: + // we only have one of these go funcs active at a time. If we hit our timer, then clear + // the check time so that the next add will win + q.clearNextCheckTimeIf(nextReadyCheck) + q.waitingCond.Broadcast() + + case <-newCancel: + // do nothing, cancelled + } + }() +} diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go new file mode 100644 index 00000000000..d0b2fcc1484 --- /dev/null +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -0,0 +1,140 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestSimpleQueue(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() == 1 { + return true, nil + } + + return false, nil + }) + if err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the next heartbeat + fakeClock.Step(10 * time.Second) + + err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() > 0 { + return false, fmt.Errorf("added to queue") + } + + return false, nil + }) + if err != wait.ErrWaitTimeout { + t.Errorf("expected timeout, got: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestAddTwoFireEarly(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 50*time.Millisecond) + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() == 1 { + return true, nil + } + + return false, nil + }) + if err != nil { + t.Fatalf("should have added") + } + item, _ := q.Get() + if !reflect.DeepEqual(item, second) { + t.Errorf("expected %v, got %v", second, item) + } + + q.AddAfter(third, 2*time.Second) + + fakeClock.Step(1 * time.Second) + err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() == 1 { + return true, nil + } + + return false, nil + }) + if err != nil { + t.Fatalf("should have added") + } + item, _ = q.Get() + if !reflect.DeepEqual(item, first) { + t.Errorf("expected %v, got %v", first, item) + } + + fakeClock.Step(2 * time.Second) + err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() == 1 { + return true, nil + } + + return false, nil + }) + if err != nil { + t.Fatalf("should have added") + } + item, _ = q.Get() + if !reflect.DeepEqual(item, third) { + t.Errorf("expected %v, got %v", third, item) + } + +} diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index 4d5239040dc..cbdaac12519 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -20,6 +20,14 @@ import ( "sync" ) +type Interface interface { + Add(item interface{}) + Len() int + Get() (item interface{}, shutdown bool) + Done(item interface{}) + ShutDown() +} + // New constructs a new workqueue (see the package comment). func New() *Type { return &Type{ From 290d970282f7287508f3be8501922b84944914bc Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 6 Apr 2016 09:23:12 -0400 Subject: [PATCH 2/3] make delayed workqueue use channels with single writer --- pkg/util/workqueue/delaying_queue.go | 231 ++++++++++------------ pkg/util/workqueue/delaying_queue_test.go | 107 ++++++---- pkg/util/workqueue/queue.go | 8 + 3 files changed, 179 insertions(+), 167 deletions(-) diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 5fef16740f3..68098c5b895 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -18,7 +18,6 @@ package workqueue import ( "sort" - "sync" "time" "k8s.io/kubernetes/pkg/util" @@ -40,38 +39,35 @@ func NewDelayingQueue() DelayingInterface { func newDelayingQueue(clock util.Clock) DelayingInterface { ret := &delayingType{ - Type: New(), - clock: clock, - waitingCond: sync.NewCond(&sync.Mutex{}), + Interface: New(), + clock: clock, + heartbeat: time.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan waitFor, 1000), } go ret.waitingLoop() - go ret.heartbeat() return ret } -// delayingType wraps a Type and provides delayed re-enquing +// delayingType wraps an Interface and provides delayed re-enquing type delayingType struct { - *Type + Interface // clock tracks time for delayed firing clock util.Clock + // stopCh lets us signal a shutdown to the waiting loop + stopCh chan struct{} + + // heartbeat ensures we wait no more than maxWait before firing + heartbeat <-chan time.Time + // waitingForAdd is an ordered slice of items to be added to the contained work queue waitingForAdd []waitFor - // waitingLock synchronizes access to waitingForAdd - waitingLock sync.Mutex - // waitingCond is used to notify the adding go func that it needs to check for items to add - waitingCond *sync.Cond - - // nextCheckTime is used to decide whether to add a notification timer. If the requested time - // is beyond the time we're already waiting for, we don't add a new timer thread - nextCheckTime *time.Time - // nextCheckLock serializes access to the notification time - nextCheckLock sync.Mutex - // nextCheckCancel is a channel to close to cancel the notification - nextCheckCancel chan struct{} + // waitingForAddCh is a buffered channel that feeds waitingForAdd + waitingForAddCh chan waitFor } // waitFor holds the data to add and the time it should be added @@ -82,25 +78,28 @@ type waitFor struct { // ShutDown gives a way to shut off this queue func (q *delayingType) ShutDown() { - q.Type.ShutDown() - q.waitingCond.Broadcast() + q.Interface.ShutDown() + close(q.stopCh) } +// AddAfter adds the given item to the work queue after the given delay func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { - q.waitingLock.Lock() - defer q.waitingLock.Unlock() - waitEntry := waitFor{data: item, readyAt: q.clock.Now().Add(duration)} + // don't add if we're already shutting down + if q.ShuttingDown() { + return + } - insertionIndex := sort.Search(len(q.waitingForAdd), func(i int) bool { - return waitEntry.readyAt.Before(q.waitingForAdd[i].readyAt) - }) + // immediately add things with no delay + if duration <= 0 { + q.Add(item) + return + } - tail := q.waitingForAdd[insertionIndex:] - q.waitingForAdd = append(make([]waitFor, 0, len(q.waitingForAdd)+1), q.waitingForAdd[:insertionIndex]...) - q.waitingForAdd = append(q.waitingForAdd, waitEntry) - q.waitingForAdd = append(q.waitingForAdd, tail...) - - q.notifyAt(waitEntry.readyAt) + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + } } // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. @@ -112,115 +111,83 @@ const maxWait = 10 * time.Second func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + for { - if q.shuttingDown { + if q.Interface.ShuttingDown() { + // discard waiting entries + q.waitingForAdd = nil return } - func() { - q.waitingCond.L.Lock() - defer q.waitingCond.L.Unlock() - q.waitingCond.Wait() + now := q.clock.Now() - if q.shuttingDown { - return + // Add ready entries + readyEntries := 0 + for _, entry := range q.waitingForAdd { + if entry.readyAt.After(now) { + break } - - q.waitingLock.Lock() - defer q.waitingLock.Unlock() - - nextReadyCheck := time.Time{} - itemsAdded := 0 - - for _, queuedItem := range q.waitingForAdd { - nextReadyCheck = queuedItem.readyAt - if queuedItem.readyAt.After(q.clock.Now()) { - break - } - q.Type.Add(queuedItem.data) - itemsAdded++ - } - - switch itemsAdded { - case 0: - // no change - case len(q.waitingForAdd): - // consumed everything - q.waitingForAdd = make([]waitFor, 0, len(q.waitingForAdd)) - - default: - // consumed some - q.waitingForAdd = q.waitingForAdd[itemsAdded:] - - if len(q.waitingForAdd) > 0 { - q.notifyAt(nextReadyCheck) - } - } - }() - } -} - -// heartbeat forces a check every maxWait seconds -func (q *delayingType) heartbeat() { - defer utilruntime.HandleCrash() - - for { - if q.shuttingDown { - return + q.Add(entry.data) + readyEntries++ } + q.waitingForAdd = q.waitingForAdd[readyEntries:] - ch := q.clock.After(maxWait) - <-ch - q.waitingCond.Broadcast() - } -} - -// clearNextCheckTimeIf resets the nextCheckTime if it matches the expected value to ensure that the subsequent notification will take effect. -func (q *delayingType) clearNextCheckTimeIf(nextReadyCheck time.Time) { - q.nextCheckLock.Lock() - defer q.nextCheckLock.Unlock() - - if q.nextCheckTime != nil && *q.nextCheckTime == nextReadyCheck { - q.nextCheckTime = nil - } -} - -// notifyAt: if the requested nextReadyCheck is sooner than the current check, then a new go func is -// spawned to notify the condition that the waitingLoop is waiting for after the time is up. The previous go func -// is cancelled -func (q *delayingType) notifyAt(nextReadyCheck time.Time) { - q.nextCheckLock.Lock() - defer q.nextCheckLock.Unlock() - - now := q.clock.Now() - if (q.nextCheckTime != nil && (nextReadyCheck.After(*q.nextCheckTime) || nextReadyCheck == *q.nextCheckTime)) || nextReadyCheck.Before(now) { - return - } - - duration := nextReadyCheck.Sub(now) - q.nextCheckTime = &nextReadyCheck - ch := q.clock.After(duration) - - newCancel := make(chan struct{}) - oldCancel := q.nextCheckCancel - // always cancel the old notifier - if oldCancel != nil { - close(oldCancel) - } - q.nextCheckCancel = newCancel - - go func() { - defer utilruntime.HandleCrash() + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if len(q.waitingForAdd) > 0 { + nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) + } select { - case <-ch: - // we only have one of these go funcs active at a time. If we hit our timer, then clear - // the check time so that the next add will win - q.clearNextCheckTimeIf(nextReadyCheck) - q.waitingCond.Broadcast() + case <-q.stopCh: + return - case <-newCancel: - // do nothing, cancelled + case <-q.heartbeat: + // continue the loop, which will add ready items + + case <-nextReadyAt: + // continue the loop, which will add ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, waitEntry) + } else { + q.Add(waitEntry.data) + } + default: + drained = true + } + } } - }() + } +} + +// inserts the given entry into the sorted entries list +// same semantics as append()... the given slice may be modified, +// and the returned value should be used +func insert(entries []waitFor, entry waitFor) []waitFor { + insertionIndex := sort.Search(len(entries), func(i int) bool { + return entry.readyAt.Before(entries[i].readyAt) + }) + + // grow by 1 + entries = append(entries, waitFor{}) + // shift items from the insertion point to the end + copy(entries[insertionIndex+1:], entries[insertionIndex:]) + // insert the record + entries[insertionIndex] = entry + + return entries } diff --git a/pkg/util/workqueue/delaying_queue_test.go b/pkg/util/workqueue/delaying_queue_test.go index d0b2fcc1484..cc2eb165c6d 100644 --- a/pkg/util/workqueue/delaying_queue_test.go +++ b/pkg/util/workqueue/delaying_queue_test.go @@ -33,6 +33,9 @@ func TestSimpleQueue(t *testing.T) { first := "foo" q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } if q.Len() != 0 { t.Errorf("should not have added") @@ -40,14 +43,7 @@ func TestSimpleQueue(t *testing.T) { fakeClock.Step(60 * time.Millisecond) - err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { + if err := waitForAdded(q, 1); err != nil { t.Errorf("should have added") } item, _ := q.Get() @@ -56,7 +52,7 @@ func TestSimpleQueue(t *testing.T) { // step past the next heartbeat fakeClock.Step(10 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { if q.Len() > 0 { return false, fmt.Errorf("added to queue") } @@ -82,21 +78,18 @@ func TestAddTwoFireEarly(t *testing.T) { q.AddAfter(first, 1*time.Second) q.AddAfter(second, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } if q.Len() != 0 { t.Errorf("should not have added") } fakeClock.Step(60 * time.Millisecond) - err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ := q.Get() if !reflect.DeepEqual(item, second) { @@ -106,15 +99,8 @@ func TestAddTwoFireEarly(t *testing.T) { q.AddAfter(third, 2*time.Second) fakeClock.Step(1 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() if !reflect.DeepEqual(item, first) { @@ -122,15 +108,8 @@ func TestAddTwoFireEarly(t *testing.T) { } fakeClock.Step(2 * time.Second) - err = wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { - if q.Len() == 1 { - return true, nil - } - - return false, nil - }) - if err != nil { - t.Fatalf("should have added") + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) } item, _ = q.Get() if !reflect.DeepEqual(item, third) { @@ -138,3 +117,61 @@ func TestAddTwoFireEarly(t *testing.T) { } } + +func TestCopyShifting(t *testing.T) { + fakeClock := util.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock) + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 500*time.Millisecond) + q.AddAfter(third, 250*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(2 * time.Second) + + if err := waitForAdded(q, 3); err != nil { + t.Fatalf("unexpected err: %v", err) + } + actualFirst, _ := q.Get() + if !reflect.DeepEqual(actualFirst, third) { + t.Errorf("expected %v, got %v", third, actualFirst) + } + actualSecond, _ := q.Get() + if !reflect.DeepEqual(actualSecond, second) { + t.Errorf("expected %v, got %v", second, actualSecond) + } + actualThird, _ := q.Get() + if !reflect.DeepEqual(actualThird, first) { + t.Errorf("expected %v, got %v", first, actualThird) + } +} + +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func waitForWaitingQueueToFill(q DelayingInterface) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if len(q.(*delayingType).waitingForAddCh) == 0 { + return true, nil + } + + return false, nil + }) +} diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index cbdaac12519..40f2ba2fa7a 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -26,6 +26,7 @@ type Interface interface { Get() (item interface{}, shutdown bool) Done(item interface{}) ShutDown() + ShuttingDown() bool } // New constructs a new workqueue (see the package comment). @@ -143,3 +144,10 @@ func (q *Type) ShutDown() { q.shuttingDown = true q.cond.Broadcast() } + +func (q *Type) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +} From bf097ea233126a4d63f99cec25cdcc27f907794b Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 6 Apr 2016 13:06:46 -0400 Subject: [PATCH 3/3] fake util.clock tick --- pkg/util/clock.go | 52 +++++++++++++++++-- pkg/util/clock_test.go | 78 ++++++++++++++++++++++++++++ pkg/util/workqueue/delaying_queue.go | 2 +- 3 files changed, 127 insertions(+), 5 deletions(-) diff --git a/pkg/util/clock.go b/pkg/util/clock.go index ac2d738d628..474cbb68d11 100644 --- a/pkg/util/clock.go +++ b/pkg/util/clock.go @@ -28,6 +28,7 @@ type Clock interface { Since(time.Time) time.Duration After(d time.Duration) <-chan time.Time Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time } var ( @@ -54,6 +55,10 @@ func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } +func (RealClock) Tick(d time.Duration) <-chan time.Time { + return time.Tick(d) +} + func (RealClock) Sleep(d time.Duration) { time.Sleep(d) } @@ -68,8 +73,10 @@ type FakeClock struct { } type fakeClockWaiter struct { - targetTime time.Time - destChan chan<- time.Time + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan<- time.Time } func NewFakeClock(t time.Time) *FakeClock { @@ -105,7 +112,22 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time { return ch } -// Move clock by Duration, notify anyone that's called After +func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return ch +} + +// Move clock by Duration, notify anyone that's called After or Tick func (f *FakeClock) Step(d time.Duration) { f.lock.Lock() defer f.lock.Unlock() @@ -126,7 +148,23 @@ func (f *FakeClock) setTimeLocked(t time.Time) { for i := range f.waiters { w := &f.waiters[i] if !w.targetTime.After(t) { - w.destChan <- t + + if w.skipIfBlocked { + select { + case w.destChan <- t: + default: + } + } else { + w.destChan <- t + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, *w) + } + } else { newWaiters = append(newWaiters, f.waiters[i]) } @@ -169,6 +207,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement After") } +// Unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement Tick") +} + func (*IntervalClock) Sleep(d time.Duration) { panic("IntervalClock doesn't implement Sleep") } diff --git a/pkg/util/clock_test.go b/pkg/util/clock_test.go index 8205541e8b1..ee60fcb0d23 100644 --- a/pkg/util/clock_test.go +++ b/pkg/util/clock_test.go @@ -104,3 +104,81 @@ func TestFakeAfter(t *testing.T) { t.Errorf("unexpected non-channel read") } } + +func TestFakeTick(t *testing.T) { + tc := NewFakeClock(time.Now()) + if tc.HasWaiters() { + t.Errorf("unexpected waiter?") + } + oneSec := tc.Tick(time.Second) + if !tc.HasWaiters() { + t.Errorf("unexpected lack of waiter?") + } + + oneOhOneSec := tc.Tick(time.Second + time.Millisecond) + twoSec := tc.Tick(2 * time.Second) + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(999 * time.Millisecond) // t=.999 + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(time.Millisecond) // t=1.000 + select { + case <-oneSec: + // Expected! + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + tc.Step(time.Millisecond) // t=1.001 + select { + case <-oneSec: + // should not double-trigger! + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + // Expected! + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + + tc.Step(time.Second) // t=2.001 + tc.Step(time.Second) // t=3.001 + tc.Step(time.Second) // t=4.001 + tc.Step(time.Second) // t=5.001 + + // The one second ticker should not accumulate ticks + accumulatedTicks := 0 + drained := false + for !drained { + select { + case <-oneSec: + accumulatedTicks++ + default: + drained = true + } + } + if accumulatedTicks != 1 { + t.Errorf("unexpected number of accumulated ticks: %d", accumulatedTicks) + } +} diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 68098c5b895..429d15c4a1e 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -41,7 +41,7 @@ func newDelayingQueue(clock util.Clock) DelayingInterface { ret := &delayingType{ Interface: New(), clock: clock, - heartbeat: time.Tick(maxWait), + heartbeat: clock.Tick(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan waitFor, 1000), }