From 97f4647dc3d8cf46c2b66b89a31c758a6edfb57c Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Sun, 8 May 2016 23:09:22 -0700 Subject: [PATCH] etcd3/watcher: fix goroutine leak if ctx is canceled In reflector.go, it could probably call Stop() without retrieving all results from ResultChan(). A potential leak is that when an error has happened, it could block on resultChan, and then cancelling context in Stop() wouldn't unblock it. This fixes the problem by making it also select ctx.Done and cancel context afterwards if error happened. --- pkg/storage/etcd3/watcher.go | 9 ++++++--- pkg/storage/etcd3/watcher_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 6d1490c01d6..abff8a50061 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -110,11 +110,14 @@ func (wc *watchChan) run() { select { case err := <-wc.errChan: errResult := parseError(err) - wc.cancel() - // error result is guaranteed to be received by user before closing ResultChan. if errResult != nil { - wc.resultChan <- *errResult + // error result is guaranteed to be received by user before closing ResultChan. + select { + case wc.resultChan <- *errResult: + case <-wc.ctx.Done(): // user has given up all results + } } + wc.cancel() case <-wc.ctx.Done(): } // we need to wait until resultChan wouldn't be sent to anymore diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 66578e56c0e..3288c6047b1 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -18,10 +18,13 @@ package etcd3 import ( "errors" + "fmt" "reflect" "testing" "time" + "sync" + "github.com/coreos/etcd/integration" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -187,6 +190,29 @@ func TestWatchContextCancel(t *testing.T) { } } +func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { + origCtx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + ctx, cancel := context.WithCancel(origCtx) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) + // make resutlChan and errChan blocking to ensure ordering. + w.resultChan = make(chan watch.Event) + w.errChan = make(chan error) + // The event flow goes like: + // - first we send an error, it should block on resultChan. + // - Then we cancel ctx. The blocking on resultChan should be freed up + // and run() goroutine should return. + var wg sync.WaitGroup + wg.Add(1) + go func() { + w.run() + wg.Done() + }() + w.errChan <- fmt.Errorf("some error") + cancel() + wg.Wait() +} + type testWatchStruct struct { obj *api.Pod expectEvent bool