From ec70eb16f330097740edc6fa31353689d5616db0 Mon Sep 17 00:00:00 2001
From: Wojciech Tyczynski
Date: Mon, 28 Dec 2015 10:35:12 +0100
Subject: [PATCH] Graceful termination in Cacher

---
 pkg/storage/cacher.go      | 45 +++++++++++++++++++++++++++++++++-----
 pkg/storage/cacher_test.go |  9 +++++++-
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go
index cc105044179..be9e14cddba 100644
--- a/pkg/storage/cacher.go
+++ b/pkg/storage/cacher.go
@@ -58,9 +58,6 @@ type CacherConfig struct {
 	// NewList is a function that creates new empty object storing a list of
 	// objects of type Type.
 	NewListFunc func() runtime.Object
-
-	// Cacher will be stopped when the StopChannel will be closed.
-	StopChannel <-chan struct{}
 }
 
 // Cacher is responsible for serving WATCH and LIST requests for a given
@@ -101,6 +98,12 @@ type Cacher struct {
 
 	// keyFunc is used to get a key in the underyling storage for a given object.
 	keyFunc func(runtime.Object) (string, error)
+
+	// Handling graceful termination.
+	stopLock sync.RWMutex
+	stopped  bool
+	stopCh   chan struct{}
+	stopWg   sync.WaitGroup
 }
 
 // Create a new Cacher responsible from service WATCH and LIST requests from its
@@ -150,14 +153,31 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
 		watchers:   make(map[int]*cacheWatcher),
 		versioner:  config.Versioner,
 		keyFunc:    config.KeyFunc,
+		stopped:    false,
+		// We need to (potentially) stop both:
+		// - util.Until go-routine
+		// - reflector.ListAndWatch
+		// and there are no guarantees on the order that they will stop.
+		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
+		stopCh: make(chan struct{}),
+		stopWg: sync.WaitGroup{},
 	}
 	cacher.usable.Lock()
 	// See startCaching method for why explanation on it.
 	watchCache.SetOnReplace(func() { cacher.usable.Unlock() })
 	watchCache.SetOnEvent(cacher.processEvent)
 
-	stopCh := config.StopChannel
-	go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh)
+	stopCh := cacher.stopCh
+	cacher.stopWg.Add(1)
+	go func() {
+		util.Until(
+			func() {
+				if !cacher.isStopped() {
+					cacher.startCaching(stopCh)
+				}
+			}, 0, stopCh)
+		cacher.stopWg.Done()
+	}()
 	return cacher
 }
 
@@ -178,7 +198,6 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	// need to retry it on errors under lock.
 	for {
 		if err := c.reflector.ListAndWatch(stopChannel); err != nil {
-			// TODO: This can tight loop log.
 			glog.Errorf("unexpected ListAndWatch error: %v", err)
 		} else {
 			break
@@ -338,6 +357,20 @@ func (c *Cacher) terminateAllWatchers() {
 	}
 }
 
+func (c *Cacher) isStopped() bool {
+	c.stopLock.RLock()
+	defer c.stopLock.RUnlock()
+	return c.stopped
+}
+
+func (c *Cacher) Stop() {
+	c.stopLock.Lock()
+	c.stopped = true
+	c.stopLock.Unlock()
+	close(c.stopCh)
+	c.stopWg.Wait()
+}
+
 func forgetWatcher(c *Cacher, index int) func(bool) {
 	return func(lock bool) {
 		if lock {
diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go
index 91d40538999..a6b7496b54c 100644
--- a/pkg/storage/cacher_test.go
+++ b/pkg/storage/cacher_test.go
@@ -56,7 +56,6 @@ func newTestCacher(s storage.Interface) *storage.Cacher {
 		ResourcePrefix: prefix,
 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 		NewListFunc:    func() runtime.Object { return &api.PodList{} },
-		StopChannel:    util.NeverStop,
 	}
 	return storage.NewCacherFromConfig(config)
 }
@@ -91,6 +90,7 @@ func TestList(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
 	defer server.Terminate(t)
 	cacher := newTestCacher(etcdStorage)
+	defer cacher.Stop()
 
 	podFoo := makeTestPod("foo")
 	podBar := makeTestPod("bar")
@@ -167,6 +167,7 @@ func TestWatch(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
 	defer server.Terminate(t)
 	cacher := newTestCacher(etcdStorage)
+	defer cacher.Stop()
 
 	podFoo := makeTestPod("foo")
 	podBar := makeTestPod("bar")
@@ -182,6 +183,7 @@ func TestWatch(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
+	defer watcher.Stop()
 
 	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
 	_ = updatePod(t, etcdStorage, podBar, nil)
@@ -200,6 +202,7 @@ func TestWatch(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
+	defer initialWatcher.Stop()
 
 	verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
 	verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
@@ -209,6 +212,7 @@ func TestWatch(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
+	defer nowWatcher.Stop()
 
 	verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
 
@@ -221,6 +225,7 @@ func TestWatcherTimeout(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
 	defer server.Terminate(t)
 	cacher := newTestCacher(etcdStorage)
+	defer cacher.Stop()
 
 	// Create a watcher that will not be reading any result.
 	watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
@@ -247,6 +252,7 @@ func TestFiltering(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
 	defer server.Terminate(t)
 	cacher := newTestCacher(etcdStorage)
+	defer cacher.Stop()
 
 	podFoo := makeTestPod("foo")
 	podFoo.Labels = map[string]string{"filter": "foo"}
@@ -279,6 +285,7 @@ func TestFiltering(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
+	defer watcher.Stop()
 
 	verifyWatchEvent(t, watcher, watch.Added, podFoo)
 	verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
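
For reference, the shutdown pattern this patch introduces can be shown in isolation: a background loop driven by a stop channel, a stopped flag guarded by an RWMutex so the loop body is not restarted once shutdown begins, and a WaitGroup that lets Stop() block until the goroutine has actually exited. The sketch below is illustrative only; the worker type and run function are placeholders, not Kubernetes APIs, and in the real code the loop body is Cacher.startCaching driven by util.Until.

// Minimal sketch of the stop-channel + WaitGroup termination pattern used above.
// Placeholder names (worker, run, newWorker); not the Cacher implementation itself.
package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct {
	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{stopCh: make(chan struct{})}
	w.stopWg.Add(1)
	go func() {
		// Mirrors the util.Until goroutine: rerun the body until stopCh is closed.
		defer w.stopWg.Done()
		for {
			select {
			case <-w.stopCh:
				return
			default:
			}
			if !w.isStopped() {
				w.run(w.stopCh)
			}
		}
	}()
	return w
}

// run stands in for the work startCaching does; it returns when stopCh is closed.
func (w *worker) run(stopCh <-chan struct{}) {
	select {
	case <-stopCh:
	case <-time.After(10 * time.Millisecond):
	}
}

func (w *worker) isStopped() bool {
	w.stopLock.RLock()
	defer w.stopLock.RUnlock()
	return w.stopped
}

// Stop mirrors Cacher.Stop(): mark stopped, close the channel, then wait for
// the background goroutine to finish before returning.
func (w *worker) Stop() {
	w.stopLock.Lock()
	w.stopped = true
	w.stopLock.Unlock()
	close(w.stopCh)
	w.stopWg.Wait()
}

func main() {
	w := newWorker()
	time.Sleep(50 * time.Millisecond)
	w.Stop()
	fmt.Println("worker stopped cleanly")
}

This waiting behavior is also why each test above now defers cacher.Stop(): without it, the util.Until goroutine and the reflector's ListAndWatch would keep running against the already-terminated etcd test server after the test returns.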