Merge pull request #111122 from alexzielenski/informer

support removal of event handlers from SharedIndexInformers

Kubernetes-commit: 2969000db3fae354b7b27bc79712d244248e640e
This commit is contained in:
Kubernetes Publisher 2022-09-07 11:58:37 -07:00
commit 27c67e708a
2 changed files with 691 additions and 49 deletions

View File

@ -135,7 +135,9 @@ type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination // period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers. // between different handlers.
AddEventHandler(handler ResourceEventHandler) // It returns a registration handle for the handler that can be used to remove
// the handler again.
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the // AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means // shared informer with the requested resync period; zero means
// this handler does not care about resyncs. The resync operation // this handler does not care about resyncs. The resync operation
@ -150,7 +152,13 @@ type SharedInformer interface {
// between any two resyncs may be longer than the nominal period // between any two resyncs may be longer than the nominal period
// because the implementation takes time to do work and there may // because the implementation takes time to do work and there may
// be competing load and scheduling noise. // be competing load and scheduling noise.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) // It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by
// its registration handle.
// This function is guaranteed to be idempotent, and thread-safe.
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
// GetStore returns the informer's local cache as a Store. // GetStore returns the informer's local cache as a Store.
GetStore() Store GetStore() Store
// GetController is deprecated, it does nothing useful // GetController is deprecated, it does nothing useful
@ -195,8 +203,18 @@ type SharedInformer interface {
// transform before mutating it at all and returning the copy to prevent // transform before mutating it at all and returning the copy to prevent
// data races. // data races.
SetTransform(handler TransformFunc) error SetTransform(handler TransformFunc) error
// IsStopped reports whether the informer has already been stopped.
// Adding event handlers to already stopped informers is not possible.
// An informer already stopped will never be started again.
IsStopped() bool
} }
// Opaque interface representing the registration of ResourceEventHandler for
// a SharedInformer. Must be supplied back to the same SharedInformer's
// `RemoveEventHandler` to unregister the handlers.
type ResourceEventHandlerRegistration interface{}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface { type SharedIndexInformer interface {
SharedInformer SharedInformer
@ -492,8 +510,8 @@ func (s *sharedIndexInformer) GetController() Controller {
return &dummyController{informer: s} return &dummyController{informer: s}
} }
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
} }
func determineResyncPeriod(desired, check time.Duration) time.Duration { func determineResyncPeriod(desired, check time.Duration) time.Duration {
@ -513,13 +531,12 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
const minimumResyncPeriod = 1 * time.Second const minimumResyncPeriod = 1 * time.Second
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
if s.stopped { if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
return
} }
if resyncPeriod > 0 { if resyncPeriod > 0 {
@ -545,8 +562,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started { if !s.started {
s.processor.addListener(listener) return s.processor.addListener(listener), nil
return
} }
// in order to safely join, we have to // in order to safely join, we have to
@ -557,10 +573,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock() s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() defer s.blockDeltas.Unlock()
s.processor.addListener(listener) handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() { for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item}) listener.add(addNotification{newObj: item})
} }
return handle, nil
} }
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
@ -610,6 +627,26 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) {
s.processor.distribute(deleteNotification{oldObj: old}, false) s.processor.distribute(deleteNotification{oldObj: old}, false)
} }
// IsStopped reports whether the informer has already been stopped
func (s *sharedIndexInformer) IsStopped() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
return s.stopped
}
func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// in order to safely remove, we have to
// 1. stop sending add/update/delete notifications
// 2. remove and stop listener
// 3. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
return s.processor.removeListener(handle)
}
// sharedProcessor has a collection of processorListener and can // sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. There are two // distribute a notification object to its listeners. There are two
// kinds of distribute operations. The sync distributions go to a // kinds of distribute operations. The sync distributions go to a
@ -619,39 +656,85 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) {
type sharedProcessor struct { type sharedProcessor struct {
listenersStarted bool listenersStarted bool
listenersLock sync.RWMutex listenersLock sync.RWMutex
listeners []*processorListener // Map from listeners to whether or not they are currently syncing
syncingListeners []*processorListener listeners map[*processorListener]bool
clock clock.Clock clock clock.Clock
wg wait.Group wg wait.Group
} }
func (p *sharedProcessor) addListener(listener *processorListener) { func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if p.listeners == nil {
return nil
}
if result, ok := registration.(*processorListener); ok {
if _, exists := p.listeners[result]; exists {
return result
}
}
return nil
}
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
p.listenersLock.Lock() p.listenersLock.Lock()
defer p.listenersLock.Unlock() defer p.listenersLock.Unlock()
p.addListenerLocked(listener) if p.listeners == nil {
p.listeners = make(map[*processorListener]bool)
}
p.listeners[listener] = true
if p.listenersStarted { if p.listenersStarted {
p.wg.Start(listener.run) p.wg.Start(listener.run)
p.wg.Start(listener.pop) p.wg.Start(listener.pop)
} }
return listener
} }
func (p *sharedProcessor) addListenerLocked(listener *processorListener) { func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
p.listeners = append(p.listeners, listener) p.listenersLock.Lock()
p.syncingListeners = append(p.syncingListeners, listener) defer p.listenersLock.Unlock()
listener, ok := handle.(*processorListener)
if !ok {
return fmt.Errorf("invalid key type %t", handle)
} else if p.listeners == nil {
// No listeners are registered, do nothing
return nil
} else if _, exists := p.listeners[listener]; !exists {
// Listener is not registered, just do nothing
return nil
}
delete(p.listeners, listener)
if p.listenersStarted {
close(listener.addCh)
}
return nil
} }
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock() p.listenersLock.RLock()
defer p.listenersLock.RUnlock() defer p.listenersLock.RUnlock()
if sync { for listener, isSyncing := range p.listeners {
for _, listener := range p.syncingListeners { switch {
case !sync:
// non-sync messages are delivered to every listener
listener.add(obj) listener.add(obj)
} case isSyncing:
} else { // sync messages are delivered to every syncing listenter
for _, listener := range p.listeners {
listener.add(obj) listener.add(obj)
default:
// skipping a sync obj for a non-syncing listener
} }
} }
} }
@ -660,18 +743,27 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() { func() {
p.listenersLock.RLock() p.listenersLock.RLock()
defer p.listenersLock.RUnlock() defer p.listenersLock.RUnlock()
for _, listener := range p.listeners { for listener := range p.listeners {
p.wg.Start(listener.run) p.wg.Start(listener.run)
p.wg.Start(listener.pop) p.wg.Start(listener.pop)
} }
p.listenersStarted = true p.listenersStarted = true
}() }()
<-stopCh <-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock() p.listenersLock.Lock()
for _, listener := range p.listeners { defer p.listenersLock.Unlock()
for listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
} }
// Wipe out list of listeners since they are now closed
// (processorListener cannot be re-used)
p.listeners = nil
// Reset to false since no listeners are running
p.listenersStarted = false
p.wg.Wait() // Wait for all .pop() and .run() to stop p.wg.Wait() // Wait for all .pop() and .run() to stop
} }
@ -681,16 +773,16 @@ func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock() p.listenersLock.Lock()
defer p.listenersLock.Unlock() defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false resyncNeeded := false
now := p.clock.Now() now := p.clock.Now()
for _, listener := range p.listeners { for listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any // need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing. // listeners that are going to be resyncing.
if listener.shouldResync(now) { shouldResync := listener.shouldResync(now)
p.listeners[listener] = shouldResync
if shouldResync {
resyncNeeded = true resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now) listener.determineNextResync(now)
} }
} }
@ -701,8 +793,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
p.listenersLock.RLock() p.listenersLock.RLock()
defer p.listenersLock.RUnlock() defer p.listenersLock.RUnlock()
for _, listener := range p.listeners { for listener := range p.listeners {
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) resyncPeriod := determineResyncPeriod(
listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod) listener.setResyncPeriod(resyncPeriod)
} }
} }

View File

@ -17,7 +17,10 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"fmt" "fmt"
"math/rand"
"strconv"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -96,6 +99,25 @@ func (l *testListener) satisfiedExpectations() bool {
return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
} }
func eventHandlerCount(i SharedInformer) int {
s := i.(*sharedIndexInformer)
s.startedLock.Lock()
defer s.startedLock.Unlock()
return len(s.processor.listeners)
}
func isStarted(i SharedInformer) bool {
s := i.(*sharedIndexInformer)
s.startedLock.Lock()
defer s.startedLock.Unlock()
return s.started
}
func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
s := i.(*sharedIndexInformer)
return s.processor.getListener(h) != nil
}
func TestListenerResyncPeriods(t *testing.T) { func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint. // source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource() source := fcache.NewFakeControllerSource()
@ -189,6 +211,7 @@ func TestResyncCheckPeriod(t *testing.T) {
// create the shared informer and resync every 12 hours // create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
gl := informer.processor.getListener
clock := testingclock.NewFakeClock(time.Now()) clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock informer.clock = clock
@ -196,59 +219,60 @@ func TestResyncCheckPeriod(t *testing.T) {
// listener 1, never resync // listener 1, never resync
listener1 := newTestListener("listener1", 0) listener1 := newTestListener("listener1", 0)
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 2, resync every minute // listener 2, resync every minute
listener2 := newTestListener("listener2", 1*time.Minute) listener2 := newTestListener("listener2", 1*time.Minute)
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 3, resync every 55 seconds // listener 3, resync every 55 seconds
listener3 := newTestListener("listener3", 55*time.Second) listener3 := newTestListener("listener3", 55*time.Second)
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 4, resync every 5 seconds // listener 4, resync every 5 seconds
listener4 := newTestListener("listener4", 5*time.Second) listener4 := newTestListener("listener4", 5*time.Second)
informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a { if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a { if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
} }
@ -390,3 +414,528 @@ func TestSharedInformerTransformer(t *testing.T) {
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
} }
} }
func TestSharedInformerRemoveHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler1 := &ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle2); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 1 {
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of first pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer still has registered handlers after removing both handlers")
}
}
func TestSharedInformerRemoveForeignHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
source2 := fcache.NewFakeControllerSource()
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler1 := &ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
// remove handle at foreign informer
if isRegistered(informer2, handle1) {
t.Errorf("handle1 registered for informer2")
}
if isRegistered(informer2, handle2) {
t.Errorf("handle2 registered for informer2")
}
if err := informer2.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 not registered anymore for informer")
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 not registered anymore for informer")
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 not registered anymore for informer")
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 not registered anymore for informer")
}
if err := informer.RemoveEventHandler(handle2); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 1 {
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of first pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer still has registered handlers after removing both handlers")
}
}
func TestSharedInformerMultipleRegistration(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler1 := &ResourceEventHandlerFuncs{}
reg1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if !isRegistered(informer, reg1) {
t.Errorf("handle1 is not active after successful registration")
return
}
reg2, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the second: %s", err)
return
}
if !isRegistered(informer, reg2) {
t.Errorf("handle2 is not active after successful registration")
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(reg1); err != nil {
t.Errorf("removing of duplicate handler registration failed: %s", err)
}
if isRegistered(informer, reg1) {
t.Errorf("handle1 is still active after successful remove")
return
}
if !isRegistered(informer, reg2) {
t.Errorf("handle2 is not active after removing handle1")
return
}
if eventHandlerCount(informer) != 1 {
if eventHandlerCount(informer) == 0 {
t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
} else {
t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer))
}
}
if err := informer.RemoveEventHandler(reg2); err != nil {
t.Errorf("removing of second handler registration failed: %s", err)
}
if isRegistered(informer, reg2) {
t.Errorf("handle2 is still active after successful remove")
return
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer))
}
}
func TestRemovingRemovedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler := &ResourceEventHandlerFuncs{}
reg, err := informer.AddEventHandler(handler)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if err := informer.RemoveEventHandler(reg); err != nil {
t.Errorf("removing of handler registration failed: %s", err)
return
}
if isRegistered(informer, reg) {
t.Errorf("handle is still active after successful remove")
return
}
if err := informer.RemoveEventHandler(reg); err != nil {
t.Errorf("removing of already removed registration yields unexpected error: %s", err)
}
if isRegistered(informer, reg) {
t.Errorf("handle is still active after second remove")
return
}
}
// Shows that many concurrent goroutines can be manipulating shared informer
// listeners without tripping it up. There are not really many assertions in this
// test. Meant to be run with -race to find race conditions
func TestSharedInformerHandlerAbuse(t *testing.T) {
source := fcache.NewFakeControllerSource()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
ctx, cancel := context.WithCancel(context.Background())
informerCtx, informerCancel := context.WithCancel(context.Background())
go func() {
informer.Run(informerCtx.Done())
cancel()
}()
worker := func() {
// Keep adding and removing handler
// Make sure no duplicate events?
funcs := ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
}
handles := []ResourceEventHandlerRegistration{}
for {
select {
case <-ctx.Done():
return
default:
switch rand.Intn(2) {
case 0:
// Register handler again
reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second)
if err != nil {
if strings.Contains(err.Error(), "stopped already") {
// test is over
return
}
t.Errorf("failed to add handler: %v", err)
return
}
handles = append(handles, reg)
case 1:
// Remove a random handler
if len(handles) == 0 {
continue
}
idx := rand.Intn(len(handles))
err := informer.RemoveEventHandler(handles[idx])
if err != nil {
if strings.Contains(err.Error(), "stopped already") {
// test is over
return
}
t.Errorf("failed to remove handler: %v", err)
return
}
handles = append(handles[:idx], handles[idx+1:]...)
}
}
}
}
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
worker()
wg.Done()
}()
}
objs := []*v1.Pod{}
// While workers run, randomly create events for the informer
for i := 0; i < 10000; i++ {
if len(objs) == 0 {
// Make sure there is always an object
obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
}}
objs = append(objs, obj)
// deep copy before adding since the Modify function mutates the obj
source.Add(obj.DeepCopy())
}
switch rand.Intn(3) {
case 0:
// Add Object
obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
}}
objs = append(objs, obj)
source.Add(obj.DeepCopy())
case 1:
// Update Object
idx := rand.Intn(len(objs))
source.Modify(objs[idx].DeepCopy())
case 2:
// Remove Object
idx := rand.Intn(len(objs))
source.Delete(objs[idx].DeepCopy())
objs = append(objs[:idx], objs[idx+1:]...)
}
}
// sotp informer which stops workers. stopping informer first to exercise
// contention for informer while it is closing
informerCancel()
// wait for workers to finish since they may throw errors
wg.Wait()
}
func TestStateSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if isStarted(informer) {
t.Errorf("informer already started after creation")
return
}
if informer.IsStopped() {
t.Errorf("informer already stopped after creation")
return
}
stop := make(chan struct{})
go informer.Run(stop)
if !listener.ok() {
t.Errorf("informer did not report initial objects")
close(stop)
return
}
if !isStarted(informer) {
t.Errorf("informer does not report to be started although handling events")
close(stop)
return
}
if informer.IsStopped() {
t.Errorf("informer reports to be stopped although stop channel not closed")
close(stop)
return
}
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
if !isStarted(informer) {
t.Errorf("informer reports not to be started after it has been started and stopped")
return
}
}
func TestAddOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
stop := make(chan struct{})
go informer.Run(stop)
close(stop)
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if informer.IsStopped() {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
_, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err == nil {
t.Errorf("stopped informer did not reject add handler")
return
}
if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") {
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
return
}
}
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err != nil {
t.Errorf("informer did not add handler: %s", err)
return
}
stop := make(chan struct{})
go informer.Run(stop)
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
err = informer.RemoveEventHandler(handle)
if err != nil {
t.Errorf("informer does not remove handler on stopped informer")
return
}
}
func TestRemoveWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
handle, _ := informer.AddEventHandler(listener)
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
if !listener.ok() {
t.Errorf("event did not occur")
return
}
informer.RemoveEventHandler(handle)
if isRegistered(informer, handle) {
t.Errorf("handle is still active after successful remove")
return
}
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener.ok() {
t.Errorf("unexpected event occurred")
return
}
}
func TestAddWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener1 := newTestListener("originalListener", 0, "pod1")
listener2 := newTestListener("originalListener", 0, "pod1", "pod2")
handle1, _ := informer.AddEventHandler(listener1)
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
if !listener1.ok() {
t.Errorf("events on listener1 did not occur")
return
}
handle2, _ := informer.AddEventHandler(listener2)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener2.ok() {
t.Errorf("event on listener2 did not occur")
return
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 is not active")
return
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 is not active")
return
}
listener1.expectedItemNames = listener2.expectedItemNames
if !listener1.ok() {
t.Errorf("events on listener1 did not occur")
return
}
}