Merge pull request #28379 from deads2k/allow-late-joins

Automatic merge from submit-queue

allow handler to join after the informer has started

This allows an event handler to join after a SharedInformer has started.  It can't add any indexes, but it can add its reaction functions.

This works by 
 1. stopping the flow of events from the reflector (thus stopping updates to our store)
 1. registering the new handler
 1. sending synthetic "add" events to the new handler only
 1. unblocking the flow of events

It would be possible to 
 1. block
 1. list
 1. add recorder
 1. unblock
 1. play list to as-yet unregistered handler
 1. block
 1. remove recorder
 1. play recording
 1. add new handler
 1. unblock

But that is considerably more complicated.  I'd rather not start there since this ought to be the exception rather than the rule.

@wojtek-t who requested this power in the initial review
@smarterclayton @liggitt I think this resolves our all-in-one ordering problem.
@hongchaodeng since this came up on the call
This commit is contained in:
k8s-merge-robot 2016-07-05 06:49:10 -07:00 committed by GitHub
commit 4ee877c226

View File

@ -88,6 +88,10 @@ type sharedIndexInformer struct {
started bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -199,16 +203,35 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
if !s.started {
listener := newProcessListener(handler)
s.processor.listeners = append(s.processor.listeners, listener)
return nil
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
listener := newProcessListener(handler)
s.processor.listeners = append(s.processor.listeners, listener)
items := s.indexer.List()
for i := range items {
listener.add(addNotification{newObj: items[i]})
}
return nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {