Graceul termination in Cacher

This commit is contained in:
Wojciech Tyczynski 2015-12-28 10:35:12 +01:00
parent 9bdc4100d5
commit ec70eb16f3
2 changed files with 47 additions and 7 deletions

View File

@ -58,9 +58,6 @@ type CacherConfig struct {
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
NewListFunc func() runtime.Object
// Cacher will be stopped when the StopChannel will be closed.
StopChannel <-chan struct{}
}
// Cacher is responsible for serving WATCH and LIST requests for a given
@ -101,6 +98,12 @@ type Cacher struct {
// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)
// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
stopCh chan struct{}
stopWg sync.WaitGroup
}
// Create a new Cacher responsible from service WATCH and LIST requests from its
@ -150,14 +153,31 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
stopped: false,
// We need to (potentially) stop both:
// - util.Until go-routine
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: make(chan struct{}),
stopWg: sync.WaitGroup{},
}
cacher.usable.Lock()
// See startCaching method for why explanation on it.
watchCache.SetOnReplace(func() { cacher.usable.Unlock() })
watchCache.SetOnEvent(cacher.processEvent)
stopCh := config.StopChannel
go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh)
stopCh := cacher.stopCh
cacher.stopWg.Add(1)
go func() {
util.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, 0, stopCh)
cacher.stopWg.Done()
}()
return cacher
}
@ -178,7 +198,6 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// need to retry it on errors under lock.
for {
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
// TODO: This can tight loop log.
glog.Errorf("unexpected ListAndWatch error: %v", err)
} else {
break
@ -338,6 +357,20 @@ func (c *Cacher) terminateAllWatchers() {
}
}
func (c *Cacher) isStopped() bool {
c.stopLock.RLock()
defer c.stopLock.RUnlock()
return c.stopped
}
func (c *Cacher) Stop() {
c.stopLock.Lock()
c.stopped = true
c.stopLock.Unlock()
close(c.stopCh)
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int) func(bool) {
return func(lock bool) {
if lock {

View File

@ -56,7 +56,6 @@ func newTestCacher(s storage.Interface) *storage.Cacher {
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
NewListFunc: func() runtime.Object { return &api.PodList{} },
StopChannel: util.NeverStop,
}
return storage.NewCacherFromConfig(config)
}
@ -91,6 +90,7 @@ func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
@ -167,6 +167,7 @@ func TestWatch(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
@ -182,6 +183,7 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
@ -200,6 +202,7 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer initialWatcher.Stop()
verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
@ -209,6 +212,7 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nowWatcher.Stop()
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
@ -221,6 +225,7 @@ func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()
// Create a watcher that will not be reading any result.
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
@ -247,6 +252,7 @@ func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()
podFoo := makeTestPod("foo")
podFoo.Labels = map[string]string{"filter": "foo"}
@ -279,6 +285,7 @@ func TestFiltering(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)