Merge pull request #46094 from atlassian/sync-informer-run

Automatic merge from submit-queue (batch tested with PRs 46094, 48544, 48807, 49102, 44174)

Shared Informer Run blocks until all goroutines finish

**What this PR does / why we need it**:
Makes Shared Informer Run method block until all goroutines it spawned finish. See #45454.

**Which issue this PR fixes**
Fixes #45454

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-07-18 11:20:48 -07:00 committed by GitHub
commit ac742fa9f2
9 changed files with 77 additions and 41 deletions

View File

@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@ -43,5 +44,6 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}

View File

@ -392,7 +392,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run()
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
@ -400,7 +401,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run()
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

View File

@ -17,8 +17,10 @@ limitations under the License.
package wait
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
@ -36,6 +38,40 @@ var ForeverTestTimeout = time.Second * 30
// NeverStop may be passed to Until to make it never stop.
var NeverStop <-chan struct{} = make(chan struct{})
// Group allows to start a group of goroutines and wait for their completion.
type Group struct {
wg sync.WaitGroup
}
func (g *Group) Wait() {
g.wg.Wait()
}
// StartWithChannel starts f in a new goroutine in the group.
// stopCh is passed to f as an argument. f should stop when stopCh is available.
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
g.Start(func() {
f(stopCh)
})
}
// StartWithContext starts f in a new goroutine in the group.
// ctx is passed to f as an argument. f should stop when ctx.Done() is available.
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
g.Start(func() {
f(ctx)
})
}
// Start starts f in a new goroutine in the group.
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.

View File

@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}

View File

@ -79,17 +79,15 @@ type cacheObj struct {
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
// we DON'T want protection from panics. If we're running this code, we want to die
go func() {
for {
d.CompareObjects()
for {
d.CompareObjects()
select {
case <-stopCh:
return
case <-time.After(d.period):
}
select {
case <-stopCh:
return
case <-time.After(d.period):
}
}()
}
}
// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object

View File

@ -182,21 +182,10 @@ func extractStackCreator() (string, int, bool) {
}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err := r.ListAndWatch(wait.NeverStop); err != nil {
utilruntime.HandleError(err)
}
}, r.period, wait.NeverStop)
}
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}

View File

@ -83,7 +83,7 @@ func TestRunUntil(t *testing.T) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
r.RunUntil(stopCh)
go r.Run(stopCh)
// Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})

View File

@ -147,6 +147,7 @@ type sharedIndexInformer struct {
// stopCh is the channel used to stop the main Run process. We have to track it so that
// late joiners can have a proper stop
stopCh <-chan struct{}
wg wait.Group
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -204,12 +205,14 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.stopCh = stopCh
s.started = true
}()
s.stopCh = stopCh
s.cacheMutationDetector.Run(stopCh)
s.processor.run(stopCh)
defer s.wg.Wait()
s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run)
s.wg.StartWithChannel(stopCh, s.processor.run)
s.controller.Run(stopCh)
}
@ -324,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.processor.addListener(listener)
go listener.run(s.stopCh)
go listener.pop(s.stopCh)
s.wg.StartWithChannel(s.stopCh, listener.run)
s.wg.StartWithChannel(s.stopCh, listener.pop)
items := s.indexer.List()
for i := range items {
@ -395,13 +398,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
go listener.run(stopCh)
go listener.pop(stopCh)
}
var wg wait.Group
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
wg.StartWithChannel(stopCh, listener.run)
wg.StartWithChannel(stopCh, listener.pop)
}
}()
wg.Wait()
}
// shouldResync queries every listener to determine if any of them need a resync, based on each

View File

@ -51,7 +51,7 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector,
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
reflector.RunUntil(stopCh)
go reflector.Run(stopCh)
return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
}