diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 635a1b5dfa7..9469e46de26 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -313,7 +313,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f } filterFunc := filterFunction(key, c.keyFunc, filter) - objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV) + objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV) + if err != nil { + return fmt.Errorf("failed to wait for fresh list: %v", err) + } for _, obj := range objs { object, ok := obj.(runtime.Object) if !ok { diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 24d7ed2d1bb..eeb451f003b 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -21,14 +21,22 @@ import ( "sort" "strconv" "sync" + "time" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" ) +const ( + // MaximumListWait determines how long we're willing to wait for a + // list if a client specified a resource version in the future. + MaximumListWait = 60 * time.Second +) + // watchCacheEvent is a single "watch event" that is send to users of // watchCache. Additionally to a typical "watch.Event" it contains // the previous value of the object to enable proper filtering in the @@ -85,6 +93,9 @@ type watchCache struct { // This handler is run at the end of every Add/Update/Delete method // and additionally gets the previous value of the object. onEvent func(watchCacheEvent) + + // for testing timeouts. + clock util.Clock } func newWatchCache(capacity int) *watchCache { @@ -95,6 +106,7 @@ func newWatchCache(capacity int) *watchCache { endIndex: 0, store: cache.NewStore(cache.MetaNamespaceKeyFunc), resourceVersion: 0, + clock: util.RealClock{}, } wc.cond = sync.NewCond(wc.RLocker()) return wc @@ -193,13 +205,29 @@ func (w *watchCache) List() []interface{} { return w.store.List() } -func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) { +func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) { + startTime := w.clock.Now() + go func() { + // Wake us up when the time limit has expired. The docs + // promise that time.After (well, NewTimer, which it calls) + // will wait *at least* the duration given. Since this go + // routine starts sometime after we record the start time, and + // it will wake up the loop below sometime after the broadcast, + // we don't need to worry about waking it up before the time + // has expired accidentally. + <-w.clock.After(MaximumListWait) + w.cond.Broadcast() + }() + w.RLock() for w.resourceVersion < resourceVersion { + if w.clock.Since(startTime) >= MaximumListWait { + return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion) + } w.cond.Wait() } defer w.RUnlock() - return w.store.List(), w.resourceVersion + return w.store.List(), w.resourceVersion, nil } func (w *watchCache) ListKeys() []string { diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index a96fd1a1ce9..3251be96731 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -19,6 +19,7 @@ package storage import ( "strconv" "testing" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -40,8 +41,15 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod { } } +// newTestWatchCache just adds a fake clock. +func newTestWatchCache(capacity int) *watchCache { + wc := newWatchCache(capacity) + wc.clock = util.NewFakeClock(time.Now()) + return wc +} + func TestWatchCacheBasic(t *testing.T) { - store := newWatchCache(2) + store := newTestWatchCache(2) // Test Add/Update/Delete. pod1 := makeTestPod("pod", 1) @@ -111,7 +119,7 @@ func TestWatchCacheBasic(t *testing.T) { } func TestEvents(t *testing.T) { - store := newWatchCache(5) + store := newTestWatchCache(5) store.Add(makeTestPod("pod", 2)) @@ -231,7 +239,7 @@ func TestEvents(t *testing.T) { } func TestWaitUntilFreshAndList(t *testing.T) { - store := newWatchCache(3) + store := newTestWatchCache(3) // In background, update the store. go func() { @@ -239,7 +247,10 @@ func TestWaitUntilFreshAndList(t *testing.T) { store.Add(makeTestPod("bar", 5)) }() - list, resourceVersion := store.WaitUntilFreshAndList(4) + list, resourceVersion, err := store.WaitUntilFreshAndList(5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if resourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) } @@ -248,6 +259,30 @@ func TestWaitUntilFreshAndList(t *testing.T) { } } +func TestWaitUntilFreshAndListTimeout(t *testing.T) { + store := newTestWatchCache(3) + fc := store.clock.(*util.FakeClock) + + // In background, step clock after the below call starts the timer. + go func() { + for !fc.HasWaiters() { + time.Sleep(time.Millisecond) + } + fc.Step(MaximumListWait) + + // Add an object to make sure the test would + // eventually fail instead of just waiting + // forever. + time.Sleep(30 * time.Second) + store.Add(makeTestPod("bar", 5)) + }() + + _, _, err := store.WaitUntilFreshAndList(5) + if err == nil { + t.Fatalf("unexpected lack of timeout error") + } +} + type testLW struct { ListFunc func(options api.ListOptions) (runtime.Object, error) WatchFunc func(options api.ListOptions) (watch.Interface, error) @@ -261,10 +296,13 @@ func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) { } func TestReflectorForWatchCache(t *testing.T) { - store := newWatchCache(5) + store := newTestWatchCache(5) { - _, version := store.WaitUntilFreshAndList(0) + _, version, err := store.WaitUntilFreshAndList(0) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if version != 0 { t.Errorf("unexpected resource version: %d", version) } @@ -284,7 +322,10 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(util.NeverStop) { - _, version := store.WaitUntilFreshAndList(10) + _, version, err := store.WaitUntilFreshAndList(10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if version != 10 { t.Errorf("unexpected resource version: %d", version) }