From b33e8187851cace587184f73941a53da0f75098e Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 9 Mar 2023 14:11:59 +0100 Subject: [PATCH] reflector: use watchlist Kubernetes-commit: 51ec7305aa9fc2541c46c4836f5e150c818341f9 --- tools/cache/reflector.go | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 69fc23a1..7363ce35 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -100,6 +100,15 @@ type Reflector struct { ShouldResync func() bool // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. MaxInternalErrorRetryDuration time.Duration + // UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server. + // Streaming has the primary advantage of using fewer server's resources to fetch data. + // + // The old behaviour establishes a LIST request which gets data in chunks. + // Paginated list is less efficient and depending on the actual size of objects + // might result in an increased memory consumption of the APIServer. + // + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details + UseWatchList bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -312,17 +321,39 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) + var err error + var w watch.Interface + fallbackToList := !r.UseWatchList - err := r.list(stopCh) - if err != nil { - return err + if r.UseWatchList { + w, err = r.watchList(stopCh) + if w == nil && err == nil { + // stopCh was closed + return nil + } + if err != nil { + if !apierrors.IsInvalid(err) { + return err + } + klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic") + fallbackToList = true + // Ensure that we won't accidentally pass some garbage down the watch. + w = nil + } + } + + if fallbackToList { + err = r.list(stopCh) + if err != nil { + return err + } } resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go r.startResync(stopCh, cancelCh, resyncerrc) - return r.watch(nil, stopCh, resyncerrc) + return r.watch(w, stopCh, resyncerrc) } // startResync periodically calls r.store.Resync() method.