mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-15 22:13:09 +00:00
active remove/add tests for event handlers
Kubernetes-commit: e9cd17170b1646672796e713dee6c4fb0e6693bb
This commit is contained in:
parent
de4dd3aaf3
commit
90c6a46b32
14
tools/cache/shared_informer.go
vendored
14
tools/cache/shared_informer.go
vendored
@ -156,7 +156,7 @@ type SharedInformer interface {
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again and an error if the handler cannot be added.
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
|
||||
// RemoveEventHandlerByRegistration removes a formerly added event handler given by
|
||||
// RemoveEventHandler removes a formerly added event handler given by
|
||||
// its registration handle.
|
||||
// If, for some reason, the same handler has been added multiple
|
||||
// times, only the registration for the given registration handle
|
||||
@ -166,7 +166,7 @@ type SharedInformer interface {
|
||||
// Calling Remove on an already removed handle returns no error
|
||||
// because the handler is finally (still) removed after calling this
|
||||
// method.
|
||||
RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error
|
||||
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
|
||||
// GetStore returns the informer's local cache as a Store.
|
||||
GetStore() Store
|
||||
// GetController is deprecated, it does nothing useful
|
||||
@ -655,12 +655,12 @@ func (s *sharedIndexInformer) IsStopped() bool {
|
||||
return s.stopped
|
||||
}
|
||||
|
||||
// RemoveEventHandlerByRegistration tries to remove a formerly added event handler by its
|
||||
// RemoveEventHandler tries to remove a formerly added event handler by its
|
||||
// registration handle returned for its registration.
|
||||
// If a handler has been added multiple times, only the actual registration for the
|
||||
// handler will be removed. Other registrations of the same handler will still be
|
||||
// active until they are explicitly removes, also.
|
||||
func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEventHandlerRegistration) error {
|
||||
func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
@ -669,7 +669,7 @@ func (s *sharedIndexInformer) RemoveEventHandlerByRegistration(handle ResourceEv
|
||||
}
|
||||
|
||||
if s.stopped {
|
||||
return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler)
|
||||
return nil
|
||||
}
|
||||
|
||||
// in order to safely remove, we have to
|
||||
@ -743,15 +743,15 @@ func (p *sharedProcessor) removeListenerLocked(listener *processorListener) bool
|
||||
l := p.listeners[i]
|
||||
if l == listener {
|
||||
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
|
||||
i--
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(p.syncingListeners); i++ {
|
||||
l := p.syncingListeners[i]
|
||||
if l == listener {
|
||||
p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...)
|
||||
i--
|
||||
break
|
||||
}
|
||||
}
|
||||
return found
|
||||
|
221
tools/cache/shared_informer_test.go
vendored
221
tools/cache/shared_informer_test.go
vendored
@ -33,60 +33,32 @@ import (
|
||||
)
|
||||
|
||||
type testListener struct {
|
||||
lock sync.RWMutex
|
||||
genericTestListener
|
||||
resyncPeriod time.Duration
|
||||
expectedItemNames sets.String
|
||||
receivedItemNames []string
|
||||
name string
|
||||
}
|
||||
|
||||
func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
|
||||
l := &testListener{
|
||||
resyncPeriod: resyncPeriod,
|
||||
expectedItemNames: sets.NewString(expected...),
|
||||
name: name,
|
||||
genericTestListener: genericTestListener{name: name},
|
||||
resyncPeriod: resyncPeriod,
|
||||
expectedItemNames: sets.NewString(expected...),
|
||||
}
|
||||
l.genericTestListener.action = l.action
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *testListener) OnAdd(obj interface{}) {
|
||||
l.handle(obj)
|
||||
}
|
||||
|
||||
func (l *testListener) OnUpdate(old, new interface{}) {
|
||||
l.handle(new)
|
||||
}
|
||||
|
||||
func (l *testListener) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (l *testListener) handle(obj interface{}) {
|
||||
func (l *testListener) action(obj interface{}) {
|
||||
key, _ := MetaNamespaceKeyFunc(obj)
|
||||
fmt.Printf("%s: handle: %v\n", l.name, key)
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
objectMeta, _ := meta.Accessor(obj)
|
||||
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
|
||||
}
|
||||
|
||||
func (l *testListener) ok() bool {
|
||||
fmt.Println("polling")
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if l.satisfiedExpectations() {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// wait just a bit to allow any unexpected stragglers to come in
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("final check")
|
||||
return l.satisfiedExpectations()
|
||||
return l.genericTestListener.ok(l.satisfiedExpectations)
|
||||
}
|
||||
|
||||
func (l *testListener) satisfiedExpectations() bool {
|
||||
@ -433,14 +405,14 @@ func TestSharedInformerRemoveHandler(t *testing.T) {
|
||||
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle2); err != nil {
|
||||
t.Errorf("removing of second pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 1 {
|
||||
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle1); err != nil {
|
||||
t.Errorf("removing of first pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 0 {
|
||||
@ -486,7 +458,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
if isRegistered(informer2, handle2) {
|
||||
t.Errorf("handle2 registered for informer2")
|
||||
}
|
||||
if err := informer2.RemoveEventHandlerByRegistration(handle1); err != nil {
|
||||
if err := informer2.RemoveEventHandler(handle1); err != nil {
|
||||
t.Errorf("removing of second pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 2 {
|
||||
@ -510,7 +482,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
if isRegistered(informer, empty) {
|
||||
t.Errorf("empty registered for informer")
|
||||
}
|
||||
if err := informer2.RemoveEventHandlerByRegistration(empty); err != nil {
|
||||
if err := informer2.RemoveEventHandler(empty); err != nil {
|
||||
t.Errorf("removing of empty handle failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 2 {
|
||||
@ -526,14 +498,14 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
t.Errorf("handle2 not registered anymore for informer")
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle2); err != nil {
|
||||
t.Errorf("removing of second pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 1 {
|
||||
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle1); err != nil {
|
||||
t.Errorf("removing of first pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 0 {
|
||||
@ -564,14 +536,14 @@ func TestSharedInformerRemoveNonComparableHandler(t *testing.T) {
|
||||
t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle2); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle2); err != nil {
|
||||
t.Errorf("removing of pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 1 {
|
||||
t.Errorf("after removal informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(handle1); err != nil {
|
||||
if err := informer.RemoveEventHandler(handle1); err != nil {
|
||||
t.Errorf("removing of non-pointer handler failed: %s", err)
|
||||
}
|
||||
if eventHandlerCount(informer) != 0 {
|
||||
@ -609,10 +581,10 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
}
|
||||
|
||||
if eventHandlerCount(informer) != 2 {
|
||||
t.Errorf("informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
|
||||
t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(reg1); err != nil {
|
||||
if err := informer.RemoveEventHandler(reg1); err != nil {
|
||||
t.Errorf("removing of duplicate handler registration failed: %s", err)
|
||||
}
|
||||
|
||||
@ -633,7 +605,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByRegistration(reg2); err != nil {
|
||||
if err := informer.RemoveEventHandler(reg2); err != nil {
|
||||
t.Errorf("removing of second handler registration failed: %s", err)
|
||||
}
|
||||
|
||||
@ -659,7 +631,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
|
||||
t.Errorf("informer did not add handler for the first time: %s", err)
|
||||
return
|
||||
}
|
||||
if err := informer.RemoveEventHandlerByRegistration(reg); err != nil {
|
||||
if err := informer.RemoveEventHandler(reg); err != nil {
|
||||
t.Errorf("removing of handler registration failed: %s", err)
|
||||
return
|
||||
}
|
||||
@ -667,7 +639,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
|
||||
t.Errorf("handle is still active after successful remove")
|
||||
return
|
||||
}
|
||||
if err := informer.RemoveEventHandlerByRegistration(reg); err != nil {
|
||||
if err := informer.RemoveEventHandler(reg); err != nil {
|
||||
t.Errorf("removing of already removed registration yields unexpected error: %s", err)
|
||||
}
|
||||
if isRegistered(informer, reg) {
|
||||
@ -734,9 +706,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
|
||||
stop := make(chan struct{})
|
||||
go informer.Run(stop)
|
||||
close(stop)
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
if !informer.IsStopped() {
|
||||
if !checkCondition(informer.IsStopped) {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
@ -773,13 +743,148 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
err = informer.RemoveEventHandlerByRegistration(handle)
|
||||
if err == nil {
|
||||
t.Errorf("informer removes handler on stopped informer")
|
||||
return
|
||||
}
|
||||
if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") {
|
||||
t.Errorf("removing handler from stopped informer yield unexpected error: %s", err)
|
||||
err = informer.RemoveEventHandler(handle)
|
||||
if err != nil {
|
||||
t.Errorf("informer does not remove handler on stopped informer")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveWhileActive(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
listener := &genericTestListener{name: "listener"}
|
||||
handle, _ := informer.AddEventHandler(listener)
|
||||
listener.action = func(obj interface{}) {
|
||||
informer.RemoveEventHandler(handle)
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
go informer.Run(stop)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
if !listener.ok(func() bool {
|
||||
return listener.getCount() == 1
|
||||
}) {
|
||||
t.Errorf("event did not occur")
|
||||
return
|
||||
}
|
||||
|
||||
if isRegistered(informer, handle) {
|
||||
t.Errorf("handle is still active after successful remove")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddWhileActive(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
listener1 := &genericTestListener{name: "listener1"}
|
||||
listener2 := &genericTestListener{name: "listener2"}
|
||||
|
||||
handle1, _ := informer.AddEventHandler(listener1)
|
||||
var handle2 ResourceEventHandlerRegistration
|
||||
listener1.action = func(obj interface{}) {
|
||||
listener1.action = nil
|
||||
handle2, _ = informer.AddEventHandler(listener2)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
go informer.Run(stop)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
if !listener1.ok(func() bool {
|
||||
return listener1.getCount() == 2
|
||||
}) {
|
||||
t.Errorf("events on listener1 did not occur")
|
||||
return
|
||||
}
|
||||
|
||||
if !listener2.ok(func() bool {
|
||||
return listener1.getCount() == 2
|
||||
}) {
|
||||
t.Errorf("event on listener2 did not occur")
|
||||
return
|
||||
}
|
||||
|
||||
if !isRegistered(informer, handle1) {
|
||||
t.Errorf("handle1 is not active")
|
||||
return
|
||||
}
|
||||
if !isRegistered(informer, handle2) {
|
||||
t.Errorf("handle2 is not active")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
type genericTestListener struct {
|
||||
lock sync.RWMutex
|
||||
name string
|
||||
count int
|
||||
action func(obj interface{})
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnAdd(obj interface{}) {
|
||||
l.handle(obj)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnUpdate(old, new interface{}) {
|
||||
l.handle(new)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (l *genericTestListener) handle(obj interface{}) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
l.count++
|
||||
if l.action != nil {
|
||||
l.action(obj)
|
||||
}
|
||||
fmt.Printf("%s: got event\n", l.name)
|
||||
}
|
||||
|
||||
func (l *genericTestListener) getCount() int {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
return l.count
|
||||
}
|
||||
|
||||
func (l *genericTestListener) ok(check func() bool) bool {
|
||||
return checkCondition(check)
|
||||
}
|
||||
|
||||
func checkCondition(check func() bool) bool {
|
||||
fmt.Println("polling")
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if check() {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// wait just a bit to allow any unexpected stragglers to come in
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("final check")
|
||||
return check()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user