apply desired changes for handler registration

Kubernetes-commit: 92f04baac98186673c684bba419087d6285807b1
This commit is contained in:
Uwe Krueger 2021-08-22 13:03:18 +02:00 committed by Kubernetes Publisher
parent d73e40f207
commit 33eff64a05
2 changed files with 48 additions and 34 deletions

View File

@ -136,9 +136,9 @@ type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination // period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers. // between different handlers.
// It returns a handle for the handler that can be used to remove // It returns a registration handle for the handler that can be used to remove
// the handler again. // the handler again.
AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the // AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means // shared informer with the requested resync period; zero means
// this handler does not care about resyncs. The resync operation // this handler does not care about resyncs. The resync operation
@ -153,9 +153,9 @@ type SharedInformer interface {
// between any two resyncs may be longer than the nominal period // between any two resyncs may be longer than the nominal period
// because the implementation takes time to do work and there may // because the implementation takes time to do work and there may
// be competing load and scheduling noise. // be competing load and scheduling noise.
// It returns a handle for the handler that can be used to remove // It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added. // the handler again and an error if the handler cannot be added.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
// RemoveEventHandlerByRegistration removes a formerly added event handler given by // RemoveEventHandlerByRegistration removes a formerly added event handler given by
// its registration handle. // its registration handle.
// If, for some reason, the same handler has been added multiple // If, for some reason, the same handler has been added multiple
@ -166,7 +166,7 @@ type SharedInformer interface {
// Calling Remove on an already removed handle returns no error // Calling Remove on an already removed handle returns no error
// because the handler is finally (still) removed after calling this // because the handler is finally (still) removed after calling this
// method. // method.
RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error
// GetStore returns the informer's local cache as a Store. // GetStore returns the informer's local cache as a Store.
GetStore() Store GetStore() Store
// GetController is deprecated, it does nothing useful // GetController is deprecated, it does nothing useful
@ -229,13 +229,6 @@ type ResourceEventHandlerRegistration struct {
listener *processorListener listener *processorListener
} }
// isActive reports whether this registration is still active
// meaning that the handler registered with this handle is
// still registered.
func (h *ResourceEventHandlerRegistration) isActive() bool {
return h.listener != nil
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface { type SharedIndexInformer interface {
SharedInformer SharedInformer
@ -409,6 +402,10 @@ type deleteNotification struct {
oldObj interface{} oldObj interface{}
} }
func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool {
return s.processor.isRegistered(h.listener)
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
@ -531,7 +528,7 @@ func (s *sharedIndexInformer) GetController() Controller {
return &dummyController{informer: s} return &dummyController{informer: s}
} }
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) { func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
} }
@ -552,13 +549,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
const minimumResyncPeriod = 1 * time.Second const minimumResyncPeriod = 1 * time.Second
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) { func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
if s.stopped { if s.stopped {
klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler)
return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler)
} }
if resyncPeriod > 0 { if resyncPeriod > 0 {
@ -582,7 +579,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
} }
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
handle := &ResourceEventHandlerRegistration{listener} handle := ResourceEventHandlerRegistration{listener}
if !s.started { if !s.started {
s.processor.addListener(listener) s.processor.addListener(listener)
@ -663,7 +660,7 @@ func (s *sharedIndexInformer) IsStopped() bool {
// If a handler has been added multiple times, only the actual registration for the // If a handler has been added multiple times, only the actual registration for the
// handler will be removed. Other registrations of the same handler will still be // handler will be removed. Other registrations of the same handler will still be
// active until they are explicitly removes, also. // active until they are explicitly removes, also.
func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error { func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
@ -682,7 +679,6 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceE
s.blockDeltas.Lock() s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() defer s.blockDeltas.Unlock()
s.processor.removeListener(handle.listener) s.processor.removeListener(handle.listener)
handle.listener = nil
return nil return nil
} }
@ -701,6 +697,19 @@ type sharedProcessor struct {
wg wait.Group wg wait.Group
} }
func (p *sharedProcessor) isRegistered(listener *processorListener) bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for i := 0; i < len(p.listeners); i++ {
l := p.listeners[i]
if l == listener {
return true
}
}
return false
}
func (p *sharedProcessor) addListener(listener *processorListener) { func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock() p.listenersLock.Lock()
defer p.listenersLock.Unlock() defer p.listenersLock.Unlock()
@ -721,18 +730,21 @@ func (p *sharedProcessor) removeListener(listener *processorListener) {
p.listenersLock.Lock() p.listenersLock.Lock()
defer p.listenersLock.Unlock() defer p.listenersLock.Unlock()
p.removeListenerLocked(listener) if p.removeListenerLocked(listener) {
if p.listenersStarted { if p.listenersStarted {
close(listener.addCh) close(listener.addCh)
} }
}
} }
func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool {
found := false
for i := 0; i < len(p.listeners); i++ { for i := 0; i < len(p.listeners); i++ {
l := p.listeners[i] l := p.listeners[i]
if l == listener { if l == listener {
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
i-- i--
found = true
} }
} }
for i := 0; i < len(p.syncingListeners); i++ { for i := 0; i < len(p.syncingListeners); i++ {
@ -742,6 +754,7 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) {
i-- i--
} }
} }
return found
} }
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { func (p *sharedProcessor) distribute(obj interface{}, sync bool) {

View File

@ -110,6 +110,11 @@ func isStarted(i SharedInformer) bool {
return s.started return s.started
} }
func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
s := i.(*sharedIndexInformer)
return s.isRegistered(h)
}
func TestListenerResyncPeriods(t *testing.T) { func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint. // source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource() source := fcache.NewFakeControllerSource()
@ -494,7 +499,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
return return
} }
if !reg1.isActive() { if !isRegistered(informer,reg1) {
t.Errorf("handle1 is not active after successful registration") t.Errorf("handle1 is not active after successful registration")
return return
} }
@ -505,7 +510,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
return return
} }
if !reg2.isActive() { if !isRegistered(informer,reg2) {
t.Errorf("handle2 is not active after successful registration") t.Errorf("handle2 is not active after successful registration")
return return
} }
@ -518,11 +523,11 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
t.Errorf("removing of duplicate handler registration failed: %s", err) t.Errorf("removing of duplicate handler registration failed: %s", err)
} }
if reg1.isActive() { if isRegistered(informer,reg1) {
t.Errorf("handle1 is still active after successful remove") t.Errorf("handle1 is still active after successful remove")
return return
} }
if !reg2.isActive() { if !isRegistered(informer, reg2) {
t.Errorf("handle2 is not active after removing handle1") t.Errorf("handle2 is not active after removing handle1")
return return
} }
@ -539,7 +544,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
t.Errorf("removing of second handler registration failed: %s", err) t.Errorf("removing of second handler registration failed: %s", err)
} }
if reg2.isActive() { if isRegistered(informer,reg2) {
t.Errorf("handle2 is still active after successful remove") t.Errorf("handle2 is still active after successful remove")
return return
} }
@ -565,14 +570,14 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
t.Errorf("removing of handler registration failed: %s", err) t.Errorf("removing of handler registration failed: %s", err)
return return
} }
if reg.isActive() { if isRegistered(informer,reg) {
t.Errorf("handle is still active after successful remove") t.Errorf("handle is still active after successful remove")
return return
} }
if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { if err := informer.RemoveEventHandlerByRegistration(reg); err != nil {
t.Errorf("removing of already removed registration yields unexpected error: %s", err) t.Errorf("removing of already removed registration yields unexpected error: %s", err)
} }
if reg.isActive() { if isRegistered(informer,reg) {
t.Errorf("handle is still active after second remove") t.Errorf("handle is still active after second remove")
return return
} }
@ -643,7 +648,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
return return
} }
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) _, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err == nil { if err == nil {
t.Errorf("stopped informer did not reject add handler") t.Errorf("stopped informer did not reject add handler")
return return
@ -652,10 +657,6 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
return return
} }
if handle != nil {
t.Errorf("got handle for added handler on stopped informer")
return
}
} }
func TestRemoveOnStoppedSharedInformer(t *testing.T) { func TestRemoveOnStoppedSharedInformer(t *testing.T) {