diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 57eb2555468..2e71fc0a6ff 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -29,7 +29,7 @@ import ( apps "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -69,12 +69,12 @@ type conversionInformer struct { cache.SharedIndexInformer } -func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) { - i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) +func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + return i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) } -func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { - i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) +func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (cache.ResourceEventHandlerRegistration, error) { + return i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) } type conversionLister struct { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 9f42782d179..f23d55f1e4d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -135,7 +135,9 @@ type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - AddEventHandler(handler ResourceEventHandler) + // It returns a registration handle for the handler that can be used to remove + // the handler again. + AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -150,7 +152,13 @@ type SharedInformer interface { // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) + // It returns a registration handle for the handler that can be used to remove + // the handler again and an error if the handler cannot be added. + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) + // RemoveEventHandler removes a formerly added event handler given by + // its registration handle. + // This function is guaranteed to be idempotent, and thread-safe. + RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -195,8 +203,18 @@ type SharedInformer interface { // transform before mutating it at all and returning the copy to prevent // data races. SetTransform(handler TransformFunc) error + + // IsStopped reports whether the informer has already been stopped. + // Adding event handlers to already stopped informers is not possible. + // An informer already stopped will never be started again. + IsStopped() bool } +// Opaque interface representing the registration of ResourceEventHandler for +// a SharedInformer. Must be supplied back to the same SharedInformer's +// `RemoveEventHandler` to unregister the handlers. +type ResourceEventHandlerRegistration interface{} + // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { SharedInformer @@ -492,8 +510,8 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { - s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) { + return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func determineResyncPeriod(desired, check time.Duration) time.Duration { @@ -513,13 +531,12 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) { s.startedLock.Lock() defer s.startedLock.Unlock() if s.stopped { - klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) - return + return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -545,8 +562,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) if !s.started { - s.processor.addListener(listener) - return + return s.processor.addListener(listener), nil } // in order to safely join, we have to @@ -557,10 +573,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addListener(listener) + handle := s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } + return handle, nil } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { @@ -610,6 +627,26 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) { s.processor.distribute(deleteNotification{oldObj: old}, false) } +// IsStopped reports whether the informer has already been stopped +func (s *sharedIndexInformer) IsStopped() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.stopped +} + +func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { + s.startedLock.Lock() + defer s.startedLock.Unlock() + + // in order to safely remove, we have to + // 1. stop sending add/update/delete notifications + // 2. remove and stop listener + // 3. unblock + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + return s.processor.removeListener(handle) +} + // sharedProcessor has a collection of processorListener and can // distribute a notification object to its listeners. There are two // kinds of distribute operations. The sync distributions go to a @@ -619,39 +656,85 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) { type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex - listeners []*processorListener - syncingListeners []*processorListener - clock clock.Clock - wg wait.Group + // Map from listeners to whether or not they are currently syncing + listeners map[*processorListener]bool + clock clock.Clock + wg wait.Group } -func (p *sharedProcessor) addListener(listener *processorListener) { +func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { + p.listenersLock.RLock() + defer p.listenersLock.RUnlock() + + if p.listeners == nil { + return nil + } + + if result, ok := registration.(*processorListener); ok { + if _, exists := p.listeners[result]; exists { + return result + } + } + + return nil +} + +func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.addListenerLocked(listener) + if p.listeners == nil { + p.listeners = make(map[*processorListener]bool) + } + + p.listeners[listener] = true + if p.listenersStarted { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + + return listener } -func (p *sharedProcessor) addListenerLocked(listener *processorListener) { - p.listeners = append(p.listeners, listener) - p.syncingListeners = append(p.syncingListeners, listener) +func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + + listener, ok := handle.(*processorListener) + if !ok { + return fmt.Errorf("invalid key type %t", handle) + } else if p.listeners == nil { + // No listeners are registered, do nothing + return nil + } else if _, exists := p.listeners[listener]; !exists { + // Listener is not registered, just do nothing + return nil + } + + delete(p.listeners, listener) + + if p.listenersStarted { + close(listener.addCh) + } + + return nil } func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - if sync { - for _, listener := range p.syncingListeners { + for listener, isSyncing := range p.listeners { + switch { + case !sync: + // non-sync messages are delivered to every listener listener.add(obj) - } - } else { - for _, listener := range p.listeners { + case isSyncing: + // sync messages are delivered to every syncing listenter listener.add(obj) + default: + // skipping a sync obj for a non-syncing listener } } } @@ -660,18 +743,27 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + for listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + for listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } + + // Wipe out list of listeners since they are now closed + // (processorListener cannot be re-used) + p.listeners = nil + + // Reset to false since no listeners are running + p.listenersStarted = false + p.wg.Wait() // Wait for all .pop() and .run() to stop } @@ -681,16 +773,16 @@ func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.syncingListeners = []*processorListener{} - resyncNeeded := false now := p.clock.Now() - for _, listener := range p.listeners { + for listener := range p.listeners { // need to loop through all the listeners to see if they need to resync so we can prepare any // listeners that are going to be resyncing. - if listener.shouldResync(now) { + shouldResync := listener.shouldResync(now) + p.listeners[listener] = shouldResync + + if shouldResync { resyncNeeded = true - p.syncingListeners = append(p.syncingListeners, listener) listener.determineNextResync(now) } } @@ -701,8 +793,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { - resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) + for listener := range p.listeners { + resyncPeriod := determineResyncPeriod( + listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 3f2ac71bfe4..2676e8f54c7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -17,7 +17,10 @@ limitations under the License. package cache import ( + "context" "fmt" + "math/rand" + "strconv" "strings" "sync" "testing" @@ -96,6 +99,25 @@ func (l *testListener) satisfiedExpectations() bool { return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) } +func eventHandlerCount(i SharedInformer) int { + s := i.(*sharedIndexInformer) + s.startedLock.Lock() + defer s.startedLock.Unlock() + return len(s.processor.listeners) +} + +func isStarted(i SharedInformer) bool { + s := i.(*sharedIndexInformer) + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.started +} + +func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { + s := i.(*sharedIndexInformer) + return s.processor.getListener(h) != nil +} + func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() @@ -189,6 +211,7 @@ func TestResyncCheckPeriod(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) + gl := informer.processor.getListener clock := testingclock.NewFakeClock(time.Now()) informer.clock = clock @@ -196,59 +219,60 @@ func TestResyncCheckPeriod(t *testing.T) { // listener 1, never resync listener1 := newTestListener("listener1", 0) - informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 2, resync every minute listener2 := newTestListener("listener2", 1*time.Minute) - informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) + handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 3, resync every 55 seconds listener3 := newTestListener("listener3", 55*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) + handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 4, resync every 5 seconds listener4 := newTestListener("listener4", 5*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) + handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a { + if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } } @@ -390,3 +414,528 @@ func TestSharedInformerTransformer(t *testing.T) { t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) } } + +func TestSharedInformerRemoveHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + + if err := informer.RemoveEventHandler(handle2); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) + } + + if err := informer.RemoveEventHandler(handle1); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 0 { + t.Errorf("informer still has registered handlers after removing both handlers") + } +} + +func TestSharedInformerRemoveForeignHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + source2 := fcache.NewFakeControllerSource() + source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + handler1 := &ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + + // remove handle at foreign informer + if isRegistered(informer2, handle1) { + t.Errorf("handle1 registered for informer2") + } + if isRegistered(informer2, handle2) { + t.Errorf("handle2 registered for informer2") + } + if err := informer2.RemoveEventHandler(handle1); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + if !isRegistered(informer, handle1) { + t.Errorf("handle1 not registered anymore for informer") + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 not registered anymore for informer") + } + + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + if !isRegistered(informer, handle1) { + t.Errorf("handle1 not registered anymore for informer") + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 not registered anymore for informer") + } + + if err := informer.RemoveEventHandler(handle2); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) + } + + if err := informer.RemoveEventHandler(handle1); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 0 { + t.Errorf("informer still has registered handlers after removing both handlers") + } +} + +func TestSharedInformerMultipleRegistration(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + handler1 := &ResourceEventHandlerFuncs{} + reg1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + + if !isRegistered(informer, reg1) { + t.Errorf("handle1 is not active after successful registration") + return + } + + reg2, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the second: %s", err) + return + } + + if !isRegistered(informer, reg2) { + t.Errorf("handle2 is not active after successful registration") + return + } + + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) + } + + if err := informer.RemoveEventHandler(reg1); err != nil { + t.Errorf("removing of duplicate handler registration failed: %s", err) + } + + if isRegistered(informer, reg1) { + t.Errorf("handle1 is still active after successful remove") + return + } + if !isRegistered(informer, reg2) { + t.Errorf("handle2 is not active after removing handle1") + return + } + + if eventHandlerCount(informer) != 1 { + if eventHandlerCount(informer) == 0 { + t.Errorf("informer has no registered handler anymore after removal of duplicate registrations") + } else { + t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer)) + } + } + + if err := informer.RemoveEventHandler(reg2); err != nil { + t.Errorf("removing of second handler registration failed: %s", err) + } + + if isRegistered(informer, reg2) { + t.Errorf("handle2 is still active after successful remove") + return + } + + if eventHandlerCount(informer) != 0 { + t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer)) + } +} + +func TestRemovingRemovedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + handler := &ResourceEventHandlerFuncs{} + reg, err := informer.AddEventHandler(handler) + + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + if err := informer.RemoveEventHandler(reg); err != nil { + t.Errorf("removing of handler registration failed: %s", err) + return + } + if isRegistered(informer, reg) { + t.Errorf("handle is still active after successful remove") + return + } + if err := informer.RemoveEventHandler(reg); err != nil { + t.Errorf("removing of already removed registration yields unexpected error: %s", err) + } + if isRegistered(informer, reg) { + t.Errorf("handle is still active after second remove") + return + } +} + +// Shows that many concurrent goroutines can be manipulating shared informer +// listeners without tripping it up. There are not really many assertions in this +// test. Meant to be run with -race to find race conditions +func TestSharedInformerHandlerAbuse(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + ctx, cancel := context.WithCancel(context.Background()) + informerCtx, informerCancel := context.WithCancel(context.Background()) + go func() { + informer.Run(informerCtx.Done()) + cancel() + }() + + worker := func() { + // Keep adding and removing handler + // Make sure no duplicate events? + funcs := ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(oldObj, newObj interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + } + handles := []ResourceEventHandlerRegistration{} + + for { + select { + case <-ctx.Done(): + return + default: + switch rand.Intn(2) { + case 0: + // Register handler again + reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + // test is over + return + } + t.Errorf("failed to add handler: %v", err) + return + } + handles = append(handles, reg) + case 1: + // Remove a random handler + if len(handles) == 0 { + continue + } + + idx := rand.Intn(len(handles)) + err := informer.RemoveEventHandler(handles[idx]) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + // test is over + return + } + t.Errorf("failed to remove handler: %v", err) + return + } + handles = append(handles[:idx], handles[idx+1:]...) + } + } + } + } + + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + worker() + wg.Done() + }() + } + + objs := []*v1.Pod{} + + // While workers run, randomly create events for the informer + for i := 0; i < 10000; i++ { + if len(objs) == 0 { + // Make sure there is always an object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + + // deep copy before adding since the Modify function mutates the obj + source.Add(obj.DeepCopy()) + } + + switch rand.Intn(3) { + case 0: + // Add Object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + source.Add(obj.DeepCopy()) + case 1: + // Update Object + idx := rand.Intn(len(objs)) + source.Modify(objs[idx].DeepCopy()) + + case 2: + // Remove Object + idx := rand.Intn(len(objs)) + source.Delete(objs[idx].DeepCopy()) + objs = append(objs[:idx], objs[idx+1:]...) + } + } + + // sotp informer which stops workers. stopping informer first to exercise + // contention for informer while it is closing + informerCancel() + + // wait for workers to finish since they may throw errors + wg.Wait() +} + +func TestStateSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + + if isStarted(informer) { + t.Errorf("informer already started after creation") + return + } + if informer.IsStopped() { + t.Errorf("informer already stopped after creation") + return + } + stop := make(chan struct{}) + go informer.Run(stop) + if !listener.ok() { + t.Errorf("informer did not report initial objects") + close(stop) + return + } + + if !isStarted(informer) { + t.Errorf("informer does not report to be started although handling events") + close(stop) + return + } + if informer.IsStopped() { + t.Errorf("informer reports to be stopped although stop channel not closed") + close(stop) + return + } + + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + if !isStarted(informer) { + t.Errorf("informer reports not to be started after it has been started and stopped") + return + } +} + +func TestAddOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if informer.IsStopped() { + return true, nil + } + return false, nil + }) + + if err != nil { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + + _, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err == nil { + t.Errorf("stopped informer did not reject add handler") + return + } + if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") { + t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) + return + } +} + +func TestRemoveOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err != nil { + t.Errorf("informer did not add handler: %s", err) + return + } + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + err = informer.RemoveEventHandler(handle) + if err != nil { + t.Errorf("informer does not remove handler on stopped informer") + return + } +} + +func TestRemoveWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + listener := newTestListener("listener", 0, "pod1") + handle, _ := informer.AddEventHandler(listener) + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener.ok() { + t.Errorf("event did not occur") + return + } + + informer.RemoveEventHandler(handle) + + if isRegistered(informer, handle) { + t.Errorf("handle is still active after successful remove") + return + } + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener.ok() { + t.Errorf("unexpected event occurred") + return + } +} + +func TestAddWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + listener1 := newTestListener("originalListener", 0, "pod1") + listener2 := newTestListener("originalListener", 0, "pod1", "pod2") + handle1, _ := informer.AddEventHandler(listener1) + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener1.ok() { + t.Errorf("events on listener1 did not occur") + return + } + + handle2, _ := informer.AddEventHandler(listener2) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener2.ok() { + t.Errorf("event on listener2 did not occur") + return + } + + if !isRegistered(informer, handle1) { + t.Errorf("handle1 is not active") + return + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 is not active") + return + } + + listener1.expectedItemNames = listener2.expectedItemNames + if !listener1.ok() { + t.Errorf("events on listener1 did not occur") + return + } +}