diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index f6ce07f7af3..8a669494e54 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -335,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addAndStartListener(listener) + s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } @@ -373,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { } type sharedProcessor struct { + listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener @@ -380,20 +381,15 @@ type sharedProcessor struct { wg wait.Group } -func (p *sharedProcessor) addAndStartListener(listener *processorListener) { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.addListenerLocked(listener) - p.wg.Start(listener.run) - p.wg.Start(listener.pop) -} - func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() p.addListenerLocked(listener) + if p.listenersStarted { + p.wg.Start(listener.run) + p.wg.Start(listener.pop) + } } func (p *sharedProcessor) addListenerLocked(listener *processorListener) { @@ -424,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 6f49d167a33..22e4a90ddd7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -251,3 +251,15 @@ func TestResyncCheckPeriod(t *testing.T) { t.Errorf("expected %d, got %d", e, a) } } + +// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed +func TestSharedInformerInitializationRace(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("raceListener", 0) + + stop := make(chan struct{}) + go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + go informer.Run(stop) + close(stop) +}