diff --git a/pkg/client/cache/poller.go b/pkg/client/cache/poller.go index 70fec9361da..deb708320de 100644 --- a/pkg/client/cache/poller.go +++ b/pkg/client/cache/poller.go @@ -57,14 +57,22 @@ func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { // Run begins polling. It starts a goroutine and returns immediately. func (p *Poller) Run() { - go util.Forever(func() { - e, err := p.getFunc() - if err != nil { - glog.Errorf("failed to list: %v", err) - return - } - p.sync(e) - }, p.period) + go util.Forever(p.run, p.period) +} + +// RunUntil begins polling. It starts a goroutine and returns immediately. +// It will stop when the stopCh is closed. +func (p *Poller) RunUntil(stopCh <-chan struct{}) { + go util.Until(p.run, p.period, stopCh) +} + +func (p *Poller) run() { + e, err := p.getFunc() + if err != nil { + glog.Errorf("failed to list: %v", err) + return + } + p.sync(e) } func (p *Poller) sync(e Enumerator) { diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 56ecdecc50e..68ae3f22b05 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -72,6 +72,12 @@ func (r *Reflector) Run() { go util.Forever(func() { r.listAndWatch() }, r.period) } +// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. +// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. +func (r *Reflector) RunUntil(stopCh <-chan struct{}) { + go util.Until(func() { r.listAndWatch() }, r.period, stopCh) +} + func (r *Reflector) listAndWatch() { var resourceVersion string