mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Recognize etcd/grpc cancel errors correctly
This commit is contained in:
parent
03ff890ef4
commit
267eb25e60
@ -25,6 +25,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
grpccodes "google.golang.org/grpc/codes"
|
||||||
|
grpcstatus "google.golang.org/grpc/status"
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
@ -35,6 +38,7 @@ import (
|
|||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -152,6 +156,31 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
|
|||||||
return wc
|
return wc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type etcdError interface {
|
||||||
|
Code() grpccodes.Code
|
||||||
|
Error() string
|
||||||
|
}
|
||||||
|
|
||||||
|
type grpcError interface {
|
||||||
|
GRPCStatus() *grpcstatus.Status
|
||||||
|
}
|
||||||
|
|
||||||
|
func isCancelError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err == context.Canceled {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if etcdErr, ok := err.(etcdError); ok && etcdErr.Code() == grpccodes.Canceled {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if grpcErr, ok := err.(grpcError); ok && grpcErr.GRPCStatus().Code() == grpccodes.Canceled {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (wc *watchChan) run() {
|
func (wc *watchChan) run() {
|
||||||
watchClosedCh := make(chan struct{})
|
watchClosedCh := make(chan struct{})
|
||||||
go wc.startWatching(watchClosedCh)
|
go wc.startWatching(watchClosedCh)
|
||||||
@ -162,7 +191,7 @@ func (wc *watchChan) run() {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-wc.errChan:
|
case err := <-wc.errChan:
|
||||||
if err == context.Canceled {
|
if isCancelError(err) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
errResult := transformErrorToEvent(err)
|
errResult := transformErrorToEvent(err)
|
||||||
@ -213,12 +242,15 @@ func (wc *watchChan) sync() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// logWatchChannelErr checks whether the error is about mvcc revision compaction which is regarded as warning
|
|
||||||
func logWatchChannelErr(err error) {
|
func logWatchChannelErr(err error) {
|
||||||
if !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
|
switch {
|
||||||
klog.Errorf("watch chan error: %v", err)
|
case strings.Contains(err.Error(), "mvcc: required revision has been compacted"):
|
||||||
} else {
|
// mvcc revision compaction which is regarded as warning, not error
|
||||||
klog.Warningf("watch chan error: %v", err)
|
klog.Warningf("watch chan error: %v", err)
|
||||||
|
case isCancelError(err):
|
||||||
|
// expected when watches close, no need to log
|
||||||
|
default:
|
||||||
|
klog.Errorf("watch chan error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user