mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-16 16:21:11 +00:00
reflector: use watchlist
Kubernetes-commit: 51ec7305aa9fc2541c46c4836f5e150c818341f9
This commit is contained in:
parent
d1c260eddc
commit
b33e818785
39
tools/cache/reflector.go
vendored
39
tools/cache/reflector.go
vendored
@ -100,6 +100,15 @@ type Reflector struct {
|
||||
ShouldResync func() bool
|
||||
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
|
||||
MaxInternalErrorRetryDuration time.Duration
|
||||
// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
|
||||
// Streaming has the primary advantage of using fewer server's resources to fetch data.
|
||||
//
|
||||
// The old behaviour establishes a LIST request which gets data in chunks.
|
||||
// Paginated list is less efficient and depending on the actual size of objects
|
||||
// might result in an increased memory consumption of the APIServer.
|
||||
//
|
||||
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
|
||||
UseWatchList bool
|
||||
}
|
||||
|
||||
// ResourceVersionUpdater is an interface that allows store implementation to
|
||||
@ -312,17 +321,39 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
||||
// It returns error if ListAndWatch didn't even try to initialize watch.
|
||||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
|
||||
var err error
|
||||
var w watch.Interface
|
||||
fallbackToList := !r.UseWatchList
|
||||
|
||||
err := r.list(stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
if r.UseWatchList {
|
||||
w, err = r.watchList(stopCh)
|
||||
if w == nil && err == nil {
|
||||
// stopCh was closed
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
if !apierrors.IsInvalid(err) {
|
||||
return err
|
||||
}
|
||||
klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
|
||||
fallbackToList = true
|
||||
// Ensure that we won't accidentally pass some garbage down the watch.
|
||||
w = nil
|
||||
}
|
||||
}
|
||||
|
||||
if fallbackToList {
|
||||
err = r.list(stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
resyncerrc := make(chan error, 1)
|
||||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
go r.startResync(stopCh, cancelCh, resyncerrc)
|
||||
return r.watch(nil, stopCh, resyncerrc)
|
||||
return r.watch(w, stopCh, resyncerrc)
|
||||
}
|
||||
|
||||
// startResync periodically calls r.store.Resync() method.
|
||||
|
Loading…
Reference in New Issue
Block a user