diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index df540da1b38..43b749f9c2d 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -88,6 +88,10 @@ type sharedIndexInformer struct { started bool startedLock sync.Mutex + + // blockDeltas gives a way to stop all event distribution so that a late event handler + // can safely join the shared informer. + blockDeltas sync.Mutex } // dummyController hides the fact that a SharedInformer is different from a dedicated one @@ -199,16 +203,35 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro s.startedLock.Lock() defer s.startedLock.Unlock() - if s.started { - return fmt.Errorf("informer has already started") + if !s.started { + listener := newProcessListener(handler) + s.processor.listeners = append(s.processor.listeners, listener) + return nil } + // in order to safely join, we have to + // 1. stop sending add/update/delete notifications + // 2. do a list against the store + // 3. send synthetic "Add" events to the new handler + // 4. unblock + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + listener := newProcessListener(handler) s.processor.listeners = append(s.processor.listeners, listener) + + items := s.indexer.List() + for i := range items { + listener.add(addNotification{newObj: items[i]}) + } + return nil } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + // from oldest to newest for _, d := range obj.(cache.Deltas) { switch d.Type {