reflector: allow watch method to accept a watcher

Kubernetes-commit: f6161a51e93bfa70585f1797f447f13ec1fde68b
This commit is contained in:
Lukasz Szaszkiewicz 2023-03-08 12:52:32 +01:00 committed by Kubernetes Publisher
parent 64e2c7ff16
commit ac7598edb4

View File

@ -50,7 +50,6 @@ const defaultExpectedTypeName = "<unspecified>"
type Reflector struct { type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible. // name identifies this reflector. By default it will be a file:line if possible.
name string name string
// The name of the type we expect to place in the store. The name // The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the // will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display // stringification of expectedType otherwise. It is for display
@ -67,17 +66,11 @@ type Reflector struct {
store Store store Store
// listerWatcher is used to perform lists and watches. // listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher listerWatcher ListerWatcher
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch. // initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
initConnBackoffManager wait.BackoffManager initConnBackoffManager wait.BackoffManager
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
resyncPeriod time.Duration resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls. // paginatedResult defines whether pagination should be forced for list calls.
@ -92,6 +85,8 @@ type Reflector struct {
isLastSyncResourceVersionUnavailable bool isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex lastSyncResourceVersionMutex sync.RWMutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and resync watch lists. // WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
@ -100,8 +95,10 @@ type Reflector struct {
// etcd, which is significantly less efficient and may lead to serious performance and // etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems. // scalability problems.
WatchListPageSize int64 WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error. // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
watchErrorHandler WatchErrorHandler ShouldResync func() bool
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
} }
// ResourceVersionUpdater is an interface that allows store implementation to // ResourceVersionUpdater is an interface that allows store implementation to
@ -324,7 +321,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
cancelCh := make(chan struct{}) cancelCh := make(chan struct{})
defer close(cancelCh) defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc) go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(stopCh, resyncerrc) return r.watch(nil, stopCh, resyncerrc)
} }
// startResync periodically calls r.store.Resync() method. // startResync periodically calls r.store.Resync() method.
@ -356,8 +353,10 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
} }
// watch simply starts a watch request with the server. // watch simply starts a watch request with the server.
func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error { func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for { for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select { select {
@ -366,6 +365,10 @@ func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error {
default: default:
} }
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{ options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(), ResourceVersion: r.LastSyncResourceVersion(),
@ -378,23 +381,20 @@ func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error {
AllowWatchBookmarks: true, AllowWatchBookmarks: true,
} }
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent w, err = r.listerWatcher.Watch(options)
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil { if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive. if canRetry := isWatchErrorRetriable(err); canRetry {
// It doesn't make sense to re-list all objects because most likely we will be able to restart klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C() <-r.initConnBackoffManager.Backoff().C()
continue continue
} }
return err return err
} }
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
// Ensure that watch will not be reused across iterations.
w = nil
retry.After(err) retry.After(err)
if err != nil { if err != nil {
if err != errorStopRequested { if err != errorStopRequested {
@ -711,3 +711,17 @@ func isTooLargeResourceVersionError(err error) bool {
return false return false
} }
// isWatchErrorRetriable determines if it is safe to retry
// a watch error retrieved from the server.
func isWatchErrorRetriable(err error) bool {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
return true
}
return false
}