mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
etcd watcher: centralize error handling
This commit is contained in:
parent
af3050dd15
commit
5a4a095e29
@ -31,8 +31,6 @@ import (
|
|||||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -109,6 +107,10 @@ func (wc *watchChan) run() {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-wc.errChan:
|
case err := <-wc.errChan:
|
||||||
|
if err == context.Canceled {
|
||||||
|
wc.cancel() // just in case
|
||||||
|
break
|
||||||
|
}
|
||||||
errResult := parseError(err)
|
errResult := parseError(err)
|
||||||
if errResult != nil {
|
if errResult != nil {
|
||||||
// error result is guaranteed to be received by user before closing ResultChan.
|
// error result is guaranteed to be received by user before closing ResultChan.
|
||||||
@ -294,12 +296,6 @@ func parseError(err error) *watch.Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchChan) sendError(err error) {
|
func (wc *watchChan) sendError(err error) {
|
||||||
// Context.canceled is an expected behavior.
|
|
||||||
// We should just stop all goroutines in watchChan without returning error.
|
|
||||||
// TODO: etcd client should return context.Canceled instead of grpc specific error.
|
|
||||||
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
select {
|
||||||
case wc.errChan <- err:
|
case wc.errChan <- err:
|
||||||
case <-wc.ctx.Done():
|
case <-wc.ctx.Done():
|
||||||
|
@ -196,17 +196,20 @@ func TestWatchContextCancel(t *testing.T) {
|
|||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
canceledCtx, cancel := context.WithCancel(ctx)
|
canceledCtx, cancel := context.WithCancel(ctx)
|
||||||
cancel()
|
cancel()
|
||||||
w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything)
|
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||||
// When we do a client.Get with a canceled context, it will return error.
|
// We won't take it as error and also close the watcher.
|
||||||
// Nonetheless, when we try to send it over internal errChan, we should detect
|
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
|
||||||
// it's context canceled and not send it.
|
if err != nil {
|
||||||
err := w.sync()
|
t.Fatal(err)
|
||||||
w.ctx = ctx
|
}
|
||||||
w.sendError(err)
|
|
||||||
select {
|
select {
|
||||||
case err := <-w.errChan:
|
case _, ok := <-w.ResultChan():
|
||||||
t.Errorf("cancelling context shouldn't return any error. Err: %v", err)
|
if ok {
|
||||||
default:
|
t.Error("ResultChan() should be closed")
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user