Merge pull request #25331 from hongchaodeng/w

Automatic merge from submit-queue

etcd3/watcher: fix goroutine leak if ctx is canceled

### Problem
In reflector.go, it could probably call Stop() without retrieving all results
from ResultChan(). See [here](https://github.com/kubernetes/kubernetes/blob/master/pkg/client/cache/reflector.go#L369). A potential leak is that when an error has happened, it could block on resultChan,
and then cancelling context in Stop() wouldn't unblock it.

### What's this PR?
This fixes the problem by making it also select ctx.Done and cancel context afterwards if error happened.
This commit is contained in:
k8s-merge-robot 2016-05-09 12:12:15 -07:00
commit 8a81000b71
2 changed files with 32 additions and 3 deletions

View File

@ -110,11 +110,14 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
errResult := parseError(err)
wc.cancel()
// error result is guaranteed to be received by user before closing ResultChan.
if errResult != nil {
wc.resultChan <- *errResult
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
wc.cancel()
case <-wc.ctx.Done():
}
// we need to wait until resultChan wouldn't be sent to anymore

View File

@ -18,10 +18,13 @@ package etcd3
import (
"errors"
"fmt"
"reflect"
"testing"
"time"
"sync"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
@ -187,6 +190,29 @@ func TestWatchContextCancel(t *testing.T) {
}
}
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
// The event flow goes like:
// - first we send an error, it should block on resultChan.
// - Then we cancel ctx. The blocking on resultChan should be freed up
// and run() goroutine should return.
var wg sync.WaitGroup
wg.Add(1)
go func() {
w.run()
wg.Done()
}()
w.errChan <- fmt.Errorf("some error")
cancel()
wg.Wait()
}
type testWatchStruct struct {
obj *api.Pod
expectEvent bool