Shared Informer Run blocks until all goroutines finish

Fixes #45454

Kubernetes-commit: d789615902ead2a112ad24fbfebe95285b87004b
This commit is contained in:
Mikhail Mazurskiy 2017-05-27 23:34:58 +10:00 committed by Kubernetes Publisher
parent b5da66a5eb
commit 5f6ea627a3
5 changed files with 33 additions and 37 deletions

View File

@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflector = r c.reflector = r
c.reflectorMutex.Unlock() c.reflectorMutex.Unlock()
r.RunUntil(stopCh) var wg sync.WaitGroup
defer wg.Wait()
wait.StartUntil(stopCh, &wg, r.Run)
wait.Until(c.processLoop, time.Second, stopCh) wait.Until(c.processLoop, time.Second, stopCh)
} }

View File

@ -79,17 +79,15 @@ type cacheObj struct {
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) { func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
// we DON'T want protection from panics. If we're running this code, we want to die // we DON'T want protection from panics. If we're running this code, we want to die
go func() { for {
for { d.CompareObjects()
d.CompareObjects()
select { select {
case <-stopCh: case <-stopCh:
return return
case <-time.After(d.period): case <-time.After(d.period):
}
} }
}() }
} }
// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object // AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object

View File

@ -182,21 +182,10 @@ func extractStackCreator() (string, int, bool) {
} }
// Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately. // Run will exit when stopCh is closed.
func (r *Reflector) Run() { func (r *Reflector) Run(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() { wait.Until(func() {
if err := r.ListAndWatch(wait.NeverStop); err != nil {
utilruntime.HandleError(err)
}
}, r.period, wait.NeverStop)
}
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }

View File

@ -83,7 +83,7 @@ func TestRunUntil(t *testing.T) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
}, },
} }
r.RunUntil(stopCh) go r.Run(stopCh)
// Synchronously add a dummy pod into the watch channel so we // Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler. // know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})

View File

@ -147,6 +147,7 @@ type sharedIndexInformer struct {
// stopCh is the channel used to stop the main Run process. We have to track it so that // stopCh is the channel used to stop the main Run process. We have to track it so that
// late joiners can have a proper stop // late joiners can have a proper stop
stopCh <-chan struct{} stopCh <-chan struct{}
wg sync.WaitGroup
} }
// dummyController hides the fact that a SharedInformer is different from a dedicated one // dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -204,12 +205,14 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.controller = New(cfg) s.controller = New(cfg)
s.controller.(*controller).clock = s.clock s.controller.(*controller).clock = s.clock
s.stopCh = stopCh
s.started = true s.started = true
}() }()
s.stopCh = stopCh defer s.wg.Wait()
s.cacheMutationDetector.Run(stopCh)
s.processor.run(stopCh) wait.StartUntil(stopCh, &s.wg, s.cacheMutationDetector.Run)
wait.StartUntil(stopCh, &s.wg, s.processor.run)
s.controller.Run(stopCh) s.controller.Run(stopCh)
} }
@ -324,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.processor.addListener(listener) s.processor.addListener(listener)
go listener.run(s.stopCh) wait.StartUntil(s.stopCh, &s.wg, listener.run)
go listener.pop(s.stopCh) wait.StartUntil(s.stopCh, &s.wg, listener.pop)
items := s.indexer.List() items := s.indexer.List()
for i := range items { for i := range items {
@ -395,13 +398,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
} }
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.listenersLock.RLock() var wg sync.WaitGroup
defer p.listenersLock.RUnlock() func() {
p.listenersLock.RLock()
for _, listener := range p.listeners { defer p.listenersLock.RUnlock()
go listener.run(stopCh) for _, listener := range p.listeners {
go listener.pop(stopCh) wait.StartUntil(stopCh, &wg, listener.run)
} wait.StartUntil(stopCh, &wg, listener.pop)
}
}()
wg.Wait()
} }
// shouldResync queries every listener to determine if any of them need a resync, based on each // shouldResync queries every listener to determine if any of them need a resync, based on each