From 4a7d70aef1016c3f1a1ea07398ec56cf1fdce68a Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Mon, 1 Feb 2016 10:50:05 -0800
Subject: [PATCH 1/3] extend fake clock

---
 pkg/client/cache/expiration_cache_test.go      |  2 +-
 pkg/client/record/event_test.go                |  6 +-
 pkg/controller/controller_utils_test.go        |  4 +-
 pkg/kubelet/container/image_puller_test.go     |  2 +-
 .../container/serialized_image_puller_test.go  |  2 +-
 pkg/kubelet/dockertools/manager_test.go        |  4 +-
 pkg/kubelet/image_manager_test.go              |  4 +-
 pkg/kubelet/kubelet_test.go                    |  2 +-
 pkg/kubelet/util/queue/work_queue_test.go      |  2 +-
 pkg/master/tunneler_test.go                    | 18 ++--
 pkg/util/backoff_test.go                       | 12 +--
 pkg/util/clock.go                              | 97 +++++++++++++++++--
 pkg/util/clock_test.go                         | 62 +++++++++++-
 plugin/pkg/scheduler/scheduler_test.go         |  4 +-
 14 files changed, 181 insertions(+), 40 deletions(-)

diff --git a/pkg/client/cache/expiration_cache_test.go b/pkg/client/cache/expiration_cache_test.go
index 546b98d919f..ff6bd1d4fa7 100644
--- a/pkg/client/cache/expiration_cache_test.go
+++ b/pkg/client/cache/expiration_cache_test.go
@@ -113,7 +113,7 @@ func TestTTLPolicy(t *testing.T) {
 	exactlyOnTTL := fakeTime.Add(-ttl)
 	expiredTime := fakeTime.Add(-(ttl + 1))
 
-	policy := TTLPolicy{ttl, &util.FakeClock{Time: fakeTime}}
+	policy := TTLPolicy{ttl, util.NewFakeClock(fakeTime)}
 	fakeTimestampedEntry := &timestampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
 	if policy.IsExpired(fakeTimestampedEntry) {
 		t.Errorf("TTL cache should not expire entries exactly on ttl")
diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go
index 3c8983b5b02..61597e0d590 100644
--- a/pkg/client/record/event_test.go
+++ b/pkg/client/record/event_test.go
@@ -348,7 +348,7 @@ func TestEventf(t *testing.T) {
 
 	eventBroadcaster := NewBroadcaster()
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
-	clock := &util.FakeClock{time.Now()}
+	clock := util.NewFakeClock(time.Now())
 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
 	for index, item := range table {
 		clock.Step(1 * time.Second)
@@ -559,7 +559,7 @@ func TestEventfNoNamespace(t *testing.T) {
 
 	eventBroadcaster := NewBroadcaster()
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
-	clock := &util.FakeClock{time.Now()}
+	clock := util.NewFakeClock(time.Now())
 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
 
 	for index, item := range table {
@@ -846,7 +846,7 @@ func TestMultiSinkCache(t *testing.T) {
 	}
 
 	eventBroadcaster := NewBroadcaster()
-	clock := &util.FakeClock{time.Now()}
+	clock := util.NewFakeClock(time.Now())
 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
 
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go
index 2f3fc8f43f4..a9d75ec63be 100644
--- a/pkg/controller/controller_utils_test.go
+++ b/pkg/controller/controller_utils_test.go
@@ -43,7 +43,7 @@ import (
 // NewFakeControllerExpectationsLookup creates a fake store for PodExpectations.
 func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *util.FakeClock) {
 	fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
-	fakeClock := &util.FakeClock{Time: fakeTime}
+	fakeClock := util.NewFakeClock(fakeTime)
 	ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
 	ttlStore := cache.NewFakeExpirationStore(
 		ExpKeyFunc, nil, ttlPolicy, fakeClock)
@@ -177,7 +177,7 @@ func TestControllerExpectations(t *testing.T) {
 	}
 
 	// Expectations have expired because of ttl
-	fakeClock.Time = fakeClock.Time.Add(ttl + 1)
+	fakeClock.Step(ttl + 1)
 	if !e.SatisfiedExpectations(rcKey) {
 		t.Errorf("Expectations should have expired but didn't")
 	}
diff --git a/pkg/kubelet/container/image_puller_test.go b/pkg/kubelet/container/image_puller_test.go
index df48428fa56..f8d3ae16f3b 100644
--- a/pkg/kubelet/container/image_puller_test.go
+++ b/pkg/kubelet/container/image_puller_test.go
@@ -98,7 +98,7 @@ func TestPuller(t *testing.T) {
 	}
 
 	backOff := util.NewBackOff(time.Second, time.Minute)
-	fakeClock := &util.FakeClock{Time: time.Now()}
+	fakeClock := util.NewFakeClock(time.Now())
 	backOff.Clock = fakeClock
 
 	fakeRuntime := &FakeRuntime{}
diff --git a/pkg/kubelet/container/serialized_image_puller_test.go b/pkg/kubelet/container/serialized_image_puller_test.go
index 9313dfc09e6..df0f003a2e6 100644
--- a/pkg/kubelet/container/serialized_image_puller_test.go
+++ b/pkg/kubelet/container/serialized_image_puller_test.go
@@ -98,7 +98,7 @@ func TestSerializedPuller(t *testing.T) {
 	}
 
 	backOff := util.NewBackOff(time.Second, time.Minute)
-	fakeClock := &util.FakeClock{Time: time.Now()}
+	fakeClock := util.NewFakeClock(time.Now())
 	backOff.Clock = fakeClock
 
 	fakeRuntime := &FakeRuntime{}
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go
index a360e978455..8d6b568599f 100644
--- a/pkg/kubelet/dockertools/manager_test.go
+++ b/pkg/kubelet/dockertools/manager_test.go
@@ -1160,7 +1160,7 @@ func TestGetAPIPodStatusWithLastTermination(t *testing.T) {
 }
 
 func TestSyncPodBackoff(t *testing.T) {
-	var fakeClock = &util.FakeClock{Time: time.Now()}
+	var fakeClock = util.NewFakeClock(time.Now())
 	startTime := fakeClock.Now()
 
 	dm, fakeDocker := newTestDockerManager()
@@ -1232,7 +1232,7 @@ func TestSyncPodBackoff(t *testing.T) {
 	backOff.Clock = fakeClock
 	for _, c := range tests {
 		fakeDocker.SetFakeContainers(dockerContainers)
-		fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
+		fakeClock.SetTime(startTime.Add(time.Duration(c.tick) * time.Second))
 
 		runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
 		verifyCalls(t, fakeDocker, c.result)
diff --git a/pkg/kubelet/image_manager_test.go b/pkg/kubelet/image_manager_test.go
index 538277b55e3..b023475607e 100644
--- a/pkg/kubelet/image_manager_test.go
+++ b/pkg/kubelet/image_manager_test.go
@@ -431,8 +431,8 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
 		},
 	}
 
-	fakeClock := util.FakeClock{Time: time.Now()}
-	fmt.Println(fakeClock.Now())
+	fakeClock := util.NewFakeClock(time.Now())
+	t.Log(fakeClock.Now())
 	require.NoError(t, manager.detectImages(fakeClock.Now()))
 	require.Equal(t, manager.imageRecordsLen(), 2)
 	// no space freed since one image is in used, and another one is not old enough
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 715e4e9489e..eaa825ea931 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -171,7 +171,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 		LowThresholdPercent: 80,
 	}
 	kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy)
-	fakeClock := &util.FakeClock{Time: time.Now()}
+	fakeClock := util.NewFakeClock(time.Now())
 	kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
 	kubelet.backOff.Clock = fakeClock
 	kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
diff --git a/pkg/kubelet/util/queue/work_queue_test.go b/pkg/kubelet/util/queue/work_queue_test.go
index 6dd452a9112..40ba6d95d88 100644
--- a/pkg/kubelet/util/queue/work_queue_test.go
+++ b/pkg/kubelet/util/queue/work_queue_test.go
@@ -26,7 +26,7 @@ import (
 )
 
 func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
-	fakeClock := &util.FakeClock{Time: time.Now()}
+	fakeClock := util.NewFakeClock(time.Now())
 	wq := &basicWorkQueue{
 		clock: fakeClock,
 		queue: make(map[types.UID]time.Time),
diff --git a/pkg/master/tunneler_test.go b/pkg/master/tunneler_test.go
index 24822f2e06e..1326c615714 100644
--- a/pkg/master/tunneler_test.go
+++ b/pkg/master/tunneler_test.go
@@ -37,32 +37,32 @@ func TestSecondsSinceSync(t *testing.T) {
 	tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()
 
 	// Nano Second. No difference.
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC))
 	assert.Equal(int64(0), tunneler.SecondsSinceSync())
 
 	// Second
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC))
 	assert.Equal(int64(1), tunneler.SecondsSinceSync())
 
 	// Minute
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC))
 	assert.Equal(int64(60), tunneler.SecondsSinceSync())
 
 	// Hour
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC))
 	assert.Equal(int64(3600), tunneler.SecondsSinceSync())
 
 	// Day
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC))
 	assert.Equal(int64(86400), tunneler.SecondsSinceSync())
 
 	// Month
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC))
 	assert.Equal(int64(2678400), tunneler.SecondsSinceSync())
 
 	// Future Month. Should be -Month.
 	tunneler.lastSync = time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix()
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC))
 	assert.Equal(int64(-2678400), tunneler.SecondsSinceSync())
 }
 
@@ -89,12 +89,12 @@ func TestIsTunnelSyncHealthy(t *testing.T) {
 
 	// Pass case: 540 second lag
 	tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC))
 	err := master.IsTunnelSyncHealthy(nil)
 	assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.")
 
 	// Fail case: 720 second lag
-	tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC)}
+	tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC))
 	err = master.IsTunnelSyncHealthy(nil)
 	assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.")
 }
diff --git a/pkg/util/backoff_test.go b/pkg/util/backoff_test.go
index 9035c565f45..d5b744cb3f7 100644
--- a/pkg/util/backoff_test.go
+++ b/pkg/util/backoff_test.go
@@ -32,7 +32,7 @@ func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
 
 func TestSlowBackoff(t *testing.T) {
 	id := "_idSlow"
-	tc := &FakeClock{Time: time.Now()}
+	tc := NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 50 * step
 
@@ -58,7 +58,7 @@ func TestSlowBackoff(t *testing.T) {
 
 func TestBackoffReset(t *testing.T) {
 	id := "_idReset"
-	tc := &FakeClock{Time: time.Now()}
+	tc := NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := step * 5
 	b := NewFakeBackOff(step, maxDuration, tc)
@@ -84,7 +84,7 @@ func TestBackoffReset(t *testing.T) {
 
 func TestBackoffHightWaterMark(t *testing.T) {
 	id := "_idHiWaterMark"
-	tc := &FakeClock{Time: time.Now()}
+	tc := NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 5 * step
 	b := NewFakeBackOff(step, maxDuration, tc)
@@ -106,7 +106,7 @@ func TestBackoffHightWaterMark(t *testing.T) {
 
 func TestBackoffGC(t *testing.T) {
 	id := "_idGC"
-	tc := &FakeClock{Time: time.Now()}
+	tc := NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 5 * step
 
@@ -134,7 +134,7 @@ func TestBackoffGC(t *testing.T) {
 
 func TestIsInBackOffSinceUpdate(t *testing.T) {
 	id := "_idIsInBackOffSinceUpdate"
-	tc := &FakeClock{Time: time.Now()}
+	tc := NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 10 * step
 	b := NewFakeBackOff(step, maxDuration, tc)
@@ -186,7 +186,7 @@ func TestIsInBackOffSinceUpdate(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		tc.Time = startTime.Add(c.tick * step)
+		tc.SetTime(startTime.Add(c.tick * step))
 		if c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) {
 			t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step)
 		}
diff --git a/pkg/util/clock.go b/pkg/util/clock.go
index 4d27baa3735..56ea16c6916 100644
--- a/pkg/util/clock.go
+++ b/pkg/util/clock.go
@@ -17,6 +17,7 @@ limitations under the License.
 package util
 
 import (
+	"sync"
 	"time"
 )
 
@@ -25,39 +26,115 @@ import (
 type Clock interface {
 	Now() time.Time
 	Since(time.Time) time.Duration
+	After(d time.Duration) <-chan time.Time
 }
 
+var (
+	_ = Clock(RealClock{})
+	_ = Clock(&FakeClock{})
+	_ = Clock(&IntervalClock{})
+)
+
 // RealClock really calls time.Now()
 type RealClock struct{}
 
 // Now returns the current time.
-func (r RealClock) Now() time.Time {
+func (RealClock) Now() time.Time {
 	return time.Now()
 }
 
 // Since returns time since the specified timestamp.
-func (r RealClock) Since(ts time.Time) time.Duration {
+func (RealClock) Since(ts time.Time) time.Duration {
 	return time.Since(ts)
 }
 
+// Same as time.After(d).
+func (RealClock) After(d time.Duration) <-chan time.Time {
+	return time.After(d)
+}
+
 // FakeClock implements Clock, but returns an arbitrary time.
 type FakeClock struct {
-	Time time.Time
+	lock sync.RWMutex
+	time time.Time
+
+	// waiters are waiting for the fake time to pass their specified time
+	waiters []fakeClockWaiter
+}
+
+type fakeClockWaiter struct {
+	targetTime time.Time
+	destChan   chan<- time.Time
+}
+
+func NewFakeClock(t time.Time) *FakeClock {
+	return &FakeClock{
+		time: t,
+	}
 }
 
 // Now returns f's time.
 func (f *FakeClock) Now() time.Time {
-	return f.Time
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+	return f.time
}
 
 // Since returns time since the time in f.
 func (f *FakeClock) Since(ts time.Time) time.Duration {
-	return f.Time.Sub(ts)
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+	return f.time.Sub(ts)
 }
 
-// Move clock by Duration
+// Fake version of time.After(d).
+func (f *FakeClock) After(d time.Duration) <-chan time.Time {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	stopTime := f.time.Add(d)
+	ch := make(chan time.Time, 1) // Don't block!
+	f.waiters = append(f.waiters, fakeClockWaiter{
+		targetTime: stopTime,
+		destChan:   ch,
+	})
+	return ch
+}
+
+// Move clock by Duration, notify anyone that's called After
 func (f *FakeClock) Step(d time.Duration) {
-	f.Time = f.Time.Add(d)
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.setTimeLocked(f.time.Add(d))
+}
+
+// Sets the time.
+func (f *FakeClock) SetTime(t time.Time) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.setTimeLocked(t)
+}
+
+// Actually changes the time and checks any waiters. f must be write-locked.
+func (f *FakeClock) setTimeLocked(t time.Time) {
+	f.time = t
+	newWaiters := make([]fakeClockWaiter, 0, len(f.waiters))
+	for i := range f.waiters {
+		w := &f.waiters[i]
+		if !w.targetTime.After(t) {
+			w.destChan <- t
+		} else {
+			newWaiters = append(newWaiters, f.waiters[i])
+		}
+	}
+	f.waiters = newWaiters
+}
+
+// Returns true if After has been called on f but not yet satisfied (so you can
+// write race-free tests).
+func (f *FakeClock) HasWaiters() bool {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+	return len(f.waiters) > 0
 }
 
 // IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration
@@ -76,3 +153,9 @@ func (i *IntervalClock) Now() time.Time {
 func (i *IntervalClock) Since(ts time.Time) time.Duration {
 	return i.Time.Sub(ts)
 }
+
+// Unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+	panic("IntervalClock doesn't implement After")
+}
diff --git a/pkg/util/clock_test.go b/pkg/util/clock_test.go
index d3523ffda15..db0cce40a11 100644
--- a/pkg/util/clock_test.go
+++ b/pkg/util/clock_test.go
@@ -23,7 +23,7 @@ import (
 )
 
 func TestFakeClock(t *testing.T) {
 	startTime := time.Now()
-	tc := &FakeClock{Time: startTime}
+	tc := NewFakeClock(startTime)
 	tc.Step(time.Second)
 	now := tc.Now()
 	if now.Sub(startTime) != time.Second {
@@ -31,8 +31,66 @@ func TestFakeClock(t *testing.T) {
 	}
 
 	tt := tc.Now()
-	tc.Time = tt.Add(time.Hour)
+	tc.SetTime(tt.Add(time.Hour))
 	if tc.Now().Sub(tt) != time.Hour {
 		t.Errorf("input: %s now=%s gap=%s expected=%s", tt, tc.Now(), tc.Now().Sub(tt), time.Hour)
 	}
 }
+
+func TestFakeAfter(t *testing.T) {
+	tc := NewFakeClock(time.Now())
+	if tc.HasWaiters() {
+		t.Errorf("unexpected waiter?")
+	}
+	oneSec := tc.After(time.Second)
+	if !tc.HasWaiters() {
+		t.Errorf("unexpected lack of waiter?")
+	}
+
+	oneOhOneSec := tc.After(time.Second + time.Millisecond)
+	twoSec := tc.After(2 * time.Second)
+	select {
+	case <-oneSec:
+		t.Errorf("unexpected channel read")
+	case <-oneOhOneSec:
+		t.Errorf("unexpected channel read")
+	case <-twoSec:
+		t.Errorf("unexpected channel read")
+	default:
+	}
+
+	tc.Step(999 * time.Millisecond)
+	select {
+	case <-oneSec:
+		t.Errorf("unexpected channel read")
+	case <-oneOhOneSec:
+		t.Errorf("unexpected channel read")
+	case <-twoSec:
+		t.Errorf("unexpected channel read")
+	default:
+	}
+
+	tc.Step(time.Millisecond)
+	select {
+	case <-oneSec:
+		// Expected!
+	case <-oneOhOneSec:
+		t.Errorf("unexpected channel read")
+	case <-twoSec:
+		t.Errorf("unexpected channel read")
+	default:
+		t.Errorf("unexpected non-channel read")
+	}
+	tc.Step(time.Millisecond)
+	select {
+	case <-oneSec:
+		// should not double-trigger!
+		t.Errorf("unexpected channel read")
+	case <-oneOhOneSec:
+		// Expected!
+	case <-twoSec:
+		t.Errorf("unexpected channel read")
+	default:
+		t.Errorf("unexpected non-channel read")
+	}
+}
diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go
index fe568cedc0d..9ce9a6ab52d 100644
--- a/plugin/pkg/scheduler/scheduler_test.go
+++ b/plugin/pkg/scheduler/scheduler_test.go
@@ -176,7 +176,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	// all entries inserted with fakeTime will expire.
 	ttl := 30 * time.Second
 	fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
-	fakeClock := &util.FakeClock{Time: fakeTime}
+	fakeClock := util.NewFakeClock(fakeTime)
 	ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
 	assumedPodsStore := cache.NewFakeExpirationStore(
 		cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
@@ -274,7 +274,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 	// Second scheduling pass will fail to schedule if the store hasn't expired
 	// the deleted pod. This would normally happen with a timeout.
 	//expirationPolicy.NeverExpire = util.NewStringSet()
-	fakeClock.Time = fakeClock.Time.Add(ttl + 1)
+	fakeClock.Step(ttl + 1)
 
 	called = make(chan struct{})
 	events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {

From 26683fda29e6f342cd599484b69ccc03dd3c30de Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Mon, 1 Feb 2016 10:50:22 -0800
Subject: [PATCH 2/3] add timeout to cacher

---
 pkg/storage/cacher.go           |  5 ++-
 pkg/storage/watch_cache.go      | 32 +++++++++++++++++--
 pkg/storage/watch_cache_test.go | 55 ++++++++++++++++++++++++++++-----
 3 files changed, 82 insertions(+), 10 deletions(-)

diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go
index 635a1b5dfa7..9469e46de26 100644
--- a/pkg/storage/cacher.go
+++ b/pkg/storage/cacher.go
@@ -313,7 +313,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
 	}
 	filterFunc := filterFunction(key, c.keyFunc, filter)
 
-	objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV)
+	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
+	if err != nil {
+		return fmt.Errorf("failed to wait for fresh list: %v", err)
+	}
 	for _, obj := range objs {
 		object, ok := obj.(runtime.Object)
 		if !ok {
diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go
index 24d7ed2d1bb..eeb451f003b 100644
--- a/pkg/storage/watch_cache.go
+++ b/pkg/storage/watch_cache.go
@@ -21,14 +21,22 @@ import (
 	"sort"
 	"strconv"
 	"sync"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/meta"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
+const (
+	// MaximumListWait determines how long we're willing to wait for a
+	// list if a client specified a resource version in the future.
+	MaximumListWait = 60 * time.Second
+)
+
 // watchCacheEvent is a single "watch event" that is send to users of
 // watchCache. Additionally to a typical "watch.Event" it contains
 // the previous value of the object to enable proper filtering in the
@@ -85,6 +93,9 @@ type watchCache struct {
 	// This handler is run at the end of every Add/Update/Delete method
 	// and additionally gets the previous value of the object.
 	onEvent func(watchCacheEvent)
+
+	// for testing timeouts.
+	clock util.Clock
 }
 
 func newWatchCache(capacity int) *watchCache {
@@ -95,6 +106,7 @@ func newWatchCache(capacity int) *watchCache {
 		endIndex:        0,
 		store:           cache.NewStore(cache.MetaNamespaceKeyFunc),
 		resourceVersion: 0,
+		clock:           util.RealClock{},
 	}
 	wc.cond = sync.NewCond(wc.RLocker())
 	return wc
@@ -193,13 +205,29 @@ func (w *watchCache) List() []interface{} {
 	return w.store.List()
 }
 
-func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) {
+func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
+	startTime := w.clock.Now()
+	go func() {
+		// Wake us up when the time limit has expired. The docs
+		// promise that time.After (well, NewTimer, which it calls)
+		// will wait *at least* the duration given. Since this go
+		// routine starts sometime after we record the start time, and
+		// it will wake up the loop below sometime after the broadcast,
+		// we don't need to worry about waking it up before the time
+		// has expired accidentally.
+		<-w.clock.After(MaximumListWait)
+		w.cond.Broadcast()
+	}()
+
 	w.RLock()
+	defer w.RUnlock()
 	for w.resourceVersion < resourceVersion {
+		if w.clock.Since(startTime) >= MaximumListWait {
+			return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
+		}
 		w.cond.Wait()
 	}
-	defer w.RUnlock()
-	return w.store.List(), w.resourceVersion
+	return w.store.List(), w.resourceVersion, nil
 }
 
 func (w *watchCache) ListKeys() []string {
diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go
index a96fd1a1ce9..3251be96731 100644
--- a/pkg/storage/watch_cache_test.go
+++ b/pkg/storage/watch_cache_test.go
@@ -19,6 +19,7 @@ package storage
 import (
 	"strconv"
 	"testing"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -40,8 +41,15 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
 	}
 }
 
+// newTestWatchCache just adds a fake clock.
+func newTestWatchCache(capacity int) *watchCache {
+	wc := newWatchCache(capacity)
+	wc.clock = util.NewFakeClock(time.Now())
+	return wc
+}
+
 func TestWatchCacheBasic(t *testing.T) {
-	store := newWatchCache(2)
+	store := newTestWatchCache(2)
 
 	// Test Add/Update/Delete.
 	pod1 := makeTestPod("pod", 1)
@@ -111,7 +119,7 @@ func TestWatchCacheBasic(t *testing.T) {
 }
 
 func TestEvents(t *testing.T) {
-	store := newWatchCache(5)
+	store := newTestWatchCache(5)
 
 	store.Add(makeTestPod("pod", 2))
 
@@ -231,7 +239,7 @@ func TestEvents(t *testing.T) {
 }
 
 func TestWaitUntilFreshAndList(t *testing.T) {
-	store := newWatchCache(3)
+	store := newTestWatchCache(3)
 
 	// In background, update the store.
 	go func() {
@@ -239,7 +247,10 @@ 
 		store.Add(makeTestPod("bar", 5))
 	}()
 
-	list, resourceVersion := store.WaitUntilFreshAndList(4)
+	list, resourceVersion, err := store.WaitUntilFreshAndList(5)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
 	if resourceVersion != 5 {
 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
 	}
@@ -248,6 +259,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
 	}
 }
 
+func TestWaitUntilFreshAndListTimeout(t *testing.T) {
+	store := newTestWatchCache(3)
+	fc := store.clock.(*util.FakeClock)
+
+	// In background, step clock after the below call starts the timer.
+	go func() {
+		for !fc.HasWaiters() {
+			time.Sleep(time.Millisecond)
+		}
+		fc.Step(MaximumListWait)
+
+		// Add an object to make sure the test would
+		// eventually fail instead of just waiting
+		// forever.
+		time.Sleep(30 * time.Second)
+		store.Add(makeTestPod("bar", 5))
+	}()
+
+	_, _, err := store.WaitUntilFreshAndList(5)
+	if err == nil {
+		t.Fatalf("unexpected lack of timeout error")
+	}
+}
+
 type testLW struct {
 	ListFunc  func(options api.ListOptions) (runtime.Object, error)
 	WatchFunc func(options api.ListOptions) (watch.Interface, error)
@@ -261,10 +296,13 @@ func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
 }
 
 func TestReflectorForWatchCache(t *testing.T) {
-	store := newWatchCache(5)
+	store := newTestWatchCache(5)
 
 	{
-		_, version := store.WaitUntilFreshAndList(0)
+		_, version, err := store.WaitUntilFreshAndList(0)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
 		if version != 0 {
 			t.Errorf("unexpected resource version: %d", version)
 		}
@@ -284,7 +322,10 @@ func TestReflectorForWatchCache(t *testing.T) {
 	r.ListAndWatch(util.NeverStop)
 
 	{
-		_, version := store.WaitUntilFreshAndList(10)
+		_, version, err := store.WaitUntilFreshAndList(10)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
 		if version != 10 {
 			t.Errorf("unexpected resource version: %d", version)
 		}

From 5d713bb6aab4a3cf75e03f4cfa67b8398db34004 Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Mon, 1 Feb 2016 10:50:44 -0800
Subject: [PATCH 3/3] don't cross-pollinate RVs

---
 pkg/registry/service/ipallocator/controller/repair.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go
index 2cf212e4c14..e811379c525 100644
--- a/pkg/registry/service/ipallocator/controller/repair.go
+++ b/pkg/registry/service/ipallocator/controller/repair.go
@@ -99,8 +99,12 @@ func (c *Repair) runOnce() error {
 	}
 
 	ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
-	options := &api.ListOptions{ResourceVersion: latest.ObjectMeta.ResourceVersion}
-	list, err := c.registry.ListServices(ctx, options)
+	// We explicitly send no resource version, since the resource version
+	// of 'latest' is from a different collection, it's not comparable to
+	// the service collection. The caching layer keeps per-collection RVs,
+	// and this is proper, since in theory the collections could be hosted
+	// in separate etcd (or even non-etcd) instances.
+	list, err := c.registry.ListServices(ctx, nil)
 	if err != nil {
 		return fmt.Errorf("unable to refresh the service IP block: %v", err)
 	}
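[Illustrative sketch -- not part of the patches above. The series establishes a pattern: production code accepts a util.Clock (util.RealClock{} in real use, as newWatchCache does), and tests substitute util.NewFakeClock so deadlines can be driven deterministically; HasWaiters lets a test confirm the code under test has already called After before the clock is stepped, and WaitUntilFreshAndList pairs this with a wake-up goroutine and cond.Broadcast because sync.Cond has no wait-with-timeout. The waitOrTimeout helper and its test below are hypothetical illustrations of that pattern; only the util.Clock, RealClock, NewFakeClock, After, Step, SetTime, and HasWaiters APIs come from the patches.]

package example

import (
	"errors"
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

// waitOrTimeout returns nil if done fires before the timeout expires, or an
// error if the injected clock reports that the timeout elapsed first.
// Production callers would pass util.RealClock{}; tests pass a *util.FakeClock.
func waitOrTimeout(clock util.Clock, timeout time.Duration, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-clock.After(timeout):
		return errors.New("timed out")
	}
}

func TestWaitOrTimeout(t *testing.T) {
	fc := util.NewFakeClock(time.Now())
	result := make(chan error, 1)
	go func() {
		result <- waitOrTimeout(fc, time.Minute, make(chan struct{}))
	}()

	// HasWaiters reports that the goroutine above has already called After,
	// so stepping the fake clock past the deadline cannot race with it.
	for !fc.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	fc.Step(time.Minute)

	if err := <-result; err == nil {
		t.Fatal("expected a timeout error")
	}
}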