diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 813daafd..ce86cb71 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -136,9 +136,9 @@ type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - // It returns a handle for the handler that can be used to remove + // It returns a registration handle for the handler that can be used to remove // the handler again. - AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) + AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -153,9 +153,9 @@ type SharedInformer interface { // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. - // It returns a handle for the handler that can be used to remove + // It returns a registration handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) // RemoveEventHandlerByRegistration removes a formerly added event handler given by // its registration handle. // If, for some reason, the same handler has been added multiple @@ -166,7 +166,7 @@ type SharedInformer interface { // Calling Remove on an already removed handle returns no error // because the handler is finally (still) removed after calling this // method. - RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error + RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -229,13 +229,6 @@ type ResourceEventHandlerRegistration struct { listener *processorListener } -// isActive reports whether this registration is still active -// meaning that the handler registered with this handle is -// still registered. -func (h *ResourceEventHandlerRegistration) isActive() bool { - return h.listener != nil -} - // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { SharedInformer @@ -409,6 +402,10 @@ type deleteNotification struct { oldObj interface{} } +func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool { + return s.processor.isRegistered(h.listener) +} + func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -531,7 +528,7 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) { +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) { return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } @@ -552,13 +549,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) { s.startedLock.Lock() defer s.startedLock.Unlock() if s.stopped { klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) - return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) + return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -582,7 +579,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - handle := &ResourceEventHandlerRegistration{listener} + handle := ResourceEventHandlerRegistration{listener} if !s.started { s.processor.addListener(listener) @@ -663,7 +660,7 @@ func (s *sharedIndexInformer) IsStopped() bool { // If a handler has been added multiple times, only the actual registration for the // handler will be removed. Other registrations of the same handler will still be // active until they are explicitly removes, also. -func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error { +func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -682,7 +679,6 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceE s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.removeListener(handle.listener) - handle.listener = nil return nil } @@ -701,6 +697,19 @@ type sharedProcessor struct { wg wait.Group } +func (p *sharedProcessor) isRegistered(listener *processorListener) bool { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + + for i := 0; i < len(p.listeners); i++ { + l := p.listeners[i] + if l == listener { + return true + } + } + return false +} + func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() @@ -721,18 +730,21 @@ func (p *sharedProcessor) removeListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.removeListenerLocked(listener) - if p.listenersStarted { - close(listener.addCh) + if p.removeListenerLocked(listener) { + if p.listenersStarted { + close(listener.addCh) + } } } -func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { +func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool { + found := false for i := 0; i < len(p.listeners); i++ { l := p.listeners[i] if l == listener { p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) i-- + found = true } } for i := 0; i < len(p.syncingListeners); i++ { @@ -742,6 +754,7 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { i-- } } + return found } func (p *sharedProcessor) distribute(obj interface{}, sync bool) { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 6feeb218..214a4342 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -110,6 +110,11 @@ func isStarted(i SharedInformer) bool { return s.started } +func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { + s := i.(*sharedIndexInformer) + return s.isRegistered(h) +} + func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() @@ -494,7 +499,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg1.isActive() { + if !isRegistered(informer,reg1) { t.Errorf("handle1 is not active after successful registration") return } @@ -505,7 +510,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg2.isActive() { + if !isRegistered(informer,reg2) { t.Errorf("handle2 is not active after successful registration") return } @@ -518,11 +523,11 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of duplicate handler registration failed: %s", err) } - if reg1.isActive() { + if isRegistered(informer,reg1) { t.Errorf("handle1 is still active after successful remove") return } - if !reg2.isActive() { + if !isRegistered(informer, reg2) { t.Errorf("handle2 is not active after removing handle1") return } @@ -539,7 +544,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of second handler registration failed: %s", err) } - if reg2.isActive() { + if isRegistered(informer,reg2) { t.Errorf("handle2 is still active after successful remove") return } @@ -565,14 +570,14 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("removing of handler registration failed: %s", err) return } - if reg.isActive() { + if isRegistered(informer,reg) { t.Errorf("handle is still active after successful remove") return } if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } - if reg.isActive() { + if isRegistered(informer,reg) { t.Errorf("handle is still active after second remove") return } @@ -643,7 +648,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { return } - handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + _, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) if err == nil { t.Errorf("stopped informer did not reject add handler") return @@ -652,10 +657,6 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) return } - if handle != nil { - t.Errorf("got handle for added handler on stopped informer") - return - } } func TestRemoveOnStoppedSharedInformer(t *testing.T) {