Merge pull request #33003 from hongchaodeng/etcd_fix2

Automatic merge from submit-queue

stop etcd watcher when watch chan is closed

When startWatching() stops due to the watch chan is closed, the watcher could still keeps running in the background. When this case happen, we should stop the watcher entirely and close ResultChan() too.
This commit is contained in:
Kubernetes Submit Queue 2016-09-20 20:21:24 -07:00 committed by GitHub
commit d323fed024

View File

@ -99,7 +99,8 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
}
func (wc *watchChan) run() {
go wc.startWatching()
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
@ -108,7 +109,6 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
if err == context.Canceled {
wc.cancel() // just in case
break
}
errResult := parseError(err)
@ -119,10 +119,15 @@ func (wc *watchChan) run() {
case <-wc.ctx.Done(): // user has given up all results
}
}
wc.cancel()
case <-wc.ctx.Done():
case <-watchClosedCh:
case <-wc.ctx.Done(): // user cancel
}
// we need to wait until resultChan wouldn't be sent to anymore
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
wc.cancel()
// we need to wait until resultChan wouldn't be used anymore
resultChanWG.Wait()
close(wc.resultChan)
}
@ -157,7 +162,7 @@ func (wc *watchChan) sync() error {
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching() {
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
if err := wc.sync(); err != nil {
glog.Errorf("failed to sync with latest state: %v", err)
@ -182,6 +187,11 @@ func (wc *watchChan) startWatching() {
wc.sendEvent(parseEvent(e))
}
}
// When we come to this point, it's only possible that client side ends the watch.
// e.g. cancel the context, close the client.
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
// We should notify the main thread that this goroutine has exited.
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.