mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
support removal of event handlers from SharedIndexInformers
To be able to implement controllers that are dynamically deciding on which resources to watch, it is required to get rid of dedicated watches and event handlers again. This requires the possibility to remove event handlers from SharedIndexInformers again. Stopping an informer is not sufficient, because there might be multiple controllers in a controller manager that independently decide which resources to watch. Unfortunately the ResourceEventHandler interface encourages to use value objects for handlers (like the ResourceEventHandlerFuncs struct, that uses value receivers to implement the interface). Go does not support comparison of function pointers and therefore the comparison of such structs is not possible, also. To be able to remove all kinds of handlers and to solve the problem of multi-registrations of handlers a registration handle is introduced. It is returned when adding a handler and can later be used to remove the registration again. This handle directly stores the created listener to simplify the deletion.
This commit is contained in:
parent
759785ea14
commit
7436af3302
@ -45,6 +45,7 @@ import (
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
|
||||
appsconversion "k8s.io/kubernetes/pkg/apis/apps/v1"
|
||||
apiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
@ -69,12 +70,12 @@ type conversionInformer struct {
|
||||
cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) {
|
||||
i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler})
|
||||
func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) (*cache.ResourceEventHandlerHandle, error) {
|
||||
return i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler})
|
||||
}
|
||||
|
||||
func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
|
||||
i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod)
|
||||
func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (*cache.ResourceEventHandlerHandle, error) {
|
||||
return i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod)
|
||||
}
|
||||
|
||||
type conversionLister struct {
|
||||
|
@ -131,11 +131,24 @@ import (
|
||||
// A delete notification exposes the last locally known non-absent
|
||||
// state, except that its ResourceVersion is replaced with a
|
||||
// ResourceVersion in which the object is actually absent.
|
||||
//
|
||||
// The informational methods (EventHandlerCount, IsStopped, IsStarted)
|
||||
// are intended to be used to manage informers in any upper informer
|
||||
// management layer for creating and destroying informers on-the fly when
|
||||
// adding or removing handlers.
|
||||
// Beware of race conditions: Such a layer must hide the basic informer
|
||||
// objects from its users and offer a closed *synchronized* view for informers.
|
||||
// Although these informational methods are synchronized each, in
|
||||
// sequences the state queried first might have been changed before
|
||||
// calling the next method, if other callers (or the stop channel)
|
||||
// are able to interact with the informer interface in parallel.
|
||||
type SharedInformer interface {
|
||||
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
// between different handlers.
|
||||
AddEventHandler(handler ResourceEventHandler)
|
||||
// It returns a handle for the handler that can be used to remove
|
||||
// the handler again.
|
||||
AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error)
|
||||
// AddEventHandlerWithResyncPeriod adds an event handler to the
|
||||
// shared informer with the requested resync period; zero means
|
||||
// this handler does not care about resyncs. The resync operation
|
||||
@ -150,7 +163,20 @@ type SharedInformer interface {
|
||||
// between any two resyncs may be longer than the nominal period
|
||||
// because the implementation takes time to do work and there may
|
||||
// be competing load and scheduling noise.
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
|
||||
// It returns a handle for the handler that can be used to remove
|
||||
// the handler again and an error if the handler cannot be added.
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error)
|
||||
// RemoveEventHandlerByHandle removes a formerly added event handler given by
|
||||
// its registration handle.
|
||||
// If, for some reason, the same handler has been added multiple
|
||||
// times, only the registration for the given registration handle
|
||||
// will be removed.
|
||||
// It returns an error if the handler cannot be removed,
|
||||
// because the informer is already stopped.
|
||||
// Calling Remove on an already removed handle returns no error
|
||||
// because the handler is finally (still) removed after calling this
|
||||
// method.
|
||||
RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error
|
||||
// GetStore returns the informer's local cache as a Store.
|
||||
GetStore() Store
|
||||
// GetController is deprecated, it does nothing useful
|
||||
@ -195,6 +221,35 @@ type SharedInformer interface {
|
||||
// transform before mutating it at all and returning the copy to prevent
|
||||
// data races.
|
||||
SetTransform(handler TransformFunc) error
|
||||
|
||||
// EventHandlerCount return the number of actually registered
|
||||
// event handlers.
|
||||
EventHandlerCount() int
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped.
|
||||
// Adding event handlers to already stopped informers is not possible.
|
||||
IsStopped() bool
|
||||
|
||||
// IsStarted reports whether the informer has already been started
|
||||
IsStarted() bool
|
||||
}
|
||||
|
||||
// ResourceEventHandlerHandle is a handle returned by the
|
||||
// registration methods of SharedInformers for a registered
|
||||
// ResourceEventHandler. It can be used later on to remove
|
||||
// a registration again.
|
||||
// This indirection is required, because the ResourceEventHandler
|
||||
// interface can be implemented by non-go-comparable handlers, which
|
||||
// could not be removed from a list anymore.
|
||||
type ResourceEventHandlerHandle struct {
|
||||
listener *processorListener
|
||||
}
|
||||
|
||||
// IsActive reports whether this registration is still active
|
||||
// meaning that the handler registered with this handle is
|
||||
// still registered.
|
||||
func (h *ResourceEventHandlerHandle) IsActive() bool {
|
||||
return h.listener != nil
|
||||
}
|
||||
|
||||
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
||||
@ -492,8 +547,8 @@ func (s *sharedIndexInformer) GetController() Controller {
|
||||
return &dummyController{informer: s}
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
|
||||
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
|
||||
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (*ResourceEventHandlerHandle, error) {
|
||||
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
|
||||
}
|
||||
|
||||
func determineResyncPeriod(desired, check time.Duration) time.Duration {
|
||||
@ -513,13 +568,13 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
|
||||
|
||||
const minimumResyncPeriod = 1 * time.Second
|
||||
|
||||
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
|
||||
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (*ResourceEventHandlerHandle, error) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
|
||||
return
|
||||
klog.V(2).Infof("Handler %v is not added to shared informer because it has stopped already", handler)
|
||||
return nil, fmt.Errorf("handler %v is not added to shared informer because it has stopped already", handler)
|
||||
}
|
||||
|
||||
if resyncPeriod > 0 {
|
||||
@ -543,10 +598,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
}
|
||||
|
||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
|
||||
handle := &ResourceEventHandlerHandle{listener}
|
||||
|
||||
if !s.started {
|
||||
s.processor.addListener(listener)
|
||||
return
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
// in order to safely join, we have to
|
||||
@ -561,6 +617,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
||||
@ -610,6 +667,55 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) {
|
||||
s.processor.distribute(deleteNotification{oldObj: old}, false)
|
||||
}
|
||||
|
||||
// IsStarted reports whether the informer has already been started
|
||||
func (s *sharedIndexInformer) IsStarted() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return s.started
|
||||
}
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped
|
||||
func (s *sharedIndexInformer) IsStopped() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return s.stopped
|
||||
}
|
||||
|
||||
// EventHandlerCount reports whether the informer still has registered
|
||||
// event handlers
|
||||
func (s *sharedIndexInformer) EventHandlerCount() int {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return len(s.processor.listeners)
|
||||
}
|
||||
|
||||
// RemoveEventHandlerByHandle tries to remove a formerly added event handler by its
|
||||
// handle returned for its registration.
|
||||
// If a handler has been added multiple times, only the registration for the
|
||||
// given handle will be removed.
|
||||
func (s *sharedIndexInformer) RemoveEventHandlerByHandle(handle *ResourceEventHandlerHandle) error {
|
||||
if handle.listener == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
return fmt.Errorf("handler %v is not removed from shared informer because it has stopped already", handle.listener.handler)
|
||||
}
|
||||
|
||||
// in order to safely remove, we have to
|
||||
// 1. stop sending add/update/delete notifications
|
||||
// 2. remove and stop listener
|
||||
// 3. unblock
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
s.processor.removeListener(handle.listener)
|
||||
handle.listener = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// sharedProcessor has a collection of processorListener and can
|
||||
// distribute a notification object to its listeners. There are two
|
||||
// kinds of distribute operations. The sync distributions go to a
|
||||
@ -641,6 +747,33 @@ func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) removeListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.removeListenerLocked(listener)
|
||||
if p.listenersStarted {
|
||||
close(listener.addCh)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) removeListenerLocked(listener *processorListener) {
|
||||
for i := 0; i < len(p.listeners); i++ {
|
||||
l := p.listeners[i]
|
||||
if l == listener {
|
||||
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
|
||||
i--
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(p.syncingListeners); i++ {
|
||||
l := p.syncingListeners[i]
|
||||
if l == listener {
|
||||
p.syncingListeners = append(p.syncingListeners[:i], p.syncingListeners[i+1:]...)
|
||||
i--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
@ -390,3 +390,288 @@ func TestSharedInformerTransformer(t *testing.T) {
|
||||
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSharedInformerRemoveHandler(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
|
||||
handler1 := &ResourceEventHandlerFuncs{}
|
||||
handle1, err := informer.AddEventHandler(handler1)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler1: %s", err)
|
||||
return
|
||||
}
|
||||
handler2 := &ResourceEventHandlerFuncs{}
|
||||
handle2, err := informer.AddEventHandler(handler2)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler2: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if informer.EventHandlerCount() != 2 {
|
||||
t.Errorf("informer has %d registered handler, instead of 2", informer.EventHandlerCount())
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(handle2); err != nil {
|
||||
t.Errorf("removing of first pointer handler failed: %s", err)
|
||||
}
|
||||
if informer.EventHandlerCount() != 1 {
|
||||
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(handle1); err != nil {
|
||||
t.Errorf("removing of second pointer handler failed: %s", err)
|
||||
}
|
||||
if informer.EventHandlerCount() != 0 {
|
||||
t.Errorf("informer still has registered handlers after removing both handlers")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSharedInformerRemoveNonComparableHandler(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
|
||||
handler1 := ResourceEventHandlerFuncs{}
|
||||
handle1, err := informer.AddEventHandler(handler1)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler1: %s", err)
|
||||
return
|
||||
}
|
||||
handler2 := &ResourceEventHandlerFuncs{}
|
||||
handle2, err := informer.AddEventHandler(handler2)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler2: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if informer.EventHandlerCount() != 2 {
|
||||
t.Errorf("informer has %d registered handler(s), instead of 2", informer.EventHandlerCount())
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(handle2); err != nil {
|
||||
t.Errorf("removing of pointer handler failed: %s", err)
|
||||
}
|
||||
if informer.EventHandlerCount() != 1 {
|
||||
t.Errorf("after removal informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(handle1); err != nil {
|
||||
t.Errorf("removing of non-pointer handler failed: %s", err)
|
||||
}
|
||||
if informer.EventHandlerCount() != 0 {
|
||||
t.Errorf("after removal informer has %d registered handler(s), instead of 0", informer.EventHandlerCount())
|
||||
}
|
||||
}
|
||||
|
||||
func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
|
||||
handler1 := &ResourceEventHandlerFuncs{}
|
||||
reg1, err := informer.AddEventHandler(handler1)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler for the first time: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !reg1.IsActive() {
|
||||
t.Errorf("handle1 is not active after successful registration")
|
||||
return
|
||||
}
|
||||
|
||||
reg2, err := informer.AddEventHandler(handler1)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler for the second: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !reg2.IsActive() {
|
||||
t.Errorf("handle2 is not active after successful registration")
|
||||
return
|
||||
}
|
||||
|
||||
if informer.EventHandlerCount() != 2 {
|
||||
t.Errorf("informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(reg1); err != nil {
|
||||
t.Errorf("removing of duplicate handler registration failed: %s", err)
|
||||
}
|
||||
|
||||
if reg1.IsActive() {
|
||||
t.Errorf("handle1 is still active after successful remove")
|
||||
return
|
||||
}
|
||||
if !reg2.IsActive() {
|
||||
t.Errorf("handle2 is not active after removing handle1")
|
||||
return
|
||||
}
|
||||
|
||||
if informer.EventHandlerCount() != 1 {
|
||||
if informer.EventHandlerCount() == 0 {
|
||||
t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
|
||||
} else {
|
||||
t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", informer.EventHandlerCount())
|
||||
}
|
||||
}
|
||||
|
||||
if err := informer.RemoveEventHandlerByHandle(reg2); err != nil {
|
||||
t.Errorf("removing of second handler registration failed: %s", err)
|
||||
}
|
||||
|
||||
if reg2.IsActive() {
|
||||
t.Errorf("handle2 is still active after successful remove")
|
||||
return
|
||||
}
|
||||
|
||||
if informer.EventHandlerCount() != 0 {
|
||||
t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", informer.EventHandlerCount())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemovingRemovedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
handler := &ResourceEventHandlerFuncs{}
|
||||
reg, err := informer.AddEventHandler(handler)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler for the first time: %s", err)
|
||||
return
|
||||
}
|
||||
if err := informer.RemoveEventHandlerByHandle(reg); err != nil {
|
||||
t.Errorf("removing of handler registration failed: %s", err)
|
||||
return
|
||||
}
|
||||
if reg.IsActive() {
|
||||
t.Errorf("handle is still active after successful remove")
|
||||
return
|
||||
}
|
||||
if err := informer.RemoveEventHandlerByHandle(reg); err != nil {
|
||||
t.Errorf("removing of already removed registration yields unexpected error: %s", err)
|
||||
}
|
||||
if reg.IsActive() {
|
||||
t.Errorf("handle is still active after second remove")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
listener := newTestListener("listener", 0, "pod1")
|
||||
informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
|
||||
|
||||
if informer.IsStarted() {
|
||||
t.Errorf("informer already started after creation")
|
||||
return
|
||||
}
|
||||
if informer.IsStopped() {
|
||||
t.Errorf("informer already stopped after creation")
|
||||
return
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
go informer.Run(stop)
|
||||
if !listener.ok() {
|
||||
t.Errorf("informer did not report initial objects")
|
||||
close(stop)
|
||||
return
|
||||
}
|
||||
|
||||
if !informer.IsStarted() {
|
||||
t.Errorf("informer does not report to be started although handling events")
|
||||
close(stop)
|
||||
return
|
||||
}
|
||||
if informer.IsStopped() {
|
||||
t.Errorf("informer reports to be stopped although stop channel not closed")
|
||||
close(stop)
|
||||
return
|
||||
}
|
||||
|
||||
close(stop)
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if !informer.IsStopped() {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
if !informer.IsStarted() {
|
||||
t.Errorf("informer reports not to be started after it has been started and stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddOnStoppedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
listener := newTestListener("listener", 0, "pod1")
|
||||
stop := make(chan struct{})
|
||||
go informer.Run(stop)
|
||||
close(stop)
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
if !informer.IsStopped() {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
|
||||
if err == nil {
|
||||
t.Errorf("stopped informer did not reject add handler")
|
||||
return
|
||||
}
|
||||
if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") {
|
||||
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
|
||||
return
|
||||
}
|
||||
if handle != nil {
|
||||
t.Errorf("got handle for added handler on stopped informer")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
listener := newTestListener("listener", 0, "pod1")
|
||||
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
|
||||
if err != nil {
|
||||
t.Errorf("informer did not add handler: %s", err)
|
||||
return
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
go informer.Run(stop)
|
||||
close(stop)
|
||||
fmt.Println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if !informer.IsStopped() {
|
||||
t.Errorf("informer reports not to be stopped although stop channel closed")
|
||||
return
|
||||
}
|
||||
err = informer.RemoveEventHandlerByHandle(handle)
|
||||
if err == nil {
|
||||
t.Errorf("informer removes handler on stopped informer")
|
||||
return
|
||||
}
|
||||
if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") {
|
||||
t.Errorf("removing handler from stopped informer yield unexpected error: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user