From 7436af3302088c979b431856c432b95dd230f847 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Thu, 16 Jun 2022 00:07:54 -0700 Subject: [PATCH 01/15] support removal of event handlers from SharedIndexInformers To be able to implement controllers that are dynamically deciding on which resources to watch, it is required to get rid of dedicated watches and event handlers again. This requires the possibility to remove event handlers from SharedIndexInformers again. Stopping an informer is not sufficient, because there might be multiple controllers in a controller manager that independently decide which resources to watch. Unfortunately the ResourceEventHandler interface encourages to use value objects for handlers (like the ResourceEventHandlerFuncs struct, that uses value receivers to implement the interface). Go does not support comparison of function pointers and therefore the comparison of such structs is not possible, also. To be able to remove all kinds of handlers and to solve the problem of multi-registrations of handlers a registration handle is introduced. It is returned when adding a handler and can later be used to remove the registration again. This handle directly stores the created listener to simplify the deletion. --- pkg/controller/replication/conversion.go | 9 +- .../client-go/tools/cache/shared_informer.go | 149 ++++++++- .../tools/cache/shared_informer_test.go | 285 ++++++++++++++++++ 3 files changed, 431 insertions(+), 12 deletions(-) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 57eb2555468..37e5d046382 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -45,6 +45,7 @@ import ( appslisters "k8s.io/client-go/listers/apps/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + appsinternal "k8s.io/kubernetes/pkg/apis/apps" appsconversion "k8s.io/kubernetes/pkg/apis/apps/v1" apiv1 "k8s.io/kubernetes/pkg/apis/core/v1" @@ -69,12 +70,12 @@ type conversionInformer struct { cache.SharedIndexInformer } -func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) { - i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) +func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (*cache.ResourceEventHandlerHandle, error) { + return i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) } -func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { - i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) +func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (*cache.ResourceEventHandlerHandle, error) { + return i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) } type conversionLister struct { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 9f42782d179..a78eb6f5f97 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -131,11 +131,24 @@ import ( // A delete notification exposes the last locally known non-absent // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. +// +// The informational methods (EventHandlerCount, IsStopped, IsStarted) +// are intended to be used to manage informers in any upper informer +// management layer for creating and destroying informers on-the fly when +// adding or removing handlers. +// Beware of race conditions: Such a layer must hide the basic informer +// objects from its users and offer a closed *synchronized* view for informers. +// Although these informational methods are synchronized each, in +// sequences the state queried first might have been changed before +// calling the next method, if other callers (or the stop channel) +// are able to interact with the informer interface in parallel. type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - AddEventHandler(handler ResourceEventHandler) + // It returns a handle for the handler that can be used to remove + // the handler again. + AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -150,7 +163,20 @@ type SharedInformer interface { // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) + // It returns a handle for the handler that can be used to remove + // the handler again and an error if the handler cannot be added. + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) + // RemoveEventHandlerByHandle removes a formerly added event handler given by + // its registration handle. + // If, for some reason, the same handler has been added multiple + // times, only the registration for the given registration handle + // will be removed. + // It returns an error if the handler cannot be removed, + // because the informer is already stopped. + // Calling Remove on an already removed handle returns no error + // because the handler is finally (still) removed after calling this + // method. + RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -195,6 +221,35 @@ type SharedInformer interface { // transform before mutating it at all and returning the copy to prevent // data races. SetTransform(handler TransformFunc) error + + // EventHandlerCount return the number of actually registered + // event handlers. + EventHandlerCount() int + + // IsStopped reports whether the informer has already been stopped. + // Adding event handlers to already stopped informers is not possible. + IsStopped() bool + + // IsStarted reports whether the informer has already been started + IsStarted() bool +} + +// ResourceEventHandlerHandle is a handle returned by the +// registration methods of SharedInformers for a registered +// ResourceEventHandler. It can be used later on to remove +// a registration again. +// This indirection is required, because the ResourceEventHandler +// interface can be implemented by non-go-comparable handlers, which +// could not be removed from a list anymore. +type ResourceEventHandlerHandle struct { + listener *processorListener +} + +// IsActive reports whether this registration is still active +// meaning that the handler registered with this handle is +// still registered. +func (h *ResourceEventHandlerHandle) IsActive() bool { + return h.listener != nil } // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. @@ -492,8 +547,8 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { - s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) { + return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func determineResyncPeriod(desired, check time.Duration) time.Duration { @@ -513,13 +568,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) { s.startedLock.Lock() defer s.startedLock.Unlock() if s.stopped { - klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) - return + klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) + return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -543,10 +598,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) + handle := &ResourceEventHandlerHandle{listener} if !s.started { s.processor.addListener(listener) - return + return handle, nil } // in order to safely join, we have to @@ -561,6 +617,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } + return handle, nil } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { @@ -610,6 +667,55 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) { s.processor.distribute(deleteNotification{oldObj: old}, false) } +// IsStarted reports whether the informer has already been started +func (s *sharedIndexInformer) IsStarted() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.started +} + +// IsStopped reports whether the informer has already been stopped +func (s *sharedIndexInformer) IsStopped() bool { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.stopped +} + +// EventHandlerCount reports whether the informer still has registered +// event handlers +func (s *sharedIndexInformer) EventHandlerCount() int { + s.startedLock.Lock() + defer s.startedLock.Unlock() + return len(s.processor.listeners) +} + +// RemoveEventHandlerByHandle tries to remove a formerly added event handler by its +// handle returned for its registration. +// If a handler has been added multiple times, only the registration for the +// given handle will be removed. +func (s *sharedIndexInformer) RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error { + if handle.listener == nil { + return nil + } + + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.stopped { + return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler) + } + + // in order to safely remove, we have to + // 1. stop sending add/update/delete notifications + // 2. remove and stop listener + // 3. unblock + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() + s.processor.removeListener(handle.listener) + handle.listener = nil + return nil +} + // sharedProcessor has a collection of processorListener and can // distribute a notification object to its listeners. There are two // kinds of distribute operations. The sync distributions go to a @@ -641,6 +747,33 @@ func (p *sharedProcessor) addListenerLocked(listener *processorListener) { p.syncingListeners = append(p.syncingListeners, listener) } +func (p *sharedProcessor) removeListener(listener *processorListener) { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + + p.removeListenerLocked(listener) + if p.listenersStarted { + close(listener.addCh) + } +} + +func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { + for i := 0; i < len(p.listeners); i++ { + l := p.listeners[i] + if l == listener { + p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) + i-- + } + } + for i := 0; i < len(p.syncingListeners); i++ { + l := p.syncingListeners[i] + if l == listener { + p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) + i-- + } + } +} + func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 3f2ac71bfe4..21f197e0a8e 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -390,3 +390,288 @@ func TestSharedInformerTransformer(t *testing.T) { t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) } } + +func TestSharedInformerRemoveHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler, instead of 2", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 0 { + t.Errorf("informer still has registered handlers after removing both handlers") + } +} + +func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler(s), instead of 2", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + t.Errorf("removing of pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 1 { + t.Errorf("after removal informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + t.Errorf("removing of non-pointer handler failed: %s", err) + } + if informer.EventHandlerCount() != 0 { + t.Errorf("after removal informer has %d registered handler(s), instead of 0", informer.EventHandlerCount()) + } +} + +func TestSharedInformerMultipleRegistration(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + reg1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + + if !reg1.IsActive() { + t.Errorf("handle1 is not active after successful registration") + return + } + + reg2, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler for the second: %s", err) + return + } + + if !reg2.IsActive() { + t.Errorf("handle2 is not active after successful registration") + return + } + + if informer.EventHandlerCount() != 2 { + t.Errorf("informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + } + + if err := informer.RemoveEventHandlerByHandle(reg1); err != nil { + t.Errorf("removing of duplicate handler registration failed: %s", err) + } + + if reg1.IsActive() { + t.Errorf("handle1 is still active after successful remove") + return + } + if !reg2.IsActive() { + t.Errorf("handle2 is not active after removing handle1") + return + } + + if informer.EventHandlerCount() != 1 { + if informer.EventHandlerCount() == 0 { + t.Errorf("informer has no registered handler anymore after removal of duplicate registrations") + } else { + t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", informer.EventHandlerCount()) + } + } + + if err := informer.RemoveEventHandlerByHandle(reg2); err != nil { + t.Errorf("removing of second handler registration failed: %s", err) + } + + if reg2.IsActive() { + t.Errorf("handle2 is still active after successful remove") + return + } + + if informer.EventHandlerCount() != 0 { + t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", informer.EventHandlerCount()) + } +} + +func TestRemovingRemovedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + handler := &ResourceEventHandlerFuncs{} + reg, err := informer.AddEventHandler(handler) + + if err != nil { + t.Errorf("informer did not add handler for the first time: %s", err) + return + } + if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + t.Errorf("removing of handler registration failed: %s", err) + return + } + if reg.IsActive() { + t.Errorf("handle is still active after successful remove") + return + } + if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + t.Errorf("removing of already removed registration yields unexpected error: %s", err) + } + if reg.IsActive() { + t.Errorf("handle is still active after second remove") + return + } +} + +func TestStateSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + + if informer.IsStarted() { + t.Errorf("informer already started after creation") + return + } + if informer.IsStopped() { + t.Errorf("informer already stopped after creation") + return + } + stop := make(chan struct{}) + go informer.Run(stop) + if !listener.ok() { + t.Errorf("informer did not report initial objects") + close(stop) + return + } + + if !informer.IsStarted() { + t.Errorf("informer does not report to be started although handling events") + close(stop) + return + } + if informer.IsStopped() { + t.Errorf("informer reports to be stopped although stop channel not closed") + close(stop) + return + } + + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + if !informer.IsStarted() { + t.Errorf("informer reports not to be started after it has been started and stopped") + return + } +} + +func TestAddOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + + handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err == nil { + t.Errorf("stopped informer did not reject add handler") + return + } + if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") { + t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) + return + } + if handle != nil { + t.Errorf("got handle for added handler on stopped informer") + return + } +} + +func TestRemoveOnStoppedSharedInformer(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + listener := newTestListener("listener", 0, "pod1") + handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + if err != nil { + t.Errorf("informer did not add handler: %s", err) + return + } + stop := make(chan struct{}) + go informer.Run(stop) + close(stop) + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + + if !informer.IsStopped() { + t.Errorf("informer reports not to be stopped although stop channel closed") + return + } + err = informer.RemoveEventHandlerByHandle(handle) + if err == nil { + t.Errorf("informer removes handler on stopped informer") + return + } + if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") { + t.Errorf("removing handler from stopped informer yield unexpected error: %s", err) + return + } +} From f52f4a8e3045fd3eeba0315a347ed653012cd5c5 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Thu, 16 Jun 2022 00:09:20 -0700 Subject: [PATCH 02/15] remove informational informer methods again --- .../client-go/tools/cache/shared_informer.go | 33 +---------- .../tools/cache/shared_informer_test.go | 56 ++++++++++++------- 2 files changed, 36 insertions(+), 53 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index a78eb6f5f97..9e930b1c6f2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -132,16 +132,6 @@ import ( // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. // -// The informational methods (EventHandlerCount, IsStopped, IsStarted) -// are intended to be used to manage informers in any upper informer -// management layer for creating and destroying informers on-the fly when -// adding or removing handlers. -// Beware of race conditions: Such a layer must hide the basic informer -// objects from its users and offer a closed *synchronized* view for informers. -// Although these informational methods are synchronized each, in -// sequences the state queried first might have been changed before -// calling the next method, if other callers (or the stop channel) -// are able to interact with the informer interface in parallel. type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination @@ -222,16 +212,10 @@ type SharedInformer interface { // data races. SetTransform(handler TransformFunc) error - // EventHandlerCount return the number of actually registered - // event handlers. - EventHandlerCount() int - // IsStopped reports whether the informer has already been stopped. // Adding event handlers to already stopped informers is not possible. + // An informer already stopped will never be started again. IsStopped() bool - - // IsStarted reports whether the informer has already been started - IsStarted() bool } // ResourceEventHandlerHandle is a handle returned by the @@ -667,13 +651,6 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) { s.processor.distribute(deleteNotification{oldObj: old}, false) } -// IsStarted reports whether the informer has already been started -func (s *sharedIndexInformer) IsStarted() bool { - s.startedLock.Lock() - defer s.startedLock.Unlock() - return s.started -} - // IsStopped reports whether the informer has already been stopped func (s *sharedIndexInformer) IsStopped() bool { s.startedLock.Lock() @@ -681,14 +658,6 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// EventHandlerCount reports whether the informer still has registered -// event handlers -func (s *sharedIndexInformer) EventHandlerCount() int { - s.startedLock.Lock() - defer s.startedLock.Unlock() - return len(s.processor.listeners) -} - // RemoveEventHandlerByHandle tries to remove a formerly added event handler by its // handle returned for its registration. // If a handler has been added multiple times, only the registration for the diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 21f197e0a8e..2fdf56afb2d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -96,6 +96,20 @@ func (l *testListener) satisfiedExpectations() bool { return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) } +func eventHandlerCount(i SharedInformer) int { + s := i.(*sharedIndexInformer) + s.startedLock.Lock() + defer s.startedLock.Unlock() + return len(s.processor.listeners) +} + +func isStarted(i SharedInformer) bool { + s := i.(*sharedIndexInformer) + s.startedLock.Lock() + defer s.startedLock.Unlock() + return s.started +} + func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() @@ -410,21 +424,21 @@ func TestSharedInformerRemoveHandler(t *testing.T) { return } - if informer.EventHandlerCount() != 2 { - t.Errorf("informer has %d registered handler, instead of 2", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } - if informer.EventHandlerCount() != 1 { - t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } - if informer.EventHandlerCount() != 0 { + if eventHandlerCount(informer) != 0 { t.Errorf("informer still has registered handlers after removing both handlers") } } @@ -448,22 +462,22 @@ func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { return } - if informer.EventHandlerCount() != 2 { - t.Errorf("informer has %d registered handler(s), instead of 2", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { t.Errorf("removing of pointer handler failed: %s", err) } - if informer.EventHandlerCount() != 1 { - t.Errorf("after removal informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 1 { + t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { t.Errorf("removing of non-pointer handler failed: %s", err) } - if informer.EventHandlerCount() != 0 { - t.Errorf("after removal informer has %d registered handler(s), instead of 0", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 0 { + t.Errorf("after removal informer has %d registered handler(s), instead of 0", eventHandlerCount(informer)) } } @@ -496,8 +510,8 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if informer.EventHandlerCount() != 2 { - t.Errorf("informer has %d registered handler(s), instead of 1", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByHandle(reg1); err != nil { @@ -513,11 +527,11 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if informer.EventHandlerCount() != 1 { - if informer.EventHandlerCount() == 0 { + if eventHandlerCount(informer) != 1 { + if eventHandlerCount(informer) == 0 { t.Errorf("informer has no registered handler anymore after removal of duplicate registrations") } else { - t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", informer.EventHandlerCount()) + t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer)) } } @@ -530,8 +544,8 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if informer.EventHandlerCount() != 0 { - t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", informer.EventHandlerCount()) + if eventHandlerCount(informer) != 0 { + t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer)) } } @@ -572,7 +586,7 @@ func TestStateSharedInformer(t *testing.T) { listener := newTestListener("listener", 0, "pod1") informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) - if informer.IsStarted() { + if isStarted(informer) { t.Errorf("informer already started after creation") return } @@ -588,7 +602,7 @@ func TestStateSharedInformer(t *testing.T) { return } - if !informer.IsStarted() { + if !isStarted(informer) { t.Errorf("informer does not report to be started although handling events") close(stop) return @@ -607,7 +621,7 @@ func TestStateSharedInformer(t *testing.T) { t.Errorf("informer reports not to be stopped although stop channel closed") return } - if !informer.IsStarted() { + if !isStarted(informer) { t.Errorf("informer reports not to be started after it has been started and stopped") return } From 7054ac16d43a4a55d6e7b69943eb209f6495c4ce Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 4 Aug 2021 21:54:32 +0200 Subject: [PATCH 03/15] rename handle to registration --- pkg/controller/replication/conversion.go | 4 +- .../client-go/tools/cache/shared_informer.go | 39 ++++++++++--------- .../tools/cache/shared_informer_test.go | 32 +++++++-------- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 37e5d046382..842e7e3c8e8 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -70,11 +70,11 @@ type conversionInformer struct { cache.SharedIndexInformer } -func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (*cache.ResourceEventHandlerHandle, error) { +func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (*cache.ResourceEventHandlerRegistration, error) { return i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) } -func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (*cache.ResourceEventHandlerHandle, error) { +func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (*cache.ResourceEventHandlerRegistration, error) { return i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 9e930b1c6f2..813daafd210 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -138,7 +138,7 @@ type SharedInformer interface { // between different handlers. // It returns a handle for the handler that can be used to remove // the handler again. - AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) + AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -155,8 +155,8 @@ type SharedInformer interface { // be competing load and scheduling noise. // It returns a handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) - // RemoveEventHandlerByHandle removes a formerly added event handler given by + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) + // RemoveEventHandlerByRegistration removes a formerly added event handler given by // its registration handle. // If, for some reason, the same handler has been added multiple // times, only the registration for the given registration handle @@ -166,7 +166,7 @@ type SharedInformer interface { // Calling Remove on an already removed handle returns no error // because the handler is finally (still) removed after calling this // method. - RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error + RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -218,21 +218,21 @@ type SharedInformer interface { IsStopped() bool } -// ResourceEventHandlerHandle is a handle returned by the +// ResourceEventHandlerRegistration is a handle returned by the // registration methods of SharedInformers for a registered // ResourceEventHandler. It can be used later on to remove // a registration again. // This indirection is required, because the ResourceEventHandler // interface can be implemented by non-go-comparable handlers, which // could not be removed from a list anymore. -type ResourceEventHandlerHandle struct { +type ResourceEventHandlerRegistration struct { listener *processorListener } -// IsActive reports whether this registration is still active +// isActive reports whether this registration is still active // meaning that the handler registered with this handle is // still registered. -func (h *ResourceEventHandlerHandle) IsActive() bool { +func (h *ResourceEventHandlerRegistration) isActive() bool { return h.listener != nil } @@ -531,7 +531,7 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) { +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) { return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } @@ -552,7 +552,7 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -582,7 +582,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - handle := &ResourceEventHandlerHandle{listener} + handle := &ResourceEventHandlerRegistration{listener} if !s.started { s.processor.addListener(listener) @@ -658,18 +658,19 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// RemoveEventHandlerByHandle tries to remove a formerly added event handler by its -// handle returned for its registration. -// If a handler has been added multiple times, only the registration for the -// given handle will be removed. -func (s *sharedIndexInformer) RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error { +// RemoveEventHandlerByRegistration tries to remove a formerly added event handler by its +// registration handle returned for its registration. +// If a handler has been added multiple times, only the actual registration for the +// handler will be removed. Other registrations of the same handler will still be +// active until they are explicitly removes, also. +func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error { + s.startedLock.Lock() + defer s.startedLock.Unlock() + if handle.listener == nil { return nil } - s.startedLock.Lock() - defer s.startedLock.Unlock() - if s.stopped { return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler) } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 2fdf56afb2d..6feeb218670 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -428,14 +428,14 @@ func TestSharedInformerRemoveHandler(t *testing.T) { t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -466,14 +466,14 @@ func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByHandle(handle2); err != nil { + if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { t.Errorf("removing of pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByHandle(handle1); err != nil { + if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { t.Errorf("removing of non-pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -494,7 +494,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg1.IsActive() { + if !reg1.isActive() { t.Errorf("handle1 is not active after successful registration") return } @@ -505,7 +505,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg2.IsActive() { + if !reg2.isActive() { t.Errorf("handle2 is not active after successful registration") return } @@ -514,15 +514,15 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByHandle(reg1); err != nil { + if err := informer.RemoveEventHandlerByRegistration(reg1); err != nil { t.Errorf("removing of duplicate handler registration failed: %s", err) } - if reg1.IsActive() { + if reg1.isActive() { t.Errorf("handle1 is still active after successful remove") return } - if !reg2.IsActive() { + if !reg2.isActive() { t.Errorf("handle2 is not active after removing handle1") return } @@ -535,11 +535,11 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } } - if err := informer.RemoveEventHandlerByHandle(reg2); err != nil { + if err := informer.RemoveEventHandlerByRegistration(reg2); err != nil { t.Errorf("removing of second handler registration failed: %s", err) } - if reg2.IsActive() { + if reg2.isActive() { t.Errorf("handle2 is still active after successful remove") return } @@ -561,18 +561,18 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("informer did not add handler for the first time: %s", err) return } - if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { t.Errorf("removing of handler registration failed: %s", err) return } - if reg.IsActive() { + if reg.isActive() { t.Errorf("handle is still active after successful remove") return } - if err := informer.RemoveEventHandlerByHandle(reg); err != nil { + if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } - if reg.IsActive() { + if reg.isActive() { t.Errorf("handle is still active after second remove") return } @@ -679,7 +679,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { t.Errorf("informer reports not to be stopped although stop channel closed") return } - err = informer.RemoveEventHandlerByHandle(handle) + err = informer.RemoveEventHandlerByRegistration(handle) if err == nil { t.Errorf("informer removes handler on stopped informer") return From 92f04baac98186673c684bba419087d6285807b1 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sun, 22 Aug 2021 13:03:18 +0200 Subject: [PATCH 04/15] apply desired changes for handler registration --- pkg/controller/replication/conversion.go | 4 +- .../client-go/tools/cache/shared_informer.go | 57 ++++++++++++------- .../tools/cache/shared_informer_test.go | 25 ++++---- 3 files changed, 50 insertions(+), 36 deletions(-) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 842e7e3c8e8..6a2899723bf 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -70,11 +70,11 @@ type conversionInformer struct { cache.SharedIndexInformer } -func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (*cache.ResourceEventHandlerRegistration, error) { +func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { return i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) } -func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (*cache.ResourceEventHandlerRegistration, error) { +func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (cache.ResourceEventHandlerRegistration, error) { return i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 813daafd210..ce86cb71025 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -136,9 +136,9 @@ type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - // It returns a handle for the handler that can be used to remove + // It returns a registration handle for the handler that can be used to remove // the handler again. - AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) + AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation @@ -153,9 +153,9 @@ type SharedInformer interface { // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. - // It returns a handle for the handler that can be used to remove + // It returns a registration handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. - AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) + AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) // RemoveEventHandlerByRegistration removes a formerly added event handler given by // its registration handle. // If, for some reason, the same handler has been added multiple @@ -166,7 +166,7 @@ type SharedInformer interface { // Calling Remove on an already removed handle returns no error // because the handler is finally (still) removed after calling this // method. - RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error + RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -229,13 +229,6 @@ type ResourceEventHandlerRegistration struct { listener *processorListener } -// isActive reports whether this registration is still active -// meaning that the handler registered with this handle is -// still registered. -func (h *ResourceEventHandlerRegistration) isActive() bool { - return h.listener != nil -} - // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { SharedInformer @@ -409,6 +402,10 @@ type deleteNotification struct { oldObj interface{} } +func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool { + return s.processor.isRegistered(h.listener) +} + func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -531,7 +528,7 @@ func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } -func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerRegistration, error) { +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) { return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } @@ -552,13 +549,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration { const minimumResyncPeriod = 1 * time.Second -func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerRegistration, error) { +func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) { s.startedLock.Lock() defer s.startedLock.Unlock() if s.stopped { klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) - return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) + return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -582,7 +579,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - handle := &ResourceEventHandlerRegistration{listener} + handle := ResourceEventHandlerRegistration{listener} if !s.started { s.processor.addListener(listener) @@ -663,7 +660,7 @@ func (s *sharedIndexInformer) IsStopped() bool { // If a handler has been added multiple times, only the actual registration for the // handler will be removed. Other registrations of the same handler will still be // active until they are explicitly removes, also. -func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceEventHandlerRegistration) error { +func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -682,7 +679,6 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle *ResourceE s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.removeListener(handle.listener) - handle.listener = nil return nil } @@ -701,6 +697,19 @@ type sharedProcessor struct { wg wait.Group } +func (p *sharedProcessor) isRegistered(listener *processorListener) bool { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + + for i := 0; i < len(p.listeners); i++ { + l := p.listeners[i] + if l == listener { + return true + } + } + return false +} + func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() @@ -721,18 +730,21 @@ func (p *sharedProcessor) removeListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.removeListenerLocked(listener) - if p.listenersStarted { - close(listener.addCh) + if p.removeListenerLocked(listener) { + if p.listenersStarted { + close(listener.addCh) + } } } -func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { +func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool { + found := false for i := 0; i < len(p.listeners); i++ { l := p.listeners[i] if l == listener { p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) i-- + found = true } } for i := 0; i < len(p.syncingListeners); i++ { @@ -742,6 +754,7 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) { i-- } } + return found } func (p *sharedProcessor) distribute(obj interface{}, sync bool) { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 6feeb218670..214a4342dd7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -110,6 +110,11 @@ func isStarted(i SharedInformer) bool { return s.started } +func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { + s := i.(*sharedIndexInformer) + return s.isRegistered(h) +} + func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() @@ -494,7 +499,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg1.isActive() { + if !isRegistered(informer,reg1) { t.Errorf("handle1 is not active after successful registration") return } @@ -505,7 +510,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !reg2.isActive() { + if !isRegistered(informer,reg2) { t.Errorf("handle2 is not active after successful registration") return } @@ -518,11 +523,11 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of duplicate handler registration failed: %s", err) } - if reg1.isActive() { + if isRegistered(informer,reg1) { t.Errorf("handle1 is still active after successful remove") return } - if !reg2.isActive() { + if !isRegistered(informer, reg2) { t.Errorf("handle2 is not active after removing handle1") return } @@ -539,7 +544,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of second handler registration failed: %s", err) } - if reg2.isActive() { + if isRegistered(informer,reg2) { t.Errorf("handle2 is still active after successful remove") return } @@ -565,14 +570,14 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("removing of handler registration failed: %s", err) return } - if reg.isActive() { + if isRegistered(informer,reg) { t.Errorf("handle is still active after successful remove") return } if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } - if reg.isActive() { + if isRegistered(informer,reg) { t.Errorf("handle is still active after second remove") return } @@ -643,7 +648,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { return } - handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + _, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) if err == nil { t.Errorf("stopped informer did not reject add handler") return @@ -652,10 +657,6 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) return } - if handle != nil { - t.Errorf("got handle for added handler on stopped informer") - return - } } func TestRemoveOnStoppedSharedInformer(t *testing.T) { From 152b6e11af4c8c50b768630bf7724e0bb3414e9c Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sun, 22 Aug 2021 13:27:36 +0200 Subject: [PATCH 05/15] tests for invalid registration removals --- .../tools/cache/shared_informer_test.go | 107 ++++++++++++++++-- 1 file changed, 100 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 214a4342dd7..98eeb7a3e88 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -434,15 +434,108 @@ func TestSharedInformerRemoveHandler(t *testing.T) { } if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { - t.Errorf("removing of first pointer handler failed: %s", err) + t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 0 { + t.Errorf("informer still has registered handlers after removing both handlers") + } +} + +func TestSharedInformerRemoveForeignHandler(t *testing.T) { + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + + source2 := fcache.NewFakeControllerSource() + source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second) + + handler1 := &ResourceEventHandlerFuncs{} + handle1, err := informer.AddEventHandler(handler1) + if err != nil { + t.Errorf("informer did not add handler1: %s", err) + return + } + handler2 := &ResourceEventHandlerFuncs{} + handle2, err := informer.AddEventHandler(handler2) + if err != nil { + t.Errorf("informer did not add handler2: %s", err) + return + } + + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + + // remove handle at foreign informer + if isRegistered(informer2, handle1) { + t.Errorf("handle1 registered for informer2") + } + if isRegistered(informer2, handle2) { + t.Errorf("handle2 registered for informer2") + } + if err := informer2.RemoveEventHandlerByRegistration(handle1); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + if !isRegistered(informer, handle1) { + t.Errorf("handle1 not registered anymore for informer") + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 not registered anymore for informer") + } + + // remove empty handle + empty := ResourceEventHandlerRegistration{} + if isRegistered(informer, empty) { + t.Errorf("empty registered for informer") + } + if isRegistered(informer, empty) { + t.Errorf("empty registered for informer") + } + if err := informer2.RemoveEventHandlerByRegistration(empty); err != nil { + t.Errorf("removing of empty handle failed: %s", err) + } + if eventHandlerCount(informer) != 2 { + t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) + } + if eventHandlerCount(informer2) != 0 { + t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2)) + } + if !isRegistered(informer, handle1) { + t.Errorf("handle1 not registered anymore for informer") + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 not registered anymore for informer") + } + + if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + t.Errorf("removing of second pointer handler failed: %s", err) + } + if eventHandlerCount(informer) != 1 { + t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) + } + + if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + t.Errorf("removing of first pointer handler failed: %s", err) + } if eventHandlerCount(informer) != 0 { t.Errorf("informer still has registered handlers after removing both handlers") } @@ -499,7 +592,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !isRegistered(informer,reg1) { + if !isRegistered(informer, reg1) { t.Errorf("handle1 is not active after successful registration") return } @@ -510,7 +603,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { return } - if !isRegistered(informer,reg2) { + if !isRegistered(informer, reg2) { t.Errorf("handle2 is not active after successful registration") return } @@ -523,7 +616,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of duplicate handler registration failed: %s", err) } - if isRegistered(informer,reg1) { + if isRegistered(informer, reg1) { t.Errorf("handle1 is still active after successful remove") return } @@ -544,7 +637,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { t.Errorf("removing of second handler registration failed: %s", err) } - if isRegistered(informer,reg2) { + if isRegistered(informer, reg2) { t.Errorf("handle2 is still active after successful remove") return } @@ -570,14 +663,14 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("removing of handler registration failed: %s", err) return } - if isRegistered(informer,reg) { + if isRegistered(informer, reg) { t.Errorf("handle is still active after successful remove") return } if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } - if isRegistered(informer,reg) { + if isRegistered(informer, reg) { t.Errorf("handle is still active after second remove") return } From e9cd17170b1646672796e713dee6c4fb0e6693bb Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 6 Oct 2021 12:40:46 +0200 Subject: [PATCH 06/15] active remove/add tests for event handlers --- .../client-go/tools/cache/shared_informer.go | 14 +- .../tools/cache/shared_informer_test.go | 221 +++++++++++++----- 2 files changed, 170 insertions(+), 65 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index ce86cb71025..ce9d086d747 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -156,7 +156,7 @@ type SharedInformer interface { // It returns a registration handle for the handler that can be used to remove // the handler again and an error if the handler cannot be added. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) - // RemoveEventHandlerByRegistration removes a formerly added event handler given by + // RemoveEventHandler removes a formerly added event handler given by // its registration handle. // If, for some reason, the same handler has been added multiple // times, only the registration for the given registration handle @@ -166,7 +166,7 @@ type SharedInformer interface { // Calling Remove on an already removed handle returns no error // because the handler is finally (still) removed after calling this // method. - RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error + RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful @@ -655,12 +655,12 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// RemoveEventHandlerByRegistration tries to remove a formerly added event handler by its +// RemoveEventHandler tries to remove a formerly added event handler by its // registration handle returned for its registration. // If a handler has been added multiple times, only the actual registration for the // handler will be removed. Other registrations of the same handler will still be // active until they are explicitly removes, also. -func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error { +func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -669,7 +669,7 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEv } if s.stopped { - return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler) + return nil } // in order to safely remove, we have to @@ -743,15 +743,15 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool l := p.listeners[i] if l == listener { p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) - i-- found = true + break } } for i := 0; i < len(p.syncingListeners); i++ { l := p.syncingListeners[i] if l == listener { p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) - i-- + break } } return found diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 98eeb7a3e88..83173342e00 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -33,60 +33,32 @@ import ( ) type testListener struct { - lock sync.RWMutex + genericTestListener resyncPeriod time.Duration expectedItemNames sets.String receivedItemNames []string - name string } func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener { l := &testListener{ - resyncPeriod: resyncPeriod, - expectedItemNames: sets.NewString(expected...), - name: name, + genericTestListener: genericTestListener{name: name}, + resyncPeriod: resyncPeriod, + expectedItemNames: sets.NewString(expected...), } + l.genericTestListener.action = l.action return l } -func (l *testListener) OnAdd(obj interface{}) { - l.handle(obj) -} - -func (l *testListener) OnUpdate(old, new interface{}) { - l.handle(new) -} - -func (l *testListener) OnDelete(obj interface{}) { -} - -func (l *testListener) handle(obj interface{}) { +func (l *testListener) action(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) fmt.Printf("%s: handle: %v\n", l.name, key) - l.lock.Lock() - defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } func (l *testListener) ok() bool { - fmt.Println("polling") - err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { - if l.satisfiedExpectations() { - return true, nil - } - return false, nil - }) - if err != nil { - return false - } - - // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - fmt.Println("final check") - return l.satisfiedExpectations() + return l.genericTestListener.ok(l.satisfiedExpectations) } func (l *testListener) satisfiedExpectations() bool { @@ -433,14 +405,14 @@ func TestSharedInformerRemoveHandler(t *testing.T) { t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -486,7 +458,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { if isRegistered(informer2, handle2) { t.Errorf("handle2 registered for informer2") } - if err := informer2.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer2.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 2 { @@ -510,7 +482,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { if isRegistered(informer, empty) { t.Errorf("empty registered for informer") } - if err := informer2.RemoveEventHandlerByRegistration(empty); err != nil { + if err := informer2.RemoveEventHandler(empty); err != nil { t.Errorf("removing of empty handle failed: %s", err) } if eventHandlerCount(informer) != 2 { @@ -526,14 +498,14 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { t.Errorf("handle2 not registered anymore for informer") } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of second pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of first pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -564,14 +536,14 @@ func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil { + if err := informer.RemoveEventHandler(handle2); err != nil { t.Errorf("removing of pointer handler failed: %s", err) } if eventHandlerCount(informer) != 1 { t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil { + if err := informer.RemoveEventHandler(handle1); err != nil { t.Errorf("removing of non-pointer handler failed: %s", err) } if eventHandlerCount(informer) != 0 { @@ -609,10 +581,10 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } if eventHandlerCount(informer) != 2 { - t.Errorf("informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) + t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) } - if err := informer.RemoveEventHandlerByRegistration(reg1); err != nil { + if err := informer.RemoveEventHandler(reg1); err != nil { t.Errorf("removing of duplicate handler registration failed: %s", err) } @@ -633,7 +605,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } } - if err := informer.RemoveEventHandlerByRegistration(reg2); err != nil { + if err := informer.RemoveEventHandler(reg2); err != nil { t.Errorf("removing of second handler registration failed: %s", err) } @@ -659,7 +631,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("informer did not add handler for the first time: %s", err) return } - if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { + if err := informer.RemoveEventHandler(reg); err != nil { t.Errorf("removing of handler registration failed: %s", err) return } @@ -667,7 +639,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { t.Errorf("handle is still active after successful remove") return } - if err := informer.RemoveEventHandlerByRegistration(reg); err != nil { + if err := informer.RemoveEventHandler(reg); err != nil { t.Errorf("removing of already removed registration yields unexpected error: %s", err) } if isRegistered(informer, reg) { @@ -734,9 +706,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { stop := make(chan struct{}) go informer.Run(stop) close(stop) - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - if !informer.IsStopped() { + if !checkCondition(informer.IsStopped) { t.Errorf("informer reports not to be stopped although stop channel closed") return } @@ -773,13 +743,148 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { t.Errorf("informer reports not to be stopped although stop channel closed") return } - err = informer.RemoveEventHandlerByRegistration(handle) - if err == nil { - t.Errorf("informer removes handler on stopped informer") - return - } - if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") { - t.Errorf("removing handler from stopped informer yield unexpected error: %s", err) + err = informer.RemoveEventHandler(handle) + if err != nil { + t.Errorf("informer does not remove handler on stopped informer") return } } + +func TestRemoveWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + listener := &genericTestListener{name: "listener"} + handle, _ := informer.AddEventHandler(listener) + listener.action = func(obj interface{}) { + informer.RemoveEventHandler(handle) + } + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener.ok(func() bool { + return listener.getCount() == 1 + }) { + t.Errorf("event did not occur") + return + } + + if isRegistered(informer, handle) { + t.Errorf("handle is still active after successful remove") + return + } +} + +func TestAddWhileActive(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + // create the shared informer and resync every 12 hours + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + listener1 := &genericTestListener{name: "listener1"} + listener2 := &genericTestListener{name: "listener2"} + + handle1, _ := informer.AddEventHandler(listener1) + var handle2 ResourceEventHandlerRegistration + listener1.action = func(obj interface{}) { + listener1.action = nil + handle2, _ = informer.AddEventHandler(listener2) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + } + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + if !listener1.ok(func() bool { + return listener1.getCount() == 2 + }) { + t.Errorf("events on listener1 did not occur") + return + } + + if !listener2.ok(func() bool { + return listener1.getCount() == 2 + }) { + t.Errorf("event on listener2 did not occur") + return + } + + if !isRegistered(informer, handle1) { + t.Errorf("handle1 is not active") + return + } + if !isRegistered(informer, handle2) { + t.Errorf("handle2 is not active") + return + } +} + +//////////////////////////////////////////////////////////////////////////////// + +type genericTestListener struct { + lock sync.RWMutex + name string + count int + action func(obj interface{}) +} + +func (l *genericTestListener) OnAdd(obj interface{}) { + l.handle(obj) +} + +func (l *genericTestListener) OnUpdate(old, new interface{}) { + l.handle(new) +} + +func (l *genericTestListener) OnDelete(obj interface{}) { +} + +func (l *genericTestListener) handle(obj interface{}) { + l.lock.Lock() + defer l.lock.Unlock() + l.count++ + if l.action != nil { + l.action(obj) + } + fmt.Printf("%s: got event\n", l.name) +} + +func (l *genericTestListener) getCount() int { + l.lock.RLock() + defer l.lock.RUnlock() + return l.count +} + +func (l *genericTestListener) ok(check func() bool) bool { + return checkCondition(check) +} + +func checkCondition(check func() bool) bool { + fmt.Println("polling") + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if check() { + return true, nil + } + return false, nil + }) + if err != nil { + return false + } + + // wait just a bit to allow any unexpected stragglers to come in + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + fmt.Println("final check") + return check() +} From 063ef090e7fb5823ca18a10a83e8847eedac9599 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Thu, 16 Jun 2022 10:54:03 -0700 Subject: [PATCH 07/15] switch listeners to use a map, adapt tests --- pkg/controller/replication/conversion.go | 3 +- .../client-go/tools/cache/shared_informer.go | 169 ++++++-------- .../tools/cache/shared_informer_test.go | 206 ++++++++---------- 3 files changed, 156 insertions(+), 222 deletions(-) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 6a2899723bf..2e71fc0a6ff 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -29,7 +29,7 @@ import ( apps "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -45,7 +45,6 @@ import ( appslisters "k8s.io/client-go/listers/apps/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - appsinternal "k8s.io/kubernetes/pkg/apis/apps" appsconversion "k8s.io/kubernetes/pkg/apis/apps/v1" apiv1 "k8s.io/kubernetes/pkg/apis/core/v1" diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index ce9d086d747..43b529e7669 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -131,7 +131,6 @@ import ( // A delete notification exposes the last locally known non-absent // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. -// type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination @@ -158,14 +157,7 @@ type SharedInformer interface { AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) // RemoveEventHandler removes a formerly added event handler given by // its registration handle. - // If, for some reason, the same handler has been added multiple - // times, only the registration for the given registration handle - // will be removed. - // It returns an error if the handler cannot be removed, - // because the informer is already stopped. - // Calling Remove on an already removed handle returns no error - // because the handler is finally (still) removed after calling this - // method. + // This function is guaranteed to be idempotent, and thread-safe. RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store @@ -218,16 +210,10 @@ type SharedInformer interface { IsStopped() bool } -// ResourceEventHandlerRegistration is a handle returned by the -// registration methods of SharedInformers for a registered -// ResourceEventHandler. It can be used later on to remove -// a registration again. -// This indirection is required, because the ResourceEventHandler -// interface can be implemented by non-go-comparable handlers, which -// could not be removed from a list anymore. -type ResourceEventHandlerRegistration struct { - listener *processorListener -} +// Opaque interface representing the registration of ResourceEventHandler for +// a SharedInformer. Must be supplied back to the same SharedInformer's +// `RemoveEventHandler` to unregister the handlers. +type ResourceEventHandlerRegistration interface{} // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { @@ -402,10 +388,6 @@ type deleteNotification struct { oldObj interface{} } -func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool { - return s.processor.isRegistered(h.listener) -} - func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -554,8 +536,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv defer s.startedLock.Unlock() if s.stopped { - klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) - return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) + return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -579,11 +560,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - handle := ResourceEventHandlerRegistration{listener} if !s.started { - s.processor.addListener(listener) - return handle, nil + return s.processor.addListener(listener), nil } // in order to safely join, we have to @@ -594,7 +573,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addListener(listener) + handle := s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } @@ -655,31 +634,17 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// RemoveEventHandler tries to remove a formerly added event handler by its -// registration handle returned for its registration. -// If a handler has been added multiple times, only the actual registration for the -// handler will be removed. Other registrations of the same handler will still be -// active until they are explicitly removes, also. func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() - if handle.listener == nil { - return nil - } - - if s.stopped { - return nil - } - // in order to safely remove, we have to // 1. stop sending add/update/delete notifications // 2. remove and stop listener // 3. unblock s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.removeListener(handle.listener) - return nil + return s.processor.removeListener(handle) } // sharedProcessor has a collection of processorListener and can @@ -691,82 +656,77 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex - listeners []*processorListener - syncingListeners []*processorListener - clock clock.Clock - wg wait.Group + // Map from listeners to whether or not they are currently syncing + listeners map[*processorListener]bool + clock clock.Clock + wg wait.Group } -func (p *sharedProcessor) isRegistered(listener *processorListener) bool { +func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { p.listenersLock.Lock() defer p.listenersLock.Unlock() - for i := 0; i < len(p.listeners); i++ { - l := p.listeners[i] - if l == listener { - return true + if p.listeners == nil { + return nil + } + + if result, ok := registration.(*processorListener); ok { + if _, exists := p.listeners[result]; exists { + return result } } - return false + + return nil } -func (p *sharedProcessor) addListener(listener *processorListener) { +func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.addListenerLocked(listener) + if p.listeners == nil { + p.listeners = make(map[*processorListener]bool) + } + + p.listeners[listener] = true + if p.listenersStarted { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + + return listener } -func (p *sharedProcessor) addListenerLocked(listener *processorListener) { - p.listeners = append(p.listeners, listener) - p.syncingListeners = append(p.syncingListeners, listener) -} - -func (p *sharedProcessor) removeListener(listener *processorListener) { +func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error { p.listenersLock.Lock() defer p.listenersLock.Unlock() - if p.removeListenerLocked(listener) { - if p.listenersStarted { - close(listener.addCh) - } + listener, ok := handle.(*processorListener) + if !ok { + return fmt.Errorf("invalid key type %t", handle) + } else if p.listeners == nil { + // No listeners are registered, do nothing + return nil + } else if _, exists := p.listeners[listener]; !exists { + // Listener is not registered, just do nothing + return nil } -} -func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool { - found := false - for i := 0; i < len(p.listeners); i++ { - l := p.listeners[i] - if l == listener { - p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) - found = true - break - } + delete(p.listeners, listener) + + if p.listenersStarted { + close(listener.addCh) } - for i := 0; i < len(p.syncingListeners); i++ { - l := p.syncingListeners[i] - if l == listener { - p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) - break - } - } - return found + + return nil } func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - if sync { - for _, listener := range p.syncingListeners { - listener.add(obj) - } - } else { - for _, listener := range p.listeners { + for listener, isSyncing := range p.listeners { + if !sync || isSyncing { listener.add(obj) } } @@ -776,19 +736,24 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + for listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + for listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop + + // Wipe out list of listeners since they are now closed + // (processorListener cannot be re-used) + p.listeners = nil } // shouldResync queries every listener to determine if any of them need a resync, based on each @@ -797,19 +762,20 @@ func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.syncingListeners = []*processorListener{} - resyncNeeded := false now := p.clock.Now() - for _, listener := range p.listeners { + for listener := range p.listeners { // need to loop through all the listeners to see if they need to resync so we can prepare any // listeners that are going to be resyncing. - if listener.shouldResync(now) { + shouldResync := listener.shouldResync(now) + p.listeners[listener] = shouldResync + + if shouldResync { resyncNeeded = true - p.syncingListeners = append(p.syncingListeners, listener) listener.determineNextResync(now) } } + return resyncNeeded } @@ -817,8 +783,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { - resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) + for listener := range p.listeners { + resyncPeriod := determineResyncPeriod( + listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 83173342e00..1774ffd6ec3 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -33,32 +33,60 @@ import ( ) type testListener struct { - genericTestListener + lock sync.RWMutex resyncPeriod time.Duration expectedItemNames sets.String receivedItemNames []string + name string } func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener { l := &testListener{ - genericTestListener: genericTestListener{name: name}, - resyncPeriod: resyncPeriod, - expectedItemNames: sets.NewString(expected...), + resyncPeriod: resyncPeriod, + expectedItemNames: sets.NewString(expected...), + name: name, } - l.genericTestListener.action = l.action return l } -func (l *testListener) action(obj interface{}) { +func (l *testListener) OnAdd(obj interface{}) { + l.handle(obj) +} + +func (l *testListener) OnUpdate(old, new interface{}) { + l.handle(new) +} + +func (l *testListener) OnDelete(obj interface{}) { +} + +func (l *testListener) handle(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) fmt.Printf("%s: handle: %v\n", l.name, key) + l.lock.Lock() + defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } func (l *testListener) ok() bool { - return l.genericTestListener.ok(l.satisfiedExpectations) + fmt.Println("polling") + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if l.satisfiedExpectations() { + return true, nil + } + return false, nil + }) + if err != nil { + return false + } + + // wait just a bit to allow any unexpected stragglers to come in + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + fmt.Println("final check") + return l.satisfiedExpectations() } func (l *testListener) satisfiedExpectations() bool { @@ -84,7 +112,7 @@ func isStarted(i SharedInformer) bool { func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { s := i.(*sharedIndexInformer) - return s.isRegistered(h) + return s.processor.getListener(h) != nil } func TestListenerResyncPeriods(t *testing.T) { @@ -187,59 +215,60 @@ func TestResyncCheckPeriod(t *testing.T) { // listener 1, never resync listener1 := newTestListener("listener1", 0) - informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + listener1Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 2, resync every minute listener2 := newTestListener("listener2", 1*time.Minute) - informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) + listener2Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 3, resync every 55 seconds listener3 := newTestListener("listener3", 55*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) + listener3Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 4, resync every 5 seconds listener4 := newTestListener("listener4", 5*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) + listener4Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a { + if e, a := 5*time.Second, informer.processor.getListener(listener4Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } } @@ -424,12 +453,12 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) source2 := fcache.NewFakeControllerSource() source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second) + informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) handler1 := &ResourceEventHandlerFuncs{} handle1, err := informer.AddEventHandler(handler1) @@ -474,17 +503,6 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { t.Errorf("handle2 not registered anymore for informer") } - // remove empty handle - empty := ResourceEventHandlerRegistration{} - if isRegistered(informer, empty) { - t.Errorf("empty registered for informer") - } - if isRegistered(informer, empty) { - t.Errorf("empty registered for informer") - } - if err := informer2.RemoveEventHandler(empty); err != nil { - t.Errorf("removing of empty handle failed: %s", err) - } if eventHandlerCount(informer) != 2 { t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } @@ -555,7 +573,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) handler1 := &ResourceEventHandlerFuncs{} reg1, err := informer.AddEventHandler(handler1) @@ -706,17 +724,25 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { stop := make(chan struct{}) go informer.Run(stop) close(stop) - if !checkCondition(informer.IsStopped) { + + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if informer.IsStopped() { + return true, nil + } + return false, nil + }) + + if err != nil { t.Errorf("informer reports not to be stopped although stop channel closed") return } - _, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + _, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) if err == nil { t.Errorf("stopped informer did not reject add handler") return } - if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") { + if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") { t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) return } @@ -757,11 +783,8 @@ func TestRemoveWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) - listener := &genericTestListener{name: "listener"} + listener := newTestListener("listener", 0, "pod1") handle, _ := informer.AddEventHandler(listener) - listener.action = func(obj interface{}) { - informer.RemoveEventHandler(handle) - } stop := make(chan struct{}) defer close(stop) @@ -769,17 +792,24 @@ func TestRemoveWhileActive(t *testing.T) { go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - if !listener.ok(func() bool { - return listener.getCount() == 1 - }) { + if !listener.ok() { t.Errorf("event did not occur") return } + informer.RemoveEventHandler(handle) + if isRegistered(informer, handle) { t.Errorf("handle is still active after successful remove") return } + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener.ok() { + t.Errorf("unexpected event occurred") + return + } } func TestAddWhileActive(t *testing.T) { @@ -788,17 +818,9 @@ func TestAddWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) - - listener1 := &genericTestListener{name: "listener1"} - listener2 := &genericTestListener{name: "listener2"} - + listener1 := newTestListener("originalListener", 0, "pod1") + listener2 := newTestListener("originalListener", 0, "pod1", "pod2") handle1, _ := informer.AddEventHandler(listener1) - var handle2 ResourceEventHandlerRegistration - listener1.action = func(obj interface{}) { - listener1.action = nil - handle2, _ = informer.AddEventHandler(listener2) - source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) - } stop := make(chan struct{}) defer close(stop) @@ -806,16 +828,15 @@ func TestAddWhileActive(t *testing.T) { go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - if !listener1.ok(func() bool { - return listener1.getCount() == 2 - }) { + if !listener1.ok() { t.Errorf("events on listener1 did not occur") return } - if !listener2.ok(func() bool { - return listener1.getCount() == 2 - }) { + handle2, _ := informer.AddEventHandler(listener2) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener2.ok() { t.Errorf("event on listener2 did not occur") return } @@ -828,63 +849,10 @@ func TestAddWhileActive(t *testing.T) { t.Errorf("handle2 is not active") return } -} -//////////////////////////////////////////////////////////////////////////////// - -type genericTestListener struct { - lock sync.RWMutex - name string - count int - action func(obj interface{}) -} - -func (l *genericTestListener) OnAdd(obj interface{}) { - l.handle(obj) -} - -func (l *genericTestListener) OnUpdate(old, new interface{}) { - l.handle(new) -} - -func (l *genericTestListener) OnDelete(obj interface{}) { -} - -func (l *genericTestListener) handle(obj interface{}) { - l.lock.Lock() - defer l.lock.Unlock() - l.count++ - if l.action != nil { - l.action(obj) + listener1.expectedItemNames = listener2.expectedItemNames + if !listener1.ok() { + t.Errorf("events on listener1 did not occur") + return } - fmt.Printf("%s: got event\n", l.name) -} - -func (l *genericTestListener) getCount() int { - l.lock.RLock() - defer l.lock.RUnlock() - return l.count -} - -func (l *genericTestListener) ok(check func() bool) bool { - return checkCondition(check) -} - -func checkCondition(check func() bool) bool { - fmt.Println("polling") - err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { - if check() { - return true, nil - } - return false, nil - }) - if err != nil { - return false - } - - // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - fmt.Println("final check") - return check() } From 5f372e1867305679d8f9d8a013c5500763e5c875 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 8 Aug 2022 11:44:37 -0700 Subject: [PATCH 08/15] address review comments asd --- .../client-go/tools/cache/shared_informer.go | 25 +++++++++------- .../tools/cache/shared_informer_test.go | 29 ++++++++++--------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 43b529e7669..56c19b10bbe 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -663,8 +663,8 @@ type sharedProcessor struct { } func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() + p.listenersLock.RLock() + defer p.listenersLock.RUnlock() if p.listeners == nil { return nil @@ -744,16 +744,19 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { }() <-stopCh - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - for listener := range p.listeners { - close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop - } - p.wg.Wait() // Wait for all .pop() and .run() to stop + func() { + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + for listener := range p.listeners { + close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop + } - // Wipe out list of listeners since they are now closed - // (processorListener cannot be re-used) - p.listeners = nil + // Wipe out list of listeners since they are now closed + // (processorListener cannot be re-used) + p.listeners = nil + }() + + p.wg.Wait() // Wait for all .pop() and .run() to stop } // shouldResync queries every listener to determine if any of them need a resync, based on each diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 1774ffd6ec3..1ccb35a8f06 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -208,6 +208,7 @@ func TestResyncCheckPeriod(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) + gl := informer.processor.getListener clock := testingclock.NewFakeClock(time.Now()) informer.clock = clock @@ -215,60 +216,60 @@ func TestResyncCheckPeriod(t *testing.T) { // listener 1, never resync listener1 := newTestListener("listener1", 0) - listener1Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 2, resync every minute listener2 := newTestListener("listener2", 1*time.Minute) - listener2Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) + handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 3, resync every 55 seconds listener3 := newTestListener("listener3", 55*time.Second) - listener3Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) + handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { + if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 4, resync every 5 seconds listener4 := newTestListener("listener4", 5*time.Second) - listener4Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) + handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { + if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { + if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { + if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 5*time.Second, informer.processor.getListener(listener4Registration).resyncPeriod; e != a { + if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } } From df9a7afa63164035f1e94c3278d470b347b697b9 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 8 Aug 2022 11:46:05 -0700 Subject: [PATCH 09/15] remove duplicate test became a duplicate while refactoring original PR --- .../tools/cache/shared_informer_test.go | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 1ccb35a8f06..954538adf86 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -532,44 +532,6 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { } } -func TestSharedInformerRemoveNonComparableHandler(t *testing.T) { - source := fcache.NewFakeControllerSource() - source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) - - handler1 := ResourceEventHandlerFuncs{} - handle1, err := informer.AddEventHandler(handler1) - if err != nil { - t.Errorf("informer did not add handler1: %s", err) - return - } - handler2 := &ResourceEventHandlerFuncs{} - handle2, err := informer.AddEventHandler(handler2) - if err != nil { - t.Errorf("informer did not add handler2: %s", err) - return - } - - if eventHandlerCount(informer) != 2 { - t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer)) - } - - if err := informer.RemoveEventHandler(handle2); err != nil { - t.Errorf("removing of pointer handler failed: %s", err) - } - if eventHandlerCount(informer) != 1 { - t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer)) - } - - if err := informer.RemoveEventHandler(handle1); err != nil { - t.Errorf("removing of non-pointer handler failed: %s", err) - } - if eventHandlerCount(informer) != 0 { - t.Errorf("after removal informer has %d registered handler(s), instead of 0", eventHandlerCount(informer)) - } -} - func TestSharedInformerMultipleRegistration(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) From 8af0a31a15079725e5910d538a6ace03df2e382a Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 8 Aug 2022 13:39:18 -0700 Subject: [PATCH 10/15] add multithreaded test to shared informer --- .../tools/cache/shared_informer_test.go | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 954538adf86..75be63a9e87 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -17,7 +17,10 @@ limitations under the License. package cache import ( + "context" "fmt" + "math/rand" + "strconv" "strings" "sync" "testing" @@ -629,6 +632,121 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { } } +// Shows that many concurrent goroutines can be manipulating shared informer +// listeners without tripping it up. There are not really many assertions in this +// test. Meant to be run with -race to find race conditions +func TestSharedInformerHandlerAbuse(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + ctx, cancel := context.WithCancel(context.Background()) + informerCtx, informerCancel := context.WithCancel(context.Background()) + go func() { + informer.Run(informerCtx.Done()) + cancel() + }() + + worker := func() { + // Keep adding and removing handler + // Make sure no duplicate events? + funcs := ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(oldObj, newObj interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + } + handles := []ResourceEventHandlerRegistration{} + + for { + select { + case <-ctx.Done(): + return + default: + switch rand.Intn(2) { + case 0: + // Register handler again + reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + continue + } + t.Errorf("failed to add handler: %v", err) + return + } + handles = append(handles, reg) + case 1: + // Remove a random handler + if len(handles) == 0 { + continue + } + + idx := rand.Intn(len(handles)) + err := informer.RemoveEventHandler(handles[idx]) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + continue + } + t.Errorf("failed to remove handler: %v", err) + return + } + handles = append(handles[:idx], handles[idx+1:]...) + } + } + } + } + + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + worker() + wg.Done() + }() + } + + objs := []*v1.Pod{} + + // While workers run, randomly create events for the informer + for i := 0; i < 10000; i++ { + if len(objs) == 0 { + // Make sure there is always an object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + + // deep copy before adding since the Modify function mutates the obj + source.Add(obj.DeepCopy()) + } + + switch rand.Intn(3) { + case 0: + // Add Object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + source.Add(obj.DeepCopy()) + case 1: + // Update Object + idx := rand.Intn(len(objs)) + source.Modify(objs[idx].DeepCopy()) + + case 2: + // Remove Object + idx := rand.Intn(len(objs)) + source.Delete(objs[idx].DeepCopy()) + objs = append(objs[:idx], objs[idx+1:]...) + } + } + + // sotp informer which stops workers. stopping informer first to excercise + // contention for informer while it is closing + informerCancel() + + // wait for workers to finish since they may throw errors + wg.Wait() +} + func TestStateSharedInformer(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) From 3a81341cfa6f7e2ca1b9bfc195c567dcdfaa4dea Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 8 Aug 2022 14:19:37 -0700 Subject: [PATCH 11/15] reset listenersStarted for correctness. technically shouldnt be an issue since restarting a stopped processor is not supported --- staging/src/k8s.io/client-go/tools/cache/shared_informer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 56c19b10bbe..c7d6d1f1c8d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -754,6 +754,10 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { // Wipe out list of listeners since they are now closed // (processorListener cannot be re-used) p.listeners = nil + + // Reset to false since there are nil listeners, also to block new listeners + // that are added from being run now that the processor was stopped + p.listenersStarted = false }() p.wg.Wait() // Wait for all .pop() and .run() to stop From 7685b393c2ee8f8548d62f9dd8485df764a0c8c3 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 22 Aug 2022 17:34:02 -0700 Subject: [PATCH 12/15] fix spelling --- .../src/k8s.io/client-go/tools/cache/shared_informer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 75be63a9e87..35b9a4d09d5 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -739,7 +739,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { } } - // sotp informer which stops workers. stopping informer first to excercise + // sotp informer which stops workers. stopping informer first to exercise // contention for informer while it is closing informerCancel() From ee24648300f8575f503156f45b44034054c2e49d Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 29 Aug 2022 11:52:35 -0700 Subject: [PATCH 13/15] simplify control flow --- .../src/k8s.io/client-go/tools/cache/shared_informer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index c7d6d1f1c8d..d65dba77382 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -726,8 +726,15 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { defer p.listenersLock.RUnlock() for listener, isSyncing := range p.listeners { - if !sync || isSyncing { + switch { + case !sync: + // non-sync messages are delivered to every listener listener.add(obj) + case isSyncing: + // sync messages are delivered to every syncing listenter + listener.add(obj) + default: + // skipping a sync obj for a non-syncing listener } } } From 7ce19b75a8dbca12837ed9f4c5c2828c38b82a03 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Mon, 29 Aug 2022 12:34:35 -0700 Subject: [PATCH 14/15] hold listener lock while waiting for goroutines to finish --- .../client-go/tools/cache/shared_informer.go | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index d65dba77382..f23d55f1e4d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -751,21 +751,18 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { }() <-stopCh - func() { - p.listenersLock.Lock() - defer p.listenersLock.Unlock() - for listener := range p.listeners { - close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop - } + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + for listener := range p.listeners { + close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop + } - // Wipe out list of listeners since they are now closed - // (processorListener cannot be re-used) - p.listeners = nil + // Wipe out list of listeners since they are now closed + // (processorListener cannot be re-used) + p.listeners = nil - // Reset to false since there are nil listeners, also to block new listeners - // that are added from being run now that the processor was stopped - p.listenersStarted = false - }() + // Reset to false since no listeners are running + p.listenersStarted = false p.wg.Wait() // Wait for all .pop() and .run() to stop } @@ -789,7 +786,6 @@ func (p *sharedProcessor) shouldResync() bool { listener.determineNextResync(now) } } - return resyncNeeded } From cc0b9ffbd5a4edfe7ce01293c2bf963d4335d8d6 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Tue, 30 Aug 2022 14:38:21 -0700 Subject: [PATCH 15/15] return when test is done --- .../k8s.io/client-go/tools/cache/shared_informer_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 35b9a4d09d5..2676e8f54c7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -667,7 +667,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second) if err != nil { if strings.Contains(err.Error(), "stopped already") { - continue + // test is over + return } t.Errorf("failed to add handler: %v", err) return @@ -683,7 +684,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { err := informer.RemoveEventHandler(handles[idx]) if err != nil { if strings.Contains(err.Error(), "stopped already") { - continue + // test is over + return } t.Errorf("failed to remove handler: %v", err) return