mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-28 16:07:29 +00:00
Merge pull request #87329 from windmilleng/nicks/informer-error-handling
cache: add error handling to informers Kubernetes-commit: 7c6473b7c901732d6e09b2bfed0e8d423b7bf1d9
This commit is contained in:
commit
fe32aa3b94
6
tools/cache/controller.go
vendored
6
tools/cache/controller.go
vendored
@ -69,6 +69,9 @@ type Config struct {
|
|||||||
// question to this interface as a parameter. This is probably moot
|
// question to this interface as a parameter. This is probably moot
|
||||||
// now that this functionality appears at a higher level.
|
// now that this functionality appears at a higher level.
|
||||||
RetryOnError bool
|
RetryOnError bool
|
||||||
|
|
||||||
|
// Called whenever the ListAndWatch drops the connection with an error.
|
||||||
|
WatchErrorHandler WatchErrorHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
|
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
|
||||||
@ -132,6 +135,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
|
|||||||
)
|
)
|
||||||
r.ShouldResync = c.config.ShouldResync
|
r.ShouldResync = c.config.ShouldResync
|
||||||
r.clock = c.clock
|
r.clock = c.clock
|
||||||
|
if c.config.WatchErrorHandler != nil {
|
||||||
|
r.watchErrorHandler = c.config.WatchErrorHandler
|
||||||
|
}
|
||||||
|
|
||||||
c.reflectorMutex.Lock()
|
c.reflectorMutex.Lock()
|
||||||
c.reflector = r
|
c.reflector = r
|
||||||
|
57
tools/cache/reflector.go
vendored
57
tools/cache/reflector.go
vendored
@ -95,6 +95,37 @@ type Reflector struct {
|
|||||||
// etcd, which is significantly less efficient and may lead to serious performance and
|
// etcd, which is significantly less efficient and may lead to serious performance and
|
||||||
// scalability problems.
|
// scalability problems.
|
||||||
WatchListPageSize int64
|
WatchListPageSize int64
|
||||||
|
// Called whenever the ListAndWatch drops the connection with an error.
|
||||||
|
watchErrorHandler WatchErrorHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// The WatchErrorHandler is called whenever ListAndWatch drops the
|
||||||
|
// connection with an error. After calling this handler, the informer
|
||||||
|
// will backoff and retry.
|
||||||
|
//
|
||||||
|
// The default implementation looks at the error type and tries to log
|
||||||
|
// the error message at an appropriate level.
|
||||||
|
//
|
||||||
|
// Implementations of this handler may display the error message in other
|
||||||
|
// ways. Implementations should return quickly - any expensive processing
|
||||||
|
// should be offloaded.
|
||||||
|
type WatchErrorHandler func(r *Reflector, err error)
|
||||||
|
|
||||||
|
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
|
||||||
|
func DefaultWatchErrorHandler(r *Reflector, err error) {
|
||||||
|
switch {
|
||||||
|
case isExpiredError(err):
|
||||||
|
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||||
|
// has a semantic that it returns data at least as fresh as provided RV.
|
||||||
|
// So first try to LIST with setting RV to resource version of last observed object.
|
||||||
|
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||||
|
case err == io.EOF:
|
||||||
|
// watch closed normally
|
||||||
|
case err == io.ErrUnexpectedEOF:
|
||||||
|
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
||||||
|
default:
|
||||||
|
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -138,6 +169,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||||||
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
||||||
resyncPeriod: resyncPeriod,
|
resyncPeriod: resyncPeriod,
|
||||||
clock: realClock,
|
clock: realClock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
}
|
}
|
||||||
r.setExpectedType(expectedType)
|
r.setExpectedType(expectedType)
|
||||||
return r
|
return r
|
||||||
@ -175,7 +207,7 @@ func (r *Reflector) Run(stopCh <-chan struct{}) {
|
|||||||
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
|
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
|
||||||
wait.BackoffUntil(func() {
|
wait.BackoffUntil(func() {
|
||||||
if err := r.ListAndWatch(stopCh); err != nil {
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
utilruntime.HandleError(err)
|
r.watchErrorHandler(r, err)
|
||||||
}
|
}
|
||||||
}, r.backoffManager, true, stopCh)
|
}, r.backoffManager, true, stopCh)
|
||||||
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
|
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
|
||||||
@ -275,7 +307,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
case <-listCh:
|
case <-listCh:
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We check if the list was paginated and if so set the paginatedResult based on that.
|
// We check if the list was paginated and if so set the paginatedResult based on that.
|
||||||
@ -296,17 +328,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
initTrace.Step("Objects listed")
|
initTrace.Step("Objects listed")
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
|
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
|
||||||
}
|
}
|
||||||
resourceVersion = listMetaInterface.GetResourceVersion()
|
resourceVersion = listMetaInterface.GetResourceVersion()
|
||||||
initTrace.Step("Resource version extracted")
|
initTrace.Step("Resource version extracted")
|
||||||
items, err := meta.ExtractList(list)
|
items, err := meta.ExtractList(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
|
||||||
}
|
}
|
||||||
initTrace.Step("Objects extracted")
|
initTrace.Step("Objects extracted")
|
||||||
if err := r.syncWith(items, resourceVersion); err != nil {
|
if err := r.syncWith(items, resourceVersion); err != nil {
|
||||||
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
|
return fmt.Errorf("unable to sync list result: %v", err)
|
||||||
}
|
}
|
||||||
initTrace.Step("SyncWith done")
|
initTrace.Step("SyncWith done")
|
||||||
r.setLastSyncResourceVersion(resourceVersion)
|
r.setLastSyncResourceVersion(resourceVersion)
|
||||||
@ -366,19 +398,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
w, err := r.listerWatcher.Watch(options)
|
w, err := r.listerWatcher.Watch(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
|
||||||
case isExpiredError(err):
|
|
||||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
|
||||||
// has a semantic that it returns data at least as fresh as provided RV.
|
|
||||||
// So first try to LIST with setting RV to resource version of last observed object.
|
|
||||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
|
||||||
case err == io.EOF:
|
|
||||||
// watch closed normally
|
|
||||||
case err == io.ErrUnexpectedEOF:
|
|
||||||
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
|
||||||
default:
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
|
||||||
}
|
|
||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||||
// watch where we ended.
|
// watch where we ended.
|
||||||
@ -387,7 +406,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||||
|
31
tools/cache/shared_informer.go
vendored
31
tools/cache/shared_informer.go
vendored
@ -165,6 +165,21 @@ type SharedInformer interface {
|
|||||||
// store. The value returned is not synchronized with access to the underlying store and is not
|
// store. The value returned is not synchronized with access to the underlying store and is not
|
||||||
// thread-safe.
|
// thread-safe.
|
||||||
LastSyncResourceVersion() string
|
LastSyncResourceVersion() string
|
||||||
|
|
||||||
|
// The WatchErrorHandler is called whenever ListAndWatch drops the
|
||||||
|
// connection with an error. After calling this handler, the informer
|
||||||
|
// will backoff and retry.
|
||||||
|
//
|
||||||
|
// The default implementation looks at the error type and tries to log
|
||||||
|
// the error message at an appropriate level.
|
||||||
|
//
|
||||||
|
// There's only one handler, so if you call this multiple times, last one
|
||||||
|
// wins; calling after the informer has been started returns an error.
|
||||||
|
//
|
||||||
|
// The handler is intended for visibility, not to e.g. pause the consumers.
|
||||||
|
// The handler should return quickly - any expensive processing should be
|
||||||
|
// offloaded.
|
||||||
|
SetWatchErrorHandler(handler WatchErrorHandler) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
||||||
@ -300,6 +315,9 @@ type sharedIndexInformer struct {
|
|||||||
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
||||||
// can safely join the shared informer.
|
// can safely join the shared informer.
|
||||||
blockDeltas sync.Mutex
|
blockDeltas sync.Mutex
|
||||||
|
|
||||||
|
// Called whenever the ListAndWatch drops the connection with an error.
|
||||||
|
watchErrorHandler WatchErrorHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
||||||
@ -335,6 +353,18 @@ type deleteNotification struct {
|
|||||||
oldObj interface{}
|
oldObj interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
|
||||||
|
s.startedLock.Lock()
|
||||||
|
defer s.startedLock.Unlock()
|
||||||
|
|
||||||
|
if s.started {
|
||||||
|
return fmt.Errorf("informer has already started")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.watchErrorHandler = handler
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
@ -352,6 +382,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
|||||||
ShouldResync: s.processor.shouldResync,
|
ShouldResync: s.processor.shouldResync,
|
||||||
|
|
||||||
Process: s.HandleDeltas,
|
Process: s.HandleDeltas,
|
||||||
|
WatchErrorHandler: s.watchErrorHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
func() {
|
func() {
|
||||||
|
27
tools/cache/shared_informer_test.go
vendored
27
tools/cache/shared_informer_test.go
vendored
@ -18,6 +18,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -330,3 +331,29 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSharedInformerErrorHandling(t *testing.T) {
|
||||||
|
source := fcache.NewFakeControllerSource()
|
||||||
|
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||||
|
source.ListError = fmt.Errorf("Access Denied")
|
||||||
|
|
||||||
|
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||||
|
|
||||||
|
errCh := make(chan error)
|
||||||
|
_ = informer.SetWatchErrorHandler(func(_ *Reflector, err error) {
|
||||||
|
errCh <- err
|
||||||
|
})
|
||||||
|
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go informer.Run(stop)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
if !strings.Contains(err.Error(), "Access Denied") {
|
||||||
|
t.Errorf("Expected 'Access Denied' error. Actual: %v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Errorf("Timeout waiting for error handler call")
|
||||||
|
}
|
||||||
|
close(stop)
|
||||||
|
}
|
||||||
|
@ -62,6 +62,9 @@ type FakeControllerSource struct {
|
|||||||
changes []watch.Event // one change per resourceVersion
|
changes []watch.Event // one change per resourceVersion
|
||||||
Broadcaster *watch.Broadcaster
|
Broadcaster *watch.Broadcaster
|
||||||
lastRV int
|
lastRV int
|
||||||
|
|
||||||
|
// Set this to simulate an error on List()
|
||||||
|
ListError error
|
||||||
}
|
}
|
||||||
|
|
||||||
type FakePVControllerSource struct {
|
type FakePVControllerSource struct {
|
||||||
@ -174,6 +177,11 @@ func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
|
|||||||
func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
|
func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
f.lock.RLock()
|
f.lock.RLock()
|
||||||
defer f.lock.RUnlock()
|
defer f.lock.RUnlock()
|
||||||
|
|
||||||
|
if f.ListError != nil {
|
||||||
|
return nil, f.ListError
|
||||||
|
}
|
||||||
|
|
||||||
list, err := f.getListItemsLocked()
|
list, err := f.getListItemsLocked()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user