mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
reflector: move LIST to its own method
This commit is contained in:
parent
e9e26068b7
commit
6fc09008de
@ -254,10 +254,104 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
|||||||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||||
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
|
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
|
||||||
var resourceVersion string
|
var resourceVersion string
|
||||||
|
var err error
|
||||||
|
|
||||||
|
resourceVersion, err = r.list(stopCh)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resyncerrc := make(chan error, 1)
|
||||||
|
cancelCh := make(chan struct{})
|
||||||
|
defer close(cancelCh)
|
||||||
|
go func() {
|
||||||
|
resyncCh, cleanup := r.resyncChan()
|
||||||
|
defer func() {
|
||||||
|
cleanup() // Call the last one written into cleanup
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-resyncCh:
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
case <-cancelCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r.ShouldResync == nil || r.ShouldResync() {
|
||||||
|
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||||
|
if err := r.store.Resync(); err != nil {
|
||||||
|
resyncerrc <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup()
|
||||||
|
resyncCh, cleanup = r.resyncChan()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||||
|
options := metav1.ListOptions{
|
||||||
|
ResourceVersion: resourceVersion,
|
||||||
|
// We want to avoid situations of hanging watchers. Stop any watchers that do not
|
||||||
|
// receive any events within the timeout window.
|
||||||
|
TimeoutSeconds: &timeoutSeconds,
|
||||||
|
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
|
||||||
|
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
|
||||||
|
// watch bookmarks, it will ignore this field).
|
||||||
|
AllowWatchBookmarks: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
|
||||||
|
start := r.clock.Now()
|
||||||
|
w, err := r.listerWatcher.Watch(options)
|
||||||
|
if err != nil {
|
||||||
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
|
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||||
|
// watch where we ended.
|
||||||
|
// If that's the case begin exponentially backing off and resend watch request.
|
||||||
|
// Do the same for "429" errors.
|
||||||
|
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
|
||||||
|
<-r.initConnBackoffManager.Backoff().C()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||||
|
if err != errorStopRequested {
|
||||||
|
switch {
|
||||||
|
case isExpiredError(err):
|
||||||
|
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
||||||
|
// has a semantic that it returns data at least as fresh as provided RV.
|
||||||
|
// So first try to LIST with setting RV to resource version of last observed object.
|
||||||
|
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||||
|
case apierrors.IsTooManyRequests(err):
|
||||||
|
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
|
||||||
|
<-r.initConnBackoffManager.Backoff().C()
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// list simply lists all items and gets a resource version obtained from the server at the moment of the call.
|
||||||
|
// the resource version can be used for further progress notification (aka. watch).
|
||||||
|
func (r *Reflector) list(stopCh <-chan struct{}) (string, error) {
|
||||||
|
var resourceVersion string
|
||||||
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
|
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
|
||||||
|
|
||||||
if err := func() error {
|
|
||||||
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
|
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
|
||||||
defer initTrace.LogIfLong(10 * time.Second)
|
defer initTrace.LogIfLong(10 * time.Second)
|
||||||
var list runtime.Object
|
var list runtime.Object
|
||||||
@ -314,7 +408,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return nil
|
return "", nil
|
||||||
case r := <-panicCh:
|
case r := <-panicCh:
|
||||||
panic(r)
|
panic(r)
|
||||||
case <-listCh:
|
case <-listCh:
|
||||||
@ -322,7 +416,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
|
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
|
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
|
||||||
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
|
return "", fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We check if the list was paginated and if so set the paginatedResult based on that.
|
// We check if the list was paginated and if so set the paginatedResult based on that.
|
||||||
@ -342,109 +436,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
|
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
|
return "", fmt.Errorf("unable to understand list result %#v: %v", list, err)
|
||||||
}
|
}
|
||||||
resourceVersion = listMetaInterface.GetResourceVersion()
|
resourceVersion = listMetaInterface.GetResourceVersion()
|
||||||
initTrace.Step("Resource version extracted")
|
initTrace.Step("Resource version extracted")
|
||||||
items, err := meta.ExtractList(list)
|
items, err := meta.ExtractList(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
|
return "", fmt.Errorf("unable to understand list result %#v (%v)", list, err)
|
||||||
}
|
}
|
||||||
initTrace.Step("Objects extracted")
|
initTrace.Step("Objects extracted")
|
||||||
if err := r.syncWith(items, resourceVersion); err != nil {
|
if err := r.syncWith(items, resourceVersion); err != nil {
|
||||||
return fmt.Errorf("unable to sync list result: %v", err)
|
return "", fmt.Errorf("unable to sync list result: %v", err)
|
||||||
}
|
}
|
||||||
initTrace.Step("SyncWith done")
|
initTrace.Step("SyncWith done")
|
||||||
r.setLastSyncResourceVersion(resourceVersion)
|
r.setLastSyncResourceVersion(resourceVersion)
|
||||||
initTrace.Step("Resource version updated")
|
initTrace.Step("Resource version updated")
|
||||||
return nil
|
return resourceVersion, nil
|
||||||
}(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
resyncerrc := make(chan error, 1)
|
|
||||||
cancelCh := make(chan struct{})
|
|
||||||
defer close(cancelCh)
|
|
||||||
go func() {
|
|
||||||
resyncCh, cleanup := r.resyncChan()
|
|
||||||
defer func() {
|
|
||||||
cleanup() // Call the last one written into cleanup
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-resyncCh:
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
case <-cancelCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if r.ShouldResync == nil || r.ShouldResync() {
|
|
||||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
|
||||||
if err := r.store.Resync(); err != nil {
|
|
||||||
resyncerrc <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cleanup()
|
|
||||||
resyncCh, cleanup = r.resyncChan()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
|
||||||
options = metav1.ListOptions{
|
|
||||||
ResourceVersion: resourceVersion,
|
|
||||||
// We want to avoid situations of hanging watchers. Stop any watchers that do not
|
|
||||||
// receive any events within the timeout window.
|
|
||||||
TimeoutSeconds: &timeoutSeconds,
|
|
||||||
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
|
|
||||||
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
|
|
||||||
// watch bookmarks, it will ignore this field).
|
|
||||||
AllowWatchBookmarks: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
|
|
||||||
start := r.clock.Now()
|
|
||||||
w, err := r.listerWatcher.Watch(options)
|
|
||||||
if err != nil {
|
|
||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
|
||||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
|
||||||
// watch where we ended.
|
|
||||||
// If that's the case begin exponentially backing off and resend watch request.
|
|
||||||
// Do the same for "429" errors.
|
|
||||||
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
|
|
||||||
<-r.initConnBackoffManager.Backoff().C()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, &resourceVersion, resyncerrc, stopCh); err != nil {
|
|
||||||
if err != errorStopRequested {
|
|
||||||
switch {
|
|
||||||
case isExpiredError(err):
|
|
||||||
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
|
||||||
// has a semantic that it returns data at least as fresh as provided RV.
|
|
||||||
// So first try to LIST with setting RV to resource version of last observed object.
|
|
||||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
|
||||||
case apierrors.IsTooManyRequests(err):
|
|
||||||
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
|
|
||||||
<-r.initConnBackoffManager.Backoff().C()
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncWith replaces the store's items with the given list.
|
// syncWith replaces the store's items with the given list.
|
||||||
|
Loading…
Reference in New Issue
Block a user