Merge pull request #116435 from pohly/client-go-cache-reflector-leak

client-go: shut down watch reflector as soon as stop channel closes

Kubernetes-commit: fcfe5dfc2177320833caa451b401f2d1dc5668e7
This commit is contained in:
Kubernetes Publisher 2023-03-10 07:45:01 -08:00
commit 6df09021f9

View File

@ -417,8 +417,12 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
if err != nil { if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err) klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.initConnBackoffManager.Backoff().C() select {
continue case <-stopCh:
return nil
case <-r.initConnBackoffManager.Backoff().C():
continue
}
} }
return err return err
} }
@ -439,8 +443,12 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case apierrors.IsTooManyRequests(err): case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
<-r.initConnBackoffManager.Backoff().C() select {
continue case <-stopCh:
return nil
case <-r.initConnBackoffManager.Backoff().C():
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
continue continue