From 7e787b144a550965dcf4cb30d8e68af15c187507 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 12 Dec 2016 17:02:05 -0800 Subject: [PATCH] fix leaking goroutine issues in watch cache --- pkg/storage/BUILD | 1 + pkg/storage/cacher.go | 15 ++++++-- pkg/storage/cacher_whitebox_test.go | 56 +++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 pkg/storage/cacher_whitebox_test.go diff --git a/pkg/storage/BUILD b/pkg/storage/BUILD index fc2fa17f804..035e17eb185 100644 --- a/pkg/storage/BUILD +++ b/pkg/storage/BUILD @@ -50,6 +50,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "cacher_whitebox_test.go", "selection_predicate_test.go", "time_budget_test.go", "util_test.go", diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 6febfac2b00..a81c71a24a7 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -750,6 +750,7 @@ type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event + done chan struct{} filter watchFilterFunc stopped bool forget func(bool) @@ -759,6 +760,7 @@ func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCac watcher := &cacheWatcher{ input: make(chan watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), + done: make(chan struct{}), filter: filter, stopped: false, forget: forget, @@ -783,6 +785,7 @@ func (c *cacheWatcher) stop() { defer c.Unlock() if !c.stopped { c.stopped = true + close(c.done) close(c.input) } } @@ -847,13 +850,19 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { glog.Errorf("unexpected copy error: %v", err) return } + var watchEvent watch.Event switch { case curObjPasses && !oldObjPasses: - c.result <- watch.Event{Type: watch.Added, Object: object} + watchEvent = watch.Event{Type: watch.Added, Object: object} case curObjPasses && oldObjPasses: - c.result <- watch.Event{Type: watch.Modified, Object: object} + watchEvent = watch.Event{Type: watch.Modified, Object: 
object} case !curObjPasses && oldObjPasses: - c.result <- watch.Event{Type: watch.Deleted, Object: object} + watchEvent = watch.Event{Type: watch.Deleted, Object: object} + } + select { + case c.result <- watchEvent: + // don't block on c.result if c.done is closed + case <-c.done: } } diff --git a/pkg/storage/cacher_whitebox_test.go b/pkg/storage/cacher_whitebox_test.go new file mode 100644 index 00000000000..a40e2dfe882 --- /dev/null +++ b/pkg/storage/cacher_whitebox_test.go @@ -0,0 +1,56 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +    http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/wait" +) + +// verifies the cacheWatcher.process goroutine is properly cleaned up even if +// the writes to cacheWatcher.result channel are blocked. +func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { + var lock sync.RWMutex + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(bool) { + lock.Lock() + defer lock.Unlock() + count++ + } + initEvents := []watchCacheEvent{ + {Object: &api.Pod{}}, + {Object: &api.Pod{}}, + } + // set the size of the buffer of w.result to 0, so that the writes to + // w.result are blocked.
+ w := newCacheWatcher(0, 0, initEvents, filter, forget) + w.Stop() + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) + } +}