etcd3/watcher: fix goroutine leak if ctx is canceled

In reflector.go, it could probably call Stop() without retrieving all results
from ResultChan().
A potential leak is that when an error has happened, it could block on resultChan,
and then cancelling context in Stop() wouldn't unblock it.
This fixes the problem by making it also select ctx.Done and cancel context
afterwards if error happened.
This commit is contained in:
Hongchao Deng 2016-05-08 23:09:22 -07:00
parent 5dd087040b
commit 97f4647dc3
2 changed files with 32 additions and 3 deletions

View File

@ -110,11 +110,14 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
errResult := parseError(err)
wc.cancel()
// error result is guaranteed to be received by user before closing ResultChan.
if errResult != nil {
wc.resultChan <- *errResult
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
wc.cancel()
case <-wc.ctx.Done():
}
// we need to wait until resultChan wouldn't be sent to anymore

View File

@ -18,10 +18,13 @@ package etcd3
import (
"errors"
"fmt"
"reflect"
"testing"
"time"
"sync"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
@ -187,6 +190,29 @@ func TestWatchContextCancel(t *testing.T) {
}
}
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
// The event flow goes like:
// - first we send an error, it should block on resultChan.
// - Then we cancel ctx. The blocking on resultChan should be freed up
// and run() goroutine should return.
var wg sync.WaitGroup
wg.Add(1)
go func() {
w.run()
wg.Done()
}()
w.errChan <- fmt.Errorf("some error")
cancel()
wg.Wait()
}
type testWatchStruct struct {
obj *api.Pod
expectEvent bool