mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
etcd3/watcher: cancelling context shouldn't return error
This commit is contained in:
parent
06c2db4fe2
commit
b0f4517e65
@ -31,6 +31,8 @@ import (
|
|||||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -78,6 +80,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
|
|||||||
if recursive && !strings.HasSuffix(key, "/") {
|
if recursive && !strings.HasSuffix(key, "/") {
|
||||||
key += "/"
|
key += "/"
|
||||||
}
|
}
|
||||||
|
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
|
||||||
|
go wc.run()
|
||||||
|
return wc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan {
|
||||||
wc := &watchChan{
|
wc := &watchChan{
|
||||||
watcher: w,
|
watcher: w,
|
||||||
key: key,
|
key: key,
|
||||||
@ -89,8 +97,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
|
|||||||
errChan: make(chan error, 1),
|
errChan: make(chan error, 1),
|
||||||
}
|
}
|
||||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||||
go wc.run()
|
return wc
|
||||||
return wc, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchChan) run() {
|
func (wc *watchChan) run() {
|
||||||
@ -276,6 +283,12 @@ func parseError(err error) *watch.Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchChan) sendError(err error) {
|
func (wc *watchChan) sendError(err error) {
|
||||||
|
// Context.canceled is an expected behavior.
|
||||||
|
// We should just stop all goroutines in watchChan without returning error.
|
||||||
|
// TODO: etcd client should return context.Canceled instead of grpc specific error.
|
||||||
|
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
|
||||||
|
return
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case wc.errChan <- err:
|
case wc.errChan <- err:
|
||||||
case <-wc.ctx.Done():
|
case <-wc.ctx.Done():
|
||||||
|
@ -168,6 +168,25 @@ func TestWatchError(t *testing.T) {
|
|||||||
testCheckResult(t, 0, watch.Error, w, nil)
|
testCheckResult(t, 0, watch.Error, w, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
|
ctx, store, cluster := testSetup(t)
|
||||||
|
defer cluster.Terminate(t)
|
||||||
|
canceledCtx, cancel := context.WithCancel(ctx)
|
||||||
|
cancel()
|
||||||
|
w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything)
|
||||||
|
// When we do a client.Get with a canceled context, it will return error.
|
||||||
|
// Nonetheless, when we try to send it over internal errChan, we should detect
|
||||||
|
// it's context canceled and not send it.
|
||||||
|
err := w.sync()
|
||||||
|
w.ctx = ctx
|
||||||
|
w.sendError(err)
|
||||||
|
select {
|
||||||
|
case err := <-w.errChan:
|
||||||
|
t.Errorf("cancelling context shouldn't return any error. Err: %v", err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
type testWatchStruct struct {
|
||||||
obj *api.Pod
|
obj *api.Pod
|
||||||
expectEvent bool
|
expectEvent bool
|
||||||
|
Loading…
Reference in New Issue
Block a user