diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index ce86cb71..ce9d086d 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -156,7 +156,7 @@ type SharedInformer interface { // It returns a registration handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) - // RemoveEventHandlerByRegistration removes a formerly added event handler given by + // RemoveEventHandler removes a formerly added event handler given by // its registration handle. // If, for some reason, the same handler has been added multiple // times, only the registration for the given registration handle @@ -166,7 +166,7 @@ type SharedInformer interface { // Calling Remove on an already removed handle returns no error // because the handler is finally (still) removed after calling this // method. - RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error + RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -655,12 +655,12 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// RemoveEventHandlerByRegistration tries to remove a formerly added event handler by its +// RemoveEventHandler tries to remove a formerly added event handler by its // registration handle returned for its registration. // If a handler has been added multiple times, only the actual registration for the // handler will be removed. Other registrations of the same handler will still be // active until they are explicitly removes, also. -func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error { +func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -669,7 +669,7 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEv } if s.stopped { - return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler) + return nil } // in order to safely remove, we have to @@ -743,15 +743,15 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool l := p.listeners[i] if l == listener { p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) - i-- found = true + break } } for i := 0; i < len(p.syncingListeners); i++ { l := p.syncingListeners[i] if l == listener { p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) - i-- + break } } return found diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 98eeb7a3..83173342 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -33,60 +33,32 @@ import ( ) type testListener struct { - lock sync.RWMutex + genericTestListener resyncPeriod time.Duration expectedItemNames sets.String receivedItemNames []string - name string } func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener { l := &testListener{ - resyncPeriod: resyncPeriod, - expectedItemNames: sets.NewString(expected...), - name: name, + genericTestListener: genericTestListener{name: name}, + resyncPeriod: resyncPeriod, + expectedItemNames: sets.NewString(expected...), } + l.genericTestListener.action = l.action return l } -func (l *testListener) OnAdd(obj interface{}) { - l.handle(obj) -} - -func (l *testListener) OnUpdate(old, new interface{}) { - l.handle(new) -} - -func (l *testListener) OnDelete(obj interface{}) { -} - -func (l *testListener) handle(obj interface{}) { +func (l *testListener) action(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) fmt.Printf("%s: handle: %v\n", l.name, key) - l.lock.Lock() - defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } func (l *testListener) ok() bool { - fmt.Println("polling") - err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { - if l.satisfiedExpectations() { - return true, nil - } - return false, nil - }) - if err != nil { - return false - } - - // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - fmt.Println("final check") - return l.satisfiedExpectations() + return l.genericTestListener.ok(l.satisfiedExpectations) } func (l *testListener) satisfiedExpectations() bool { @@ -433,14 +405,14 @@ func TestSharedInformerRemoveHandler(t *testing.T) { t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -486,7 +458,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { if isRegistered(informer2, handle2) { t.Errorf("handle2 registered for informer2") } - if err := informer2.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer2.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 2 { @@ -510,7 +482,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { if isRegistered(informer, empty) { t.Errorf("empty registered for informer") } - if err := informer2.RemoveEventHandlerByRegistration(empty); err != nil { + if err := informer2.RemoveEventHandler(empty); err != nil { t.Errorf("removing of empty handle failed: %s", err) } if eventHandlerCount(informer) != 2 { @@ -526,14 +498,14 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { t.Errorf("handle2 not registered anymore for informer") } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -564,14 +536,14 @@ func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of non-pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -609,10 +581,10 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } if eventHandlerCount(informer) != 2 { - t.Errorf("informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) + t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(reg1); err != nil { + if err := informer.RemoveEventHandler(reg1); err != nil { t.Errorf("removing of duplicate handler registration failed: %s", err) } @@ -633,7 +605,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } } - if err := informer.RemoveEventHandlerByRegistration(reg2); err != nil { + if err := informer.RemoveEventHandler(reg2); err != nil { t.Errorf("removing of second handler registration failed: %s", err) } @@ -659,7 +631,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("informer did not add handler for the first time: %s", err) return } - if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { + if err := informer.RemoveEventHandler(reg); err != nil { t.Errorf("removing of handler registration failed: %s", err) return } @@ -667,7 +639,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("handle is still active after successful remove") return } - if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { + if err := informer.RemoveEventHandler(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } if isRegistered(informer, reg) { @@ -734,9 +706,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { stop := make(chan struct{}) go informer.Run(stop) close(stop) - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - if !informer.IsStopped() { + if !checkCondition(informer.IsStopped) { t.Errorf("informer reports not to be stopped although stop channel closed") return } @@ -773,13 +743,148 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { t.Errorf("informer reports not to be stopped although stop channel closed") return } - err = informer.RemoveEventHandlerByRegistration(handle) - if err == nil { - t.Errorf("informer removes handler on stopped informer") - return - } - if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") { - t.Errorf("removing handler from stopped informer yield unexpected error: %s", err) + err = informer.RemoveEventHandler(handle) + if err != nil { + t.Errorf("informer does not remove handler on stopped informer") return } } + +func TestRemoveWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + listener := &genericTestListener{name: "listener"} + handle, _ := informer.AddEventHandler(listener) + listener.action = func(obj interface{}) { + informer.RemoveEventHandler(handle) + } + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener.ok(func() bool { + return listener.getCount() == 1 + }) { + t.Errorf("event did not occur") + return + } + + if isRegistered(informer, handle) { + t.Errorf("handle is still active after successful remove") + return + } +} + +func TestAddWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + listener1 := &genericTestListener{name: "listener1"} + listener2 := &genericTestListener{name: "listener2"} + + handle1, _ := informer.AddEventHandler(listener1) + var handle2 ResourceEventHandlerRegistration + listener1.action = func(obj interface{}) { + listener1.action = nil + handle2, _ = informer.AddEventHandler(listener2) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + } + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener1.ok(func() bool { + return listener1.getCount() == 2 + }) { + t.Errorf("events on listener1 did not occur") + return + } + + if !listener2.ok(func() bool { + return listener1.getCount() == 2 + }) { + t.Errorf("event on listener2 did not occur") + return + } + + if !isRegistered(informer, handle1) { + t.Errorf("handle1 is not active") + return + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 is not active") + return + } +} + +//////////////////////////////////////////////////////////////////////////////// + +type genericTestListener struct { + lock sync.RWMutex + name string + count int + action func(obj interface{}) +} + +func (l *genericTestListener) OnAdd(obj interface{}) { + l.handle(obj) +} + +func (l *genericTestListener) OnUpdate(old, new interface{}) { + l.handle(new) +} + +func (l *genericTestListener) OnDelete(obj interface{}) { +} + +func (l *genericTestListener) handle(obj interface{}) { + l.lock.Lock() + defer l.lock.Unlock() + l.count++ + if l.action != nil { + l.action(obj) + } + fmt.Printf("%s: got event\n", l.name) +} + +func (l *genericTestListener) getCount() int { + l.lock.RLock() + defer l.lock.RUnlock() + return l.count +} + +func (l *genericTestListener) ok(check func() bool) bool { + return checkCondition(check) +} + +func checkCondition(check func() bool) bool { + fmt.Println("polling") + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if check() { + return true, nil + } + return false, nil + }) + if err != nil { + return false + } + + // wait just a bit to allow any unexpected stragglers to come in + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + fmt.Println("final check") + return check() +}