mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 14:41:53 +00:00
reflector: extract watch and startResyncAsync methods
Kubernetes-commit: 34fe27355b8b5acc4d29d053ed4361b4f72e147b
This commit is contained in:
parent
06ad6b391d
commit
f694a7978b
57
tools/cache/reflector.go
vendored
57
tools/cache/reflector.go
vendored
@ -323,31 +323,40 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
resyncerrc := make(chan error, 1)
|
resyncerrc := make(chan error, 1)
|
||||||
cancelCh := make(chan struct{})
|
cancelCh := make(chan struct{})
|
||||||
defer close(cancelCh)
|
defer close(cancelCh)
|
||||||
go func() {
|
go r.startResync(stopCh, cancelCh, resyncerrc)
|
||||||
resyncCh, cleanup := r.resyncChan()
|
return r.watch(stopCh, resyncerrc)
|
||||||
defer func() {
|
}
|
||||||
cleanup() // Call the last one written into cleanup
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-resyncCh:
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
case <-cancelCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if r.ShouldResync == nil || r.ShouldResync() {
|
|
||||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
|
||||||
if err := r.store.Resync(); err != nil {
|
|
||||||
resyncerrc <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cleanup()
|
|
||||||
resyncCh, cleanup = r.resyncChan()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
|
// startResync periodically calls r.store.Resync() method.
|
||||||
|
// Note that this method is blocking and should be
|
||||||
|
// called in a separate goroutine.
|
||||||
|
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
|
||||||
|
resyncCh, cleanup := r.resyncChan()
|
||||||
|
defer func() {
|
||||||
|
cleanup() // Call the last one written into cleanup
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-resyncCh:
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
case <-cancelCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r.ShouldResync == nil || r.ShouldResync() {
|
||||||
|
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||||
|
if err := r.store.Resync(); err != nil {
|
||||||
|
resyncerrc <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup()
|
||||||
|
resyncCh, cleanup = r.resyncChan()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch simply starts a watch request with the server.
|
||||||
|
func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error {
|
||||||
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
|
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
|
||||||
for {
|
for {
|
||||||
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
||||||
|
Loading…
Reference in New Issue
Block a user