From 3f0de10d86fb421b3313793fcce8d5d64a83dfb2 Mon Sep 17 00:00:00 2001 From: Kris Date: Tue, 13 Feb 2018 11:32:36 -0800 Subject: [PATCH 1/2] Add a test case for the race in #59822 Kubernetes-commit: be482ad51c21d493f4b83de9aa06c511a552b5b1 --- tools/cache/shared_informer_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 6f49d167..22e4a90d 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -251,3 +251,15 @@ func TestResyncCheckPeriod(t *testing.T) { t.Errorf("expected %d, got %d", e, a) } } + +// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed +func TestSharedInformerInitializationRace(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("raceListener", 0) + + stop := make(chan struct{}) + go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + go informer.Run(stop) + close(stop) +} From 5c864f2fba9b1b8032edcac4d2e01644fcfbf494 Mon Sep 17 00:00:00 2001 From: Kris Date: Tue, 13 Feb 2018 11:33:21 -0800 Subject: [PATCH 2/2] Add started state to the processor to protect against double starts This prevents a race condition where the sharedIndexInformer was causeing the processorListener's run and pop method to be started twice. That violated the SharedInformer's interface guarantee of sequential delivery and also caused panics on shutdown. Kubernetes-commit: 3c36d9e373d8e272ad303f359d040621edeb999e --- tools/cache/shared_informer.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index f6ce07f7..8a669494 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -335,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addAndStartListener(listener) + s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } @@ -373,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { } type sharedProcessor struct { + listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener @@ -380,20 +381,15 @@ type sharedProcessor struct { wg wait.Group } -func (p *sharedProcessor) addAndStartListener(listener *processorListener) { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.addListenerLocked(listener) - p.wg.Start(listener.run) - p.wg.Start(listener.pop) -} - func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() p.addListenerLocked(listener) + if p.listenersStarted { + p.wg.Start(listener.run) + p.wg.Start(listener.pop) + } } func (p *sharedProcessor) addListenerLocked(listener *processorListener) { @@ -424,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + p.listenersStarted = true }() <-stopCh p.listenersLock.RLock()