diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 6d1490c01d6..abff8a50061 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -110,11 +110,14 @@ func (wc *watchChan) run() { select { case err := <-wc.errChan: errResult := parseError(err) - wc.cancel() - // error result is guaranteed to be received by user before closing ResultChan. if errResult != nil { - wc.resultChan <- *errResult + // error result is guaranteed to be received by user before closing ResultChan. + select { + case wc.resultChan <- *errResult: + case <-wc.ctx.Done(): // user has given up all results + } } + wc.cancel() case <-wc.ctx.Done(): } // we need to wait until resultChan wouldn't be sent to anymore diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 66578e56c0e..3288c6047b1 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -18,10 +18,13 @@ package etcd3 import ( "errors" + "fmt" "reflect" "testing" "time" + "sync" + "github.com/coreos/etcd/integration" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -187,6 +190,29 @@ func TestWatchContextCancel(t *testing.T) { } } +func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { + origCtx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + ctx, cancel := context.WithCancel(origCtx) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) + // make resutlChan and errChan blocking to ensure ordering. + w.resultChan = make(chan watch.Event) + w.errChan = make(chan error) + // The event flow goes like: + // - first we send an error, it should block on resultChan. + // - Then we cancel ctx. The blocking on resultChan should be freed up + // and run() goroutine should return. + var wg sync.WaitGroup + wg.Add(1) + go func() { + w.run() + wg.Done() + }() + w.errChan <- fmt.Errorf("some error") + cancel() + wg.Wait() +} + type testWatchStruct struct { obj *api.Pod expectEvent bool