diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index a24ea2c9c13..016c33bf720 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -43,5 +44,6 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{} } updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource} } - cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run() + r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0) + go r.Run(wait.NeverStop) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d3e3e447bdc..b2a5bdc08e4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -392,7 +392,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if kubeDeps.KubeClient != nil { serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything()) - cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run() + r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0) + go r.Run(wait.NeverStop) } serviceLister := corelisters.NewServiceLister(serviceIndexer) @@ -400,7 +401,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub if kubeDeps.KubeClient != nil { fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector() nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", 
metav1.NamespaceAll, fieldSelector) - cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run() + r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0) + go r.Run(wait.NeverStop) } nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index badaa21596c..fea1b861174 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -19,6 +19,7 @@ package wait import ( "errors" "math/rand" + "sync" "time" "k8s.io/apimachinery/pkg/util/runtime" @@ -36,6 +37,15 @@ var ForeverTestTimeout = time.Second * 30 // NeverStop may be passed to Until to make it never stop. var NeverStop <-chan struct{} = make(chan struct{}) +// StartUntil adds f to wg and runs it in a new goroutine; wg.Done is called when f(stopCh) returns. +func StartUntil(stopCh <-chan struct{}, wg *sync.WaitGroup, f func(stopCh <-chan struct{})) { + wg.Add(1) + go func() { + defer wg.Done() + f(stopCh) + }() +} + // Forever calls f every period for ever. // // Forever is syntactic sugar on top of Until. 
diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 2c97b86580c..88003ce80a3 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) { c.reflector = r c.reflectorMutex.Unlock() - r.RunUntil(stopCh) + var wg sync.WaitGroup + defer wg.Wait() + + wait.StartUntil(stopCh, &wg, r.Run) wait.Until(c.processLoop, time.Second, stopCh) } diff --git a/staging/src/k8s.io/client-go/tools/cache/mutation_detector.go b/staging/src/k8s.io/client-go/tools/cache/mutation_detector.go index cc6094ce4ef..8c067cab90c 100644 --- a/staging/src/k8s.io/client-go/tools/cache/mutation_detector.go +++ b/staging/src/k8s.io/client-go/tools/cache/mutation_detector.go @@ -79,17 +79,15 @@ type cacheObj struct { func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) { // we DON'T want protection from panics. If we're running this code, we want to die - go func() { - for { - d.CompareObjects() + for { + d.CompareObjects() - select { - case <-stopCh: - return - case <-time.After(d.period): - } + select { + case <-stopCh: + return + case <-time.After(d.period): } - }() + } } // AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 9a730610c62..c09a7865337 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -182,21 +182,10 @@ func extractStackCreator() (string, int, bool) { } // Run starts a watch and handles watch events. Will restart the watch if it is closed. -// Run starts a goroutine and returns immediately. -func (r *Reflector) Run() { +// Run will exit when stopCh is closed. 
+func (r *Reflector) Run(stopCh <-chan struct{}) { glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) - go wait.Until(func() { - if err := r.ListAndWatch(wait.NeverStop); err != nil { - utilruntime.HandleError(err) - } - }, r.period, wait.NeverStop) -} - -// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. -// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. -func (r *Reflector) RunUntil(stopCh <-chan struct{}) { - glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) - go wait.Until(func() { + wait.Until(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index c092beca889..bb06059f7e1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -83,7 +83,7 @@ func TestRunUntil(t *testing.T) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, } - r.RunUntil(stopCh) + go r.Run(stopCh) // Synchronously add a dummy pod into the watch channel so we // know the RunUntil go routine is in the watch handler. fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index a0dbbb697b2..935f9d36b99 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -147,6 +147,7 @@ type sharedIndexInformer struct { // stopCh is the channel used to stop the main Run process. 
We have to track it so that // late joiners can have a proper stop stopCh <-chan struct{} + wg sync.WaitGroup } // dummyController hides the fact that a SharedInformer is different from a dedicated one @@ -204,12 +205,14 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { s.controller = New(cfg) s.controller.(*controller).clock = s.clock + s.stopCh = stopCh s.started = true }() - s.stopCh = stopCh - s.cacheMutationDetector.Run(stopCh) - s.processor.run(stopCh) + defer s.wg.Wait() + + wait.StartUntil(stopCh, &s.wg, s.cacheMutationDetector.Run) + wait.StartUntil(stopCh, &s.wg, s.processor.run) s.controller.Run(stopCh) } @@ -324,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.processor.addListener(listener) - go listener.run(s.stopCh) - go listener.pop(s.stopCh) + wait.StartUntil(s.stopCh, &s.wg, listener.run) + wait.StartUntil(s.stopCh, &s.wg, listener.pop) items := s.indexer.List() for i := range items { @@ -395,13 +398,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } func (p *sharedProcessor) run(stopCh <-chan struct{}) { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - - for _, listener := range p.listeners { - go listener.run(stopCh) - go listener.pop(stopCh) - } + var wg sync.WaitGroup + func() { + p.listenersLock.RLock() + defer p.listenersLock.RUnlock() + for _, listener := range p.listeners { + wait.StartUntil(stopCh, &wg, listener.run) + wait.StartUntil(stopCh, &wg, listener.pop) + } + }() + wg.Wait() } // shouldResync queries every listener to determine if any of them need a resync, based on each diff --git a/test/utils/pod_store.go b/test/utils/pod_store.go index 92ddcb5b94d..b7f4d37975b 100644 --- a/test/utils/pod_store.go +++ b/test/utils/pod_store.go @@ -51,7 +51,7 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, store := cache.NewStore(cache.MetaNamespaceKeyFunc) stopCh := make(chan struct{}) reflector := 
cache.NewReflector(lw, &v1.Pod{}, store, 0) - reflector.RunUntil(stopCh) + go reflector.Run(stopCh) return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector} }