diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index b2ab986980d..e3bad4b11de 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" @@ -352,10 +353,12 @@ func (c *Cacher) terminateAllWatchers() { } } -func forgetWatcher(c *Cacher, index int) func() { - return func() { - c.Lock() - defer c.Unlock() +func forgetWatcher(c *Cacher, index int) func(bool) { + return func(lock bool) { + if lock { + c.Lock() + defer c.Unlock() + } // It's possible that the watcher is already not in the map (e.g. in case of // simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything. delete(c.watchers, index) @@ -428,10 +431,10 @@ type cacheWatcher struct { result chan watch.Event filter FilterFunc stopped bool - forget func() + forget func(bool) } -func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { +func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), @@ -450,7 +453,7 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event { // Implements watch.Interface. func (c *cacheWatcher) Stop() { - c.forget() + c.forget(true) c.stop() } @@ -464,7 +467,15 @@ func (c *cacheWatcher) stop() { } func (c *cacheWatcher) add(event watchCacheEvent) { - c.input <- event + select { + case c.input <- event: + case <-time.After(5 * time.Second): + // This means that we couldn't send event to that watcher. + // Since we don't want to blockin on it infinitely, + // we simply terminate it. + c.forget(false) + c.stop() + } } func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 763058b1974..baf16266822 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -223,6 +223,32 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) } +func TestWatcherTimeout(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) + + // Create a watcher that will not be reading any result. + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + // Create a second watcher that will be reading result. + readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer readingWatcher.Stop() + + for i := 1; i <= 22; i++ { + pod := makeTestPod(strconv.Itoa(i)) + _ = updatePod(t, etcdStorage, pod, nil) + verifyWatchEvent(t, readingWatcher, watch.Added, pod) + } +} + func TestFiltering(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) defer server.Terminate(t)