cache: add error handling to informers

When creating an informer, this adds a way to add custom error handling, so that
Kubernetes tooling can properly surface the errors to the end user.

Fixes https://github.com/kubernetes/client-go/issues/155

Kubernetes-commit: 435b40aa1e5c0ae44e0aeb9aa6dbde79838b3390
This commit is contained in:
Nick Santos
2020-01-17 12:46:08 -05:00
committed by Kubernetes Publisher
parent 8e91b7aa91
commit ccd5becdff
5 changed files with 114 additions and 23 deletions

View File

@@ -165,6 +165,21 @@ type SharedInformer interface {
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// There's only one handler, so if you call this multiple times, last one
// wins; calling after the informer has been started returns an error.
//
// The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@@ -300,6 +315,9 @@ type sharedIndexInformer struct {
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -335,6 +353,18 @@ type deleteNotification struct {
oldObj interface{}
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.watchErrorHandler = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@@ -351,7 +381,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {