mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
switch listeners to use a map, adapt tests
This commit is contained in:
parent
e9cd17170b
commit
063ef090e7
@ -29,7 +29,7 @@ import (
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -45,7 +45,6 @@ import (
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
|
||||
appsconversion "k8s.io/kubernetes/pkg/apis/apps/v1"
|
||||
apiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
|
@ -131,7 +131,6 @@ import (
|
||||
// A delete notification exposes the last locally known non-absent
|
||||
// state, except that its ResourceVersion is replaced with a
|
||||
// ResourceVersion in which the object is actually absent.
|
||||
//
|
||||
type SharedInformer interface {
|
||||
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
@ -158,14 +157,7 @@ type SharedInformer interface {
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
|
||||
// RemoveEventHandler removes a formerly added event handler given by
|
||||
// its registration handle.
|
||||
// If, for some reason, the same handler has been added multiple
|
||||
// times, only the registration for the given registration handle
|
||||
// will be removed.
|
||||
// It returns an error if the handler cannot be removed,
|
||||
// because the informer is already stopped.
|
||||
// Calling Remove on an already removed handle returns no error
|
||||
// because the handler is finally (still) removed after calling this
|
||||
// method.
|
||||
// This function is guaranteed to be idempotent, and thread-safe.
|
||||
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
|
||||
// GetStore returns the informer's local cache as a Store.
|
||||
GetStore() Store
|
||||
@ -218,16 +210,10 @@ type SharedInformer interface {
|
||||
IsStopped() bool
|
||||
}
|
||||
|
||||
// ResourceEventHandlerRegistration is a handle returned by the
|
||||
// registration methods of SharedInformers for a registered
|
||||
// ResourceEventHandler. It can be used later on to remove
|
||||
// a registration again.
|
||||
// This indirection is required, because the ResourceEventHandler
|
||||
// interface can be implemented by non-go-comparable handlers, which
|
||||
// could not be removed from a list anymore.
|
||||
type ResourceEventHandlerRegistration struct {
|
||||
listener *processorListener
|
||||
}
|
||||
// Opaque interface representing the registration of ResourceEventHandler for
|
||||
// a SharedInformer. Must be supplied back to the same SharedInformer's
|
||||
// `RemoveEventHandler` to unregister the handlers.
|
||||
type ResourceEventHandlerRegistration interface{}
|
||||
|
||||
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
||||
type SharedIndexInformer interface {
|
||||
@ -402,10 +388,6 @@ type deleteNotification struct {
|
||||
oldObj interface{}
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) isRegistered(h ResourceEventHandlerRegistration) bool {
|
||||
return s.processor.isRegistered(h.listener)
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
@ -554,8 +536,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler)
|
||||
return ResourceEventHandlerRegistration{}, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler)
|
||||
return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
|
||||
}
|
||||
|
||||
if resyncPeriod > 0 {
|
||||
@ -579,11 +560,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
}
|
||||
|
||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
|
||||
handle := ResourceEventHandlerRegistration{listener}
|
||||
|
||||
if !s.started {
|
||||
s.processor.addListener(listener)
|
||||
return handle, nil
|
||||
return s.processor.addListener(listener), nil
|
||||
}
|
||||
|
||||
// in order to safely join, we have to
|
||||
@ -594,7 +573,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
s.processor.addListener(listener)
|
||||
handle := s.processor.addListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
@ -655,31 +634,17 @@ func (s *sharedIndexInformer) IsStopped() bool {
|
||||
return s.stopped
|
||||
}
|
||||
|
||||
// RemoveEventHandler tries to remove a formerly added event handler by its
|
||||
// registration handle returned for its registration.
|
||||
// If a handler has been added multiple times, only the actual registration for the
|
||||
// handler will be removed. Other registrations of the same handler will still be
|
||||
// active until they are explicitly removes, also.
|
||||
func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if handle.listener == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.stopped {
|
||||
return nil
|
||||
}
|
||||
|
||||
// in order to safely remove, we have to
|
||||
// 1. stop sending add/update/delete notifications
|
||||
// 2. remove and stop listener
|
||||
// 3. unblock
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
s.processor.removeListener(handle.listener)
|
||||
return nil
|
||||
return s.processor.removeListener(handle)
|
||||
}
|
||||
|
||||
// sharedProcessor has a collection of processorListener and can
|
||||
@ -691,82 +656,77 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi
|
||||
type sharedProcessor struct {
|
||||
listenersStarted bool
|
||||
listenersLock sync.RWMutex
|
||||
listeners []*processorListener
|
||||
syncingListeners []*processorListener
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
// Map from listeners to whether or not they are currently syncing
|
||||
listeners map[*processorListener]bool
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) isRegistered(listener *processorListener) bool {
|
||||
func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
for i := 0; i < len(p.listeners); i++ {
|
||||
l := p.listeners[i]
|
||||
if l == listener {
|
||||
return true
|
||||
if p.listeners == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if result, ok := registration.(*processorListener); ok {
|
||||
if _, exists := p.listeners[result]; exists {
|
||||
return result
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) {
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
if p.listeners == nil {
|
||||
p.listeners = make(map[*processorListener]bool)
|
||||
}
|
||||
|
||||
p.listeners[listener] = true
|
||||
|
||||
if p.listenersStarted {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
return listener
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
p.listeners = append(p.listeners, listener)
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) removeListener(listener *processorListener) {
|
||||
func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
if p.removeListenerLocked(listener) {
|
||||
if p.listenersStarted {
|
||||
close(listener.addCh)
|
||||
}
|
||||
listener, ok := handle.(*processorListener)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid key type %t", handle)
|
||||
} else if p.listeners == nil {
|
||||
// No listeners are registered, do nothing
|
||||
return nil
|
||||
} else if _, exists := p.listeners[listener]; !exists {
|
||||
// Listener is not registered, just do nothing
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool {
|
||||
found := false
|
||||
for i := 0; i < len(p.listeners); i++ {
|
||||
l := p.listeners[i]
|
||||
if l == listener {
|
||||
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
delete(p.listeners, listener)
|
||||
|
||||
if p.listenersStarted {
|
||||
close(listener.addCh)
|
||||
}
|
||||
for i := 0; i < len(p.syncingListeners); i++ {
|
||||
l := p.syncingListeners[i]
|
||||
if l == listener {
|
||||
p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
return found
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
if sync {
|
||||
for _, listener := range p.syncingListeners {
|
||||
listener.add(obj)
|
||||
}
|
||||
} else {
|
||||
for _, listener := range p.listeners {
|
||||
for listener, isSyncing := range p.listeners {
|
||||
if !sync || isSyncing {
|
||||
listener.add(obj)
|
||||
}
|
||||
}
|
||||
@ -776,19 +736,24 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
||||
func() {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
for listener := range p.listeners {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
p.listenersStarted = true
|
||||
}()
|
||||
<-stopCh
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
for listener := range p.listeners {
|
||||
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
|
||||
}
|
||||
p.wg.Wait() // Wait for all .pop() and .run() to stop
|
||||
|
||||
// Wipe out list of listeners since they are now closed
|
||||
// (processorListener cannot be re-used)
|
||||
p.listeners = nil
|
||||
}
|
||||
|
||||
// shouldResync queries every listener to determine if any of them need a resync, based on each
|
||||
@ -797,19 +762,20 @@ func (p *sharedProcessor) shouldResync() bool {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.syncingListeners = []*processorListener{}
|
||||
|
||||
resyncNeeded := false
|
||||
now := p.clock.Now()
|
||||
for _, listener := range p.listeners {
|
||||
for listener := range p.listeners {
|
||||
// need to loop through all the listeners to see if they need to resync so we can prepare any
|
||||
// listeners that are going to be resyncing.
|
||||
if listener.shouldResync(now) {
|
||||
shouldResync := listener.shouldResync(now)
|
||||
p.listeners[listener] = shouldResync
|
||||
|
||||
if shouldResync {
|
||||
resyncNeeded = true
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
listener.determineNextResync(now)
|
||||
}
|
||||
}
|
||||
|
||||
return resyncNeeded
|
||||
}
|
||||
|
||||
@ -817,8 +783,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
for _, listener := range p.listeners {
|
||||
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
|
||||
for listener := range p.listeners {
|
||||
resyncPeriod := determineResyncPeriod(
|
||||
listener.requestedResyncPeriod, resyncCheckPeriod)
|
||||
listener.setResyncPeriod(resyncPeriod)
|
||||
}
|
||||
}
|
||||
|
@ -33,32 +33,60 @@ import (
|
||||
)
|
||||
|
||||
type testListener struct {
|
||||
genericTestListener
|
||||
lock sync.RWMutex
|
||||
resyncPeriod time.Duration
|
||||
expectedItemNames sets.String
|
||||
receivedItemNames []string
|
||||
name string
|
||||
}
|
||||
|
||||
func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
|
||||
l := &testListener{
|
||||
genericTestListener: genericTestListener{name: name},
|
||||
resyncPeriod: resyncPeriod,
|
||||
expectedItemNames: sets.NewString(expected...),
|
||||
resyncPeriod: resyncPeriod,
|
||||
expectedItemNames: sets.NewString(expected...),
|
||||
name: name,
|
||||
}
|
||||
l.genericTestListener.action = l.action
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *testListener) action(obj interface{}) {
|
||||
func (l *testListener) OnAdd(obj interface{}) {
|
||||
l.handle(obj)
|
||||
}
|
||||
|
||||
func (l *testListener) OnUpdate(old, new interface{}) {
|
||||
l.handle(new)
|
||||
}
|
||||
|
||||
func (l *testListener) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (l *testListener) handle(obj interface{}) {
|
||||
key, _ := MetaNamespaceKeyFunc(obj)
|
||||
fmt.Printf("%s: handle: %v\n", l.name, key)
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
objectMeta, _ := meta.Accessor(obj)
|
||||
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
|
||||
}
|
||||
|
||||
func (l *testListener) ok() bool {
|
||||
return l.genericTestListener.ok(l.satisfiedExpectations)
|
||||
fmt.Println("polling")
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if l.satisfiedExpectations() {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// wait just a bit to allow any unexpected stragglers to come in
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("final check")
|
||||
return l.satisfiedExpectations()
|
||||
}
|
||||
|
||||
func (l *testListener) satisfiedExpectations() bool {
|
||||
@ -84,7 +112,7 @@ func isStarted(i SharedInformer) bool {
|
||||
|
||||
func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
|
||||
s := i.(*sharedIndexInformer)
|
||||
return s.isRegistered(h)
|
||||
return s.processor.getListener(h) != nil
|
||||
}
|
||||
|
||||
func TestListenerResyncPeriods(t *testing.T) {
|
||||
@ -187,59 +215,60 @@ func TestResyncCheckPeriod(t *testing.T) {
|
||||
|
||||
// listener 1, never resync
|
||||
listener1 := newTestListener("listener1", 0)
|
||||
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
|
||||
listener1Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
|
||||
|
||||
if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
|
||||
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
|
||||
// listener 2, resync every minute
|
||||
listener2 := newTestListener("listener2", 1*time.Minute)
|
||||
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
|
||||
listener2Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
|
||||
if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
|
||||
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
|
||||
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
|
||||
// listener 3, resync every 55 seconds
|
||||
listener3 := newTestListener("listener3", 55*time.Second)
|
||||
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
|
||||
listener3Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
|
||||
if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
|
||||
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
|
||||
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
|
||||
if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
|
||||
// listener 4, resync every 5 seconds
|
||||
listener4 := newTestListener("listener4", 5*time.Second)
|
||||
informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
|
||||
listener4Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
|
||||
if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
|
||||
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
|
||||
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
|
||||
if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a {
|
||||
if e, a := 5*time.Second, informer.processor.getListener(listener4Registration).resyncPeriod; e != a {
|
||||
t.Errorf("expected %d, got %d", e, a)
|
||||
}
|
||||
}
|
||||
@ -424,12 +453,12 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
source2 := fcache.NewFakeControllerSource()
|
||||
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second)
|
||||
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
handler1 := &ResourceEventHandlerFuncs{}
|
||||
handle1, err := informer.AddEventHandler(handler1)
|
||||
@ -474,17 +503,6 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
t.Errorf("handle2 not registered anymore for informer")
|
||||
}
|
||||
|
||||
// remove empty handle
|
||||
empty := ResourceEventHandlerRegistration{}
|
||||
if isRegistered(informer, empty) {
|
||||
t.Errorf("empty registered for informer")
|
||||
}
|
||||
if isRegistered(informer, empty) {
|
||||
t.Errorf("empty registered for informer")
|
||||
}
|
||||
if err := informer2.RemoveEventHandler(empty); err != nil {
|
||||
t.Errorf("removing of empty handle failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 2 {
|
||||
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
|
||||
}
|
||||
@ -555,7 +573,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
handler1 := &ResourceEventHandlerFuncs{}
|
||||
reg1, err := informer.AddEventHandler(handler1)
|
||||
@ -706,17 +724,25 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
|
||||
stop := make(chan struct{})
|
||||
go informer.Run(stop)
|
||||
close(stop)
|
||||
if !checkCondition(informer.IsStopped) {
|
||||
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if informer.IsStopped() {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
_, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
|
||||
_, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
|
||||
if err == nil {
|
||||
t.Errorf("stopped informer did not reject add handler")
|
||||
return
|
||||
}
|
||||
if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") {
|
||||
if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") {
|
||||
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
|
||||
return
|
||||
}
|
||||
@ -757,11 +783,8 @@ func TestRemoveWhileActive(t *testing.T) {
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
listener := &genericTestListener{name: "listener"}
|
||||
listener := newTestListener("listener", 0, "pod1")
|
||||
handle, _ := informer.AddEventHandler(listener)
|
||||
listener.action = func(obj interface{}) {
|
||||
informer.RemoveEventHandler(handle)
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
@ -769,17 +792,24 @@ func TestRemoveWhileActive(t *testing.T) {
|
||||
go informer.Run(stop)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
if !listener.ok(func() bool {
|
||||
return listener.getCount() == 1
|
||||
}) {
|
||||
if !listener.ok() {
|
||||
t.Errorf("event did not occur")
|
||||
return
|
||||
}
|
||||
|
||||
informer.RemoveEventHandler(handle)
|
||||
|
||||
if isRegistered(informer, handle) {
|
||||
t.Errorf("handle is still active after successful remove")
|
||||
return
|
||||
}
|
||||
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
|
||||
|
||||
if !listener.ok() {
|
||||
t.Errorf("unexpected event occurred")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddWhileActive(t *testing.T) {
|
||||
@ -788,17 +818,9 @@ func TestAddWhileActive(t *testing.T) {
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
listener1 := &genericTestListener{name: "listener1"}
|
||||
listener2 := &genericTestListener{name: "listener2"}
|
||||
|
||||
listener1 := newTestListener("originalListener", 0, "pod1")
|
||||
listener2 := newTestListener("originalListener", 0, "pod1", "pod2")
|
||||
handle1, _ := informer.AddEventHandler(listener1)
|
||||
var handle2 ResourceEventHandlerRegistration
|
||||
listener1.action = func(obj interface{}) {
|
||||
listener1.action = nil
|
||||
handle2, _ = informer.AddEventHandler(listener2)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
@ -806,16 +828,15 @@ func TestAddWhileActive(t *testing.T) {
|
||||
go informer.Run(stop)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
if !listener1.ok(func() bool {
|
||||
return listener1.getCount() == 2
|
||||
}) {
|
||||
if !listener1.ok() {
|
||||
t.Errorf("events on listener1 did not occur")
|
||||
return
|
||||
}
|
||||
|
||||
if !listener2.ok(func() bool {
|
||||
return listener1.getCount() == 2
|
||||
}) {
|
||||
handle2, _ := informer.AddEventHandler(listener2)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
|
||||
|
||||
if !listener2.ok() {
|
||||
t.Errorf("event on listener2 did not occur")
|
||||
return
|
||||
}
|
||||
@ -828,63 +849,10 @@ func TestAddWhileActive(t *testing.T) {
|
||||
t.Errorf("handle2 is not active")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
type genericTestListener struct {
|
||||
lock sync.RWMutex
|
||||
name string
|
||||
count int
|
||||
action func(obj interface{})
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnAdd(obj interface{}) {
|
||||
l.handle(obj)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnUpdate(old, new interface{}) {
|
||||
l.handle(new)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (l *genericTestListener) handle(obj interface{}) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
l.count++
|
||||
if l.action != nil {
|
||||
l.action(obj)
|
||||
listener1.expectedItemNames = listener2.expectedItemNames
|
||||
if !listener1.ok() {
|
||||
t.Errorf("events on listener1 did not occur")
|
||||
return
|
||||
}
|
||||
fmt.Printf("%s: got event\n", l.name)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) getCount() int {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
return l.count
|
||||
}
|
||||
|
||||
func (l *genericTestListener) ok(check func() bool) bool {
|
||||
return checkCondition(check)
|
||||
}
|
||||
|
||||
func checkCondition(check func() bool) bool {
|
||||
fmt.Println("polling")
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if check() {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// wait just a bit to allow any unexpected stragglers to come in
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("final check")
|
||||
return check()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user