diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 4473602e799..6d1490c01d6 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -31,6 +31,8 @@ import ( etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/golang/glog" "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) const ( @@ -78,6 +80,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo if recursive && !strings.HasSuffix(key, "/") { key += "/" } + wc := w.createWatchChan(ctx, key, rev, recursive, filter) + go wc.run() + return wc, nil +} + +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan { wc := &watchChan{ watcher: w, key: key, @@ -89,8 +97,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo errChan: make(chan error, 1), } wc.ctx, wc.cancel = context.WithCancel(ctx) - go wc.run() - return wc, nil + return wc } func (wc *watchChan) run() { @@ -276,6 +283,12 @@ func parseError(err error) *watch.Event { } func (wc *watchChan) sendError(err error) { + // Context.canceled is an expected behavior. + // We should just stop all goroutines in watchChan without returning error. + // TODO: etcd client should return context.Canceled instead of grpc specific error. + if grpc.Code(err) == codes.Canceled || err == context.Canceled { + return + } select { case wc.errChan <- err: case <-wc.ctx.Done(): diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 137fe96d5f3..66578e56c0e 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -168,6 +168,25 @@ func TestWatchError(t *testing.T) { testCheckResult(t, 0, watch.Error, w, nil) } +func TestWatchContextCancel(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + canceledCtx, cancel := context.WithCancel(ctx) + cancel() + w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything) + // When we do a client.Get with a canceled context, it will return error. + // Nonetheless, when we try to send it over internal errChan, we should detect + // it's context canceled and not send it. + err := w.sync() + w.ctx = ctx + w.sendError(err) + select { + case err := <-w.errChan: + t.Errorf("cancelling context shouldn't return any error. Err: %v", err) + default: + } +} + type testWatchStruct struct { obj *api.Pod expectEvent bool