Merge pull request #32907 from hongchaodeng/etcd_fix

Automatic merge from submit-queue

etcd watcher: centralize error handling

We should centralize error handling in watcher in run(). Otherwise this could silently return.

Also we don't need the grpc code checking anymore. It's fixed.
This commit is contained in:
Kubernetes Submit Queue
2016-09-19 13:07:11 -07:00
committed by GitHub
2 changed files with 17 additions and 18 deletions

View File

@@ -31,8 +31,6 @@ import (
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
const (
@@ -109,6 +107,10 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
if err == context.Canceled {
wc.cancel() // just in case
break
}
errResult := parseError(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
@@ -294,12 +296,6 @@ func parseError(err error) *watch.Event {
}
func (wc *watchChan) sendError(err error) {
// Context.canceled is an expected behavior.
// We should just stop all goroutines in watchChan without returning error.
// TODO: etcd client should return context.Canceled instead of grpc specific error.
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
return
}
select {
case wc.errChan <- err:
case <-wc.ctx.Done():

View File

@@ -196,17 +196,20 @@ func TestWatchContextCancel(t *testing.T) {
defer cluster.Terminate(t)
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything)
// When we do a client.Get with a canceled context, it will return error.
// Nonetheless, when we try to send it over internal errChan, we should detect
// it's context canceled and not send it.
err := w.sync()
w.ctx = ctx
w.sendError(err)
// When we watch with a canceled context, we should detect that it's context canceled.
// We won't take it as error and also close the watcher.
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
if err != nil {
t.Fatal(err)
}
select {
case err := <-w.errChan:
t.Errorf("cancelling context shouldn't return any error. Err: %v", err)
default:
case _, ok := <-w.ResultChan():
if ok {
t.Error("ResultChan() should be closed")
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
}
}