Merge pull request #24638 from hongchaodeng/w

Automatic merge from submit-queue

etcd3/watcher: cancelling context shouldn't return error

Fixes #24528
This commit is contained in:
k8s-merge-robot 2016-04-22 01:50:18 -07:00
commit 10e697d1f9
2 changed files with 34 additions and 2 deletions

View File

@ -31,6 +31,8 @@ import (
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
const (
@ -78,6 +80,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
@ -89,8 +97,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
errChan: make(chan error, 1),
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
go wc.run()
return wc, nil
return wc
}
func (wc *watchChan) run() {
@ -276,6 +283,12 @@ func parseError(err error) *watch.Event {
}
func (wc *watchChan) sendError(err error) {
// Context.canceled is an expected behavior.
// We should just stop all goroutines in watchChan without returning error.
// TODO: etcd client should return context.Canceled instead of grpc specific error.
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
return
}
select {
case wc.errChan <- err:
case <-wc.ctx.Done():

View File

@ -168,6 +168,25 @@ func TestWatchError(t *testing.T) {
testCheckResult(t, 0, watch.Error, w, nil)
}
func TestWatchContextCancel(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything)
// When we do a client.Get with a canceled context, it will return error.
// Nonetheless, when we try to send it over internal errChan, we should detect
// it's context canceled and not send it.
err := w.sync()
w.ctx = ctx
w.sendError(err)
select {
case err := <-w.errChan:
t.Errorf("cancelling context shouldn't return any error. Err: %v", err)
default:
}
}
type testWatchStruct struct {
obj *api.Pod
expectEvent bool