From 3c36d9e373d8e272ad303f359d040621edeb999e Mon Sep 17 00:00:00 2001 From: Kris Date: Tue, 13 Feb 2018 11:33:21 -0800 Subject: [PATCH] Add started state to the processor to protect against double starts This prevents a race condition where the sharedIndexInformer was causeing the processorListener's run and pop method to be started twice. That violated the SharedInformer's interface guarantee of sequential delivery and also caused panics on shutdown. --- .../client-go/tools/cache/shared_informer.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index f6ce07f7af3..8a669494e54 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -335,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addAndStartListener(listener) + s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } @@ -373,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { } type sharedProcessor struct { + listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener @@ -380,20 +381,15 @@ type sharedProcessor struct { wg wait.Group } -func (p *sharedProcessor) addAndStartListener(listener *processorListener) { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - - p.addListenerLocked(listener) - p.wg.Start(listener.run) - p.wg.Start(listener.pop) -} - func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() p.addListenerLocked(listener) + if p.listenersStarted { + p.wg.Start(listener.run) + p.wg.Start(listener.pop) + } } func (p *sharedProcessor) addListenerLocked(listener *processorListener) { @@ -424,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + p.listenersStarted = true }() <-stopCh p.listenersLock.RLock()