From 063ef090e7fb5823ca18a10a83e8847eedac9599 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski <351783+alexzielenski@users.noreply.github.com> Date: Thu, 16 Jun 2022 10:54:03 -0700 Subject: [PATCH] switch listeners to use a map, adapt tests --- pkg/controller/replication/conversion.go | 3 +- .../client-go/tools/cache/shared_informer.go | 169 ++++++-------- .../tools/cache/shared_informer_test.go | 206 ++++++++---------- 3 files changed, 156 insertions(+), 222 deletions(-) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 6a2899723bf..2e71fc0a6ff 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -29,7 +29,7 @@ import ( apps "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -45,7 +45,6 @@ import ( appslisters "k8s.io/client-go/listers/apps/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - appsinternal "k8s.io/kubernetes/pkg/apis/apps" appsconversion "k8s.io/kubernetes/pkg/apis/apps/v1" apiv1 "k8s.io/kubernetes/pkg/apis/core/v1" diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index ce9d086d747..43b529e7669 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -131,7 +131,6 @@ import ( // A delete notification exposes the last locally known non-absent // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. -// type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination @@ -158,14 +157,7 @@ type SharedInformer interface { AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) // RemoveEventHandler removes a formerly added event handler given by // its registration handle. - // If, for some reason, the same handler has been added multiple - // times, only the registration for the given registration handle - // will be removed. - // It returns an error if the handler cannot be removed, - // because the informer is already stopped. - // Calling Remove on an already removed handle returns no error - // because the handler is finally (still) removed after calling this - // method. + // This function is guaranteed to be idempotent, and thread-safe. RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store @@ -218,16 +210,10 @@ type SharedInformer interface { IsStopped() bool } -// ResourceEventHandlerRegistration is a handle returned by the -// registration methods of SharedInformers for a registered -// ResourceEventHandler. It can be used later on to remove -// a registration again. -// This indirection is required, because the ResourceEventHandler -// interface can be implemented by non-go-comparable handlers, which -// could not be removed from a list anymore. -type ResourceEventHandlerRegistration struct { - listener *processorListener -} +// Opaque interface representing the registration of ResourceEventHandler for +// a SharedInformer. Must be supplied back to the same SharedInformer's +// `RemoveEventHandler` to unregister the handlers. +type ResourceEventHandlerRegistration interface{} // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { @@ -402,10 +388,6 @@ type deleteNotification struct { oldObj interface{} } -func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool { - return s.processor.isRegistered(h.listener) -} - func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -554,8 +536,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv defer s.startedLock.Unlock() if s.stopped { - klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler) - return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler) + return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler) } if resyncPeriod > 0 { @@ -579,11 +560,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) - handle := ResourceEventHandlerRegistration{listener} if !s.started { - s.processor.addListener(listener) - return handle, nil + return s.processor.addListener(listener), nil } // in order to safely join, we have to @@ -594,7 +573,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.addListener(listener) + handle := s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } @@ -655,31 +634,17 @@ func (s *sharedIndexInformer) IsStopped() bool { return s.stopped } -// RemoveEventHandler tries to remove a formerly added event handler by its -// registration handle returned for its registration. -// If a handler has been added multiple times, only the actual registration for the -// handler will be removed. Other registrations of the same handler will still be -// active until they are explicitly removes, also. func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { s.startedLock.Lock() defer s.startedLock.Unlock() - if handle.listener == nil { - return nil - } - - if s.stopped { - return nil - } - // in order to safely remove, we have to // 1. stop sending add/update/delete notifications // 2. remove and stop listener // 3. unblock s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - s.processor.removeListener(handle.listener) - return nil + return s.processor.removeListener(handle) } // sharedProcessor has a collection of processorListener and can @@ -691,82 +656,77 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex - listeners []*processorListener - syncingListeners []*processorListener - clock clock.Clock - wg wait.Group + // Map from listeners to whether or not they are currently syncing + listeners map[*processorListener]bool + clock clock.Clock + wg wait.Group } -func (p *sharedProcessor) isRegistered(listener *processorListener) bool { +func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { p.listenersLock.Lock() defer p.listenersLock.Unlock() - for i := 0; i < len(p.listeners); i++ { - l := p.listeners[i] - if l == listener { - return true + if p.listeners == nil { + return nil + } + + if result, ok := registration.(*processorListener); ok { + if _, exists := p.listeners[result]; exists { + return result } } - return false + + return nil } -func (p *sharedProcessor) addListener(listener *processorListener) { +func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.addListenerLocked(listener) + if p.listeners == nil { + p.listeners = make(map[*processorListener]bool) + } + + p.listeners[listener] = true + if p.listenersStarted { p.wg.Start(listener.run) p.wg.Start(listener.pop) } + + return listener } -func (p *sharedProcessor) addListenerLocked(listener *processorListener) { - p.listeners = append(p.listeners, listener) - p.syncingListeners = append(p.syncingListeners, listener) -} - -func (p *sharedProcessor) removeListener(listener *processorListener) { +func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error { p.listenersLock.Lock() defer p.listenersLock.Unlock() - if p.removeListenerLocked(listener) { - if p.listenersStarted { - close(listener.addCh) - } + listener, ok := handle.(*processorListener) + if !ok { + return fmt.Errorf("invalid key type %t", handle) + } else if p.listeners == nil { + // No listeners are registered, do nothing + return nil + } else if _, exists := p.listeners[listener]; !exists { + // Listener is not registered, just do nothing + return nil } -} -func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool { - found := false - for i := 0; i < len(p.listeners); i++ { - l := p.listeners[i] - if l == listener { - p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) - found = true - break - } + delete(p.listeners, listener) + + if p.listenersStarted { + close(listener.addCh) } - for i := 0; i < len(p.syncingListeners); i++ { - l := p.syncingListeners[i] - if l == listener { - p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...) - break - } - } - return found + + return nil } func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - if sync { - for _, listener := range p.syncingListeners { - listener.add(obj) - } - } else { - for _, listener := range p.listeners { + for listener, isSyncing := range p.listeners { + if !sync || isSyncing { listener.add(obj) } } @@ -776,19 +736,24 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + for listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { + + p.listenersLock.Lock() + defer p.listenersLock.Unlock() + for listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop + + // Wipe out list of listeners since they are now closed + // (processorListener cannot be re-used) + p.listeners = nil } // shouldResync queries every listener to determine if any of them need a resync, based on each @@ -797,19 +762,20 @@ func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() - p.syncingListeners = []*processorListener{} - resyncNeeded := false now := p.clock.Now() - for _, listener := range p.listeners { + for listener := range p.listeners { // need to loop through all the listeners to see if they need to resync so we can prepare any // listeners that are going to be resyncing. - if listener.shouldResync(now) { + shouldResync := listener.shouldResync(now) + p.listeners[listener] = shouldResync + + if shouldResync { resyncNeeded = true - p.syncingListeners = append(p.syncingListeners, listener) listener.determineNextResync(now) } } + return resyncNeeded } @@ -817,8 +783,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati p.listenersLock.RLock() defer p.listenersLock.RUnlock() - for _, listener := range p.listeners { - resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) + for listener := range p.listeners { + resyncPeriod := determineResyncPeriod( + listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 83173342e00..1774ffd6ec3 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -33,32 +33,60 @@ import ( ) type testListener struct { - genericTestListener + lock sync.RWMutex resyncPeriod time.Duration expectedItemNames sets.String receivedItemNames []string + name string } func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener { l := &testListener{ - genericTestListener: genericTestListener{name: name}, - resyncPeriod: resyncPeriod, - expectedItemNames: sets.NewString(expected...), + resyncPeriod: resyncPeriod, + expectedItemNames: sets.NewString(expected...), + name: name, } - l.genericTestListener.action = l.action return l } -func (l *testListener) action(obj interface{}) { +func (l *testListener) OnAdd(obj interface{}) { + l.handle(obj) +} + +func (l *testListener) OnUpdate(old, new interface{}) { + l.handle(new) +} + +func (l *testListener) OnDelete(obj interface{}) { +} + +func (l *testListener) handle(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) fmt.Printf("%s: handle: %v\n", l.name, key) + l.lock.Lock() + defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } func (l *testListener) ok() bool { - return l.genericTestListener.ok(l.satisfiedExpectations) + fmt.Println("polling") + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if l.satisfiedExpectations() { + return true, nil + } + return false, nil + }) + if err != nil { + return false + } + + // wait just a bit to allow any unexpected stragglers to come in + fmt.Println("sleeping") + time.Sleep(1 * time.Second) + fmt.Println("final check") + return l.satisfiedExpectations() } func (l *testListener) satisfiedExpectations() bool { @@ -84,7 +112,7 @@ func isStarted(i SharedInformer) bool { func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { s := i.(*sharedIndexInformer) - return s.isRegistered(h) + return s.processor.getListener(h) != nil } func TestListenerResyncPeriods(t *testing.T) { @@ -187,59 +215,60 @@ func TestResyncCheckPeriod(t *testing.T) { // listener 1, never resync listener1 := newTestListener("listener1", 0) - informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + listener1Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) + if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 2, resync every minute listener2 := newTestListener("listener2", 1*time.Minute) - informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) + listener2Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 3, resync every 55 seconds listener3 := newTestListener("listener3", 55*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) + listener3Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } // listener 4, resync every 5 seconds listener4 := newTestListener("listener4", 5*time.Second) - informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) + listener4Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { + if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { + if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { + if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } - if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a { + if e, a := 5*time.Second, informer.processor.getListener(listener4Registration).resyncPeriod; e != a { t.Errorf("expected %d, got %d", e, a) } } @@ -424,12 +453,12 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) source2 := fcache.NewFakeControllerSource() source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second) + informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) handler1 := &ResourceEventHandlerFuncs{} handle1, err := informer.AddEventHandler(handler1) @@ -474,17 +503,6 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { t.Errorf("handle2 not registered anymore for informer") } - // remove empty handle - empty := ResourceEventHandlerRegistration{} - if isRegistered(informer, empty) { - t.Errorf("empty registered for informer") - } - if isRegistered(informer, empty) { - t.Errorf("empty registered for informer") - } - if err := informer2.RemoveEventHandler(empty); err != nil { - t.Errorf("removing of empty handle failed: %s", err) - } if eventHandlerCount(informer) != 2 { t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer)) } @@ -555,7 +573,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) handler1 := &ResourceEventHandlerFuncs{} reg1, err := informer.AddEventHandler(handler1) @@ -706,17 +724,25 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { stop := make(chan struct{}) go informer.Run(stop) close(stop) - if !checkCondition(informer.IsStopped) { + + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + if informer.IsStopped() { + return true, nil + } + return false, nil + }) + + if err != nil { t.Errorf("informer reports not to be stopped although stop channel closed") return } - _, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) + _, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) if err == nil { t.Errorf("stopped informer did not reject add handler") return } - if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") { + if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") { t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err) return } @@ -757,11 +783,8 @@ func TestRemoveWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) - listener := &genericTestListener{name: "listener"} + listener := newTestListener("listener", 0, "pod1") handle, _ := informer.AddEventHandler(listener) - listener.action = func(obj interface{}) { - informer.RemoveEventHandler(handle) - } stop := make(chan struct{}) defer close(stop) @@ -769,17 +792,24 @@ func TestRemoveWhileActive(t *testing.T) { go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - if !listener.ok(func() bool { - return listener.getCount() == 1 - }) { + if !listener.ok() { t.Errorf("event did not occur") return } + informer.RemoveEventHandler(handle) + if isRegistered(informer, handle) { t.Errorf("handle is still active after successful remove") return } + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener.ok() { + t.Errorf("unexpected event occurred") + return + } } func TestAddWhileActive(t *testing.T) { @@ -788,17 +818,9 @@ func TestAddWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) - - listener1 := &genericTestListener{name: "listener1"} - listener2 := &genericTestListener{name: "listener2"} - + listener1 := newTestListener("originalListener", 0, "pod1") + listener2 := newTestListener("originalListener", 0, "pod1", "pod2") handle1, _ := informer.AddEventHandler(listener1) - var handle2 ResourceEventHandlerRegistration - listener1.action = func(obj interface{}) { - listener1.action = nil - handle2, _ = informer.AddEventHandler(listener2) - source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) - } stop := make(chan struct{}) defer close(stop) @@ -806,16 +828,15 @@ func TestAddWhileActive(t *testing.T) { go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) - if !listener1.ok(func() bool { - return listener1.getCount() == 2 - }) { + if !listener1.ok() { t.Errorf("events on listener1 did not occur") return } - if !listener2.ok(func() bool { - return listener1.getCount() == 2 - }) { + handle2, _ := informer.AddEventHandler(listener2) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) + + if !listener2.ok() { t.Errorf("event on listener2 did not occur") return } @@ -828,63 +849,10 @@ func TestAddWhileActive(t *testing.T) { t.Errorf("handle2 is not active") return } -} -//////////////////////////////////////////////////////////////////////////////// - -type genericTestListener struct { - lock sync.RWMutex - name string - count int - action func(obj interface{}) -} - -func (l *genericTestListener) OnAdd(obj interface{}) { - l.handle(obj) -} - -func (l *genericTestListener) OnUpdate(old, new interface{}) { - l.handle(new) -} - -func (l *genericTestListener) OnDelete(obj interface{}) { -} - -func (l *genericTestListener) handle(obj interface{}) { - l.lock.Lock() - defer l.lock.Unlock() - l.count++ - if l.action != nil { - l.action(obj) + listener1.expectedItemNames = listener2.expectedItemNames + if !listener1.ok() { + t.Errorf("events on listener1 did not occur") + return } - fmt.Printf("%s: got event\n", l.name) -} - -func (l *genericTestListener) getCount() int { - l.lock.RLock() - defer l.lock.RUnlock() - return l.count -} - -func (l *genericTestListener) ok(check func() bool) bool { - return checkCondition(check) -} - -func checkCondition(check func() bool) bool { - fmt.Println("polling") - err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { - if check() { - return true, nil - } - return false, nil - }) - if err != nil { - return false - } - - // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") - time.Sleep(1 * time.Second) - fmt.Println("final check") - return check() }