diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 544787925b3..a9ed3571f95 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -99,7 +99,8 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re } func (wc *watchChan) run() { - go wc.startWatching() + watchClosedCh := make(chan struct{}) + go wc.startWatching(watchClosedCh) var resultChanWG sync.WaitGroup resultChanWG.Add(1) @@ -108,7 +109,6 @@ func (wc *watchChan) run() { select { case err := <-wc.errChan: if err == context.Canceled { - wc.cancel() // just in case break } errResult := parseError(err) @@ -119,10 +119,15 @@ func (wc *watchChan) run() { case <-wc.ctx.Done(): // user has given up all results } } - wc.cancel() - case <-wc.ctx.Done(): + case <-watchClosedCh: + case <-wc.ctx.Done(): // user cancel } - // we need to wait until resultChan wouldn't be sent to anymore + + // We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all. + // It's fine to double cancel. + wc.cancel() + + // we need to wait until resultChan wouldn't be used anymore resultChanWG.Wait() close(wc.resultChan) } @@ -157,7 +162,7 @@ func (wc *watchChan) sync() error { // startWatching does: // - get current objects if initialRev=0; set initialRev to current rev // - watch on given key and send events to process. -func (wc *watchChan) startWatching() { +func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { if wc.initialRev == 0 { if err := wc.sync(); err != nil { glog.Errorf("failed to sync with latest state: %v", err) @@ -182,6 +187,11 @@ func (wc *watchChan) startWatching() { wc.sendEvent(parseEvent(e)) } } + // When we come to this point, it's only possible that client side ends the watch. + // e.g. cancel the context, close the client. + // If this watch chan is broken and context isn't cancelled, other goroutines will still hang. + // We should notify the main thread that this goroutine has exited. + close(watchClosedCh) } // processEvent processes events from etcd watcher and sends results to resultChan.