mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-14 19:17:56 +00:00
Merge pull request #137296 from michaelasp/fixListenerDeadlock
Fix listener deadlock in shared informer Kubernetes-commit: 1879769b9354b6e47a13dac831aec6fe3e0368b6
This commit is contained in:
13
tools/cache/shared_informer.go
vendored
13
tools/cache/shared_informer.go
vendored
@@ -907,7 +907,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
|
||||
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker())
|
||||
|
||||
if !s.started {
|
||||
return s.processor.addListener(listener), nil
|
||||
handle, _ := s.processor.addListener(listener)
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
// in order to safely join, we have to
|
||||
@@ -918,7 +919,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
handle := s.processor.addListener(listener)
|
||||
handle, started := s.processor.addListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
// Note that we enqueue these notifications with the lock held
|
||||
// and before returning the handle. That means there is never a
|
||||
@@ -932,7 +933,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
|
||||
}
|
||||
|
||||
// Initial list is added, now we can allow the listener to detect that "upstream has synced".
|
||||
s.processor.wg.Start(listener.watchSynced)
|
||||
if started {
|
||||
s.processor.wg.Start(listener.watchSynced)
|
||||
}
|
||||
|
||||
return handle, nil
|
||||
}
|
||||
@@ -1043,7 +1046,7 @@ func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistrat
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) (ResourceEventHandlerRegistration, bool) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
@@ -1060,7 +1063,7 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
return listener
|
||||
return listener, p.listenersStarted
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
|
||||
|
||||
Reference in New Issue
Block a user