mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #116297 from p0lyn0mial/upstream-reflector-list-n-watch-refactor
reflector: extract watch and startResyncAsync methods
This commit is contained in:
commit
778b24c97e
@ -323,31 +323,40 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
resyncerrc := make(chan error, 1)
|
||||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
go func() {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
if r.ShouldResync == nil || r.ShouldResync() {
|
||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||
if err := r.store.Resync(); err != nil {
|
||||
resyncerrc <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
cleanup()
|
||||
resyncCh, cleanup = r.resyncChan()
|
||||
}
|
||||
}()
|
||||
go r.startResync(stopCh, cancelCh, resyncerrc)
|
||||
return r.watch(stopCh, resyncerrc)
|
||||
}
|
||||
|
||||
// startResync periodically calls r.store.Resync() method.
|
||||
// Note that this method is blocking and should be
|
||||
// called in a separate goroutine.
|
||||
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
if r.ShouldResync == nil || r.ShouldResync() {
|
||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||
if err := r.store.Resync(); err != nil {
|
||||
resyncerrc <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
cleanup()
|
||||
resyncCh, cleanup = r.resyncChan()
|
||||
}
|
||||
}
|
||||
|
||||
// watch simply starts a watch request with the server.
|
||||
func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error {
|
||||
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
|
||||
for {
|
||||
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
||||
|
Loading…
Reference in New Issue
Block a user