stop etcd watcher when watch chan is closed

This commit is contained in:
Hongchao Deng 2016-09-19 13:13:12 -07:00
parent 4b7f0c8388
commit 8e3b40735a

View File

@ -99,7 +99,8 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
} }
func (wc *watchChan) run() { func (wc *watchChan) run() {
go wc.startWatching() watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
var resultChanWG sync.WaitGroup var resultChanWG sync.WaitGroup
resultChanWG.Add(1) resultChanWG.Add(1)
@ -108,7 +109,6 @@ func (wc *watchChan) run() {
select { select {
case err := <-wc.errChan: case err := <-wc.errChan:
if err == context.Canceled { if err == context.Canceled {
wc.cancel() // just in case
break break
} }
errResult := parseError(err) errResult := parseError(err)
@ -119,10 +119,15 @@ func (wc *watchChan) run() {
case <-wc.ctx.Done(): // user has given up all results case <-wc.ctx.Done(): // user has given up all results
} }
} }
wc.cancel() case <-watchClosedCh:
case <-wc.ctx.Done(): case <-wc.ctx.Done(): // user cancel
} }
// we need to wait until resultChan wouldn't be sent to anymore
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
wc.cancel()
// we need to wait until resultChan wouldn't be used anymore
resultChanWG.Wait() resultChanWG.Wait()
close(wc.resultChan) close(wc.resultChan)
} }
@ -157,7 +162,7 @@ func (wc *watchChan) sync() error {
// startWatching does: // startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev // - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process. // - watch on given key and send events to process.
func (wc *watchChan) startWatching() { func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 { if wc.initialRev == 0 {
if err := wc.sync(); err != nil { if err := wc.sync(); err != nil {
glog.Errorf("failed to sync with latest state: %v", err) glog.Errorf("failed to sync with latest state: %v", err)
@ -182,6 +187,11 @@ func (wc *watchChan) startWatching() {
wc.sendEvent(parseEvent(e)) wc.sendEvent(parseEvent(e))
} }
} }
// When we come to this point, it's only possible that client side ends the watch.
// e.g. cancel the context, close the client.
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
// We should notify the main thread that this goroutine has exited.
close(watchClosedCh)
} }
// processEvent processes events from etcd watcher and sends results to resultChan. // processEvent processes events from etcd watcher and sends results to resultChan.