switch listeners to use a map, adapt tests

Kubernetes-commit: 063ef090e7fb5823ca18a10a83e8847eedac9599
This commit is contained in:
Alexander Zielenski
2022-06-16 10:54:03 -07:00
committed by Kubernetes Publisher
parent 90c6a46b32
commit 5a25eb0de8
2 changed files with 155 additions and 220 deletions

View File

@@ -131,7 +131,6 @@ import (
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
//
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
@@ -158,14 +157,7 @@ type SharedInformer interface {
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by
// its registration handle.
// If, for some reason, the same handler has been added multiple
// times, only the registration for the given registration handle
// will be removed.
// It returns an error if the handler cannot be removed,
// because the informer is already stopped.
// Calling Remove on an already removed handle returns no error
// because the handler is finally (still) removed after calling this
// method.
// This function is guaranteed to be idempotent, and thread-safe.
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
// GetStore returns the informer's local cache as a Store.
GetStore() Store
@@ -218,16 +210,10 @@ type SharedInformer interface {
IsStopped() bool
}
// ResourceEventHandlerRegistration is a handle returned by the
// registration methods of SharedInformers for a registered
// ResourceEventHandler. It can be used later on to remove
// a registration again.
// This indirection is required, because the ResourceEventHandler
// interface can be implemented by non-go-comparable handlers, which
// could not be removed from a list anymore.
type ResourceEventHandlerRegistration struct {
listener *processorListener
}
// Opaque interface representing the registration of ResourceEventHandler for
// a SharedInformer. Must be supplied back to the same SharedInformer's
// `RemoveEventHandler` to unregister the handlers.
type ResourceEventHandlerRegistration interface{}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
@@ -402,10 +388,6 @@ type deleteNotification struct {
oldObj interface{}
}
func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool {
return s.processor.isRegistered(h.listener)
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
@@ -554,8 +536,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
defer s.startedLock.Unlock()
if s.stopped {
klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler)
return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler)
return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
}
if resyncPeriod > 0 {
@@ -579,11 +560,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
handle := ResourceEventHandlerRegistration{listener}
if !s.started {
s.processor.addListener(listener)
return handle, nil
return s.processor.addListener(listener), nil
}
// in order to safely join, we have to
@@ -594,7 +573,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
@@ -655,31 +634,17 @@ func (s *sharedIndexInformer) IsStopped() bool {
return s.stopped
}
// RemoveEventHandler tries to remove a formerly added event handler by its
// registration handle returned for its registration.
// If a handler has been added multiple times, only the actual registration for the
// handler will be removed. Other registrations of the same handler will still be
// active until they are explicitly removes, also.
func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if handle.listener == nil {
return nil
}
if s.stopped {
return nil
}
// in order to safely remove, we have to
// 1. stop sending add/update/delete notifications
// 2. remove and stop listener
// 3. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.removeListener(handle.listener)
return nil
return s.processor.removeListener(handle)
}
// sharedProcessor has a collection of processorListener and can
@@ -691,82 +656,77 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
// Map from listeners to whether or not they are currently syncing
listeners map[*processorListener]bool
clock clock.Clock
wg wait.Group
}
func (p *sharedProcessor) isRegistered(listener *processorListener) bool {
func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for i := 0; i < len(p.listeners); i++ {
l := p.listeners[i]
if l == listener {
return true
if p.listeners == nil {
return nil
}
if result, ok := registration.(*processorListener); ok {
if _, exists := p.listeners[result]; exists {
return result
}
}
return false
return nil
}
func (p *sharedProcessor) addListener(listener *processorListener) {
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listeners == nil {
p.listeners = make(map[*processorListener]bool)
}
p.listeners[listener] = true
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
return listener
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
func (p *sharedProcessor) removeListener(listener *processorListener) {
func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
if p.removeListenerLocked(listener) {
if p.listenersStarted {
close(listener.addCh)
}
listener, ok := handle.(*processorListener)
if !ok {
return fmt.Errorf("invalid key type %t", handle)
} else if p.listeners == nil {
// No listeners are registered, do nothing
return nil
} else if _, exists := p.listeners[listener]; !exists {
// Listener is not registered, just do nothing
return nil
}
}
func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool {
found := false
for i := 0; i < len(p.listeners); i++ {
l := p.listeners[i]
if l == listener {
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
found = true
break
}
delete(p.listeners, listener)
if p.listenersStarted {
close(listener.addCh)
}
for i := 0; i < len(p.syncingListeners); i++ {
l := p.syncingListeners[i]
if l == listener {
p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...)
break
}
}
return found
return nil
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
for listener, isSyncing := range p.listeners {
if !sync || isSyncing {
listener.add(obj)
}
}
@@ -776,19 +736,24 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
for listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
// Wipe out list of listeners since they are now closed
// (processorListener cannot be re-used)
p.listeners = nil
}
// shouldResync queries every listener to determine if any of them need a resync, based on each
@@ -797,19 +762,20 @@ func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
for listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing.
if listener.shouldResync(now) {
shouldResync := listener.shouldResync(now)
p.listeners[listener] = shouldResync
if shouldResync {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
@@ -817,8 +783,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
for listener := range p.listeners {
resyncPeriod := determineResyncPeriod(
listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}