mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-30 13:23:28 +00:00
Add started state to the processor to protect against double starts
This prevents a race condition where the sharedIndexInformer was causeing the processorListener's run and pop method to be started twice. That violated the SharedInformer's interface guarantee of sequential delivery and also caused panics on shutdown. Kubernetes-commit: 3c36d9e373d8e272ad303f359d040621edeb999e
This commit is contained in:
parent
3f0de10d86
commit
5c864f2fba
17
tools/cache/shared_informer.go
vendored
17
tools/cache/shared_informer.go
vendored
@ -335,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
s.processor.addAndStartListener(listener)
|
||||
s.processor.addListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
@ -373,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
||||
}
|
||||
|
||||
type sharedProcessor struct {
|
||||
listenersStarted bool
|
||||
listenersLock sync.RWMutex
|
||||
listeners []*processorListener
|
||||
syncingListeners []*processorListener
|
||||
@ -380,20 +381,15 @@ type sharedProcessor struct {
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
if p.listenersStarted {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
@ -424,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
p.listenersStarted = true
|
||||
}()
|
||||
<-stopCh
|
||||
p.listenersLock.RLock()
|
||||
|
Loading…
Reference in New Issue
Block a user