support removal of event handlers from SharedIndexInformers

To be able to implement controllers that are dynamically deciding
on which resources to watch, it is required to get rid of
dedicated watches and event handlers again. This requires the
possibility to remove event handlers from SharedIndexInformers again.
Stopping an informer is not sufficient, because there might
be multiple controllers in a controller manager that independently
decide which resources to watch.

Unfortunately the ResourceEventHandler interface encourages to use
value objects for handlers (like the ResourceEventHandlerFuncs
struct, that uses value receivers to implement the interface).
Go does not support comparison of function pointers and therefore
the comparison of such structs is not possible, also. To be able
to remove all kinds of handlers and to solve the problem of
multi-registrations of handlers a registration handle is introduced.
It is returned when adding a handler and can later be used to remove
the registration again. This handle directly stores the created
listener to simplify the deletion.

Kubernetes-commit: 7436af3302088c979b431856c432b95dd230f847
This commit is contained in:
Alexander Zielenski
2022-06-16 00:07:54 -07:00
committed by Kubernetes Publisher
parent d28c73639f
commit ecdc8bf729
2 changed files with 426 additions and 8 deletions

View File

@@ -390,3 +390,288 @@ func TestSharedInformerTransformer(t *testing.T) {
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
}
}
func TestSharedInformerRemoveHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler1 := &ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if informer.EventHandlerCount() != 2 {
t.Errorf("informer has %d registered handler, instead of 2", informer.EventHandlerCount())
}
if err := informer.RemoveEventHandlerByHandle(handle2); err != nil {
t.Errorf("removing of first pointer handler failed: %s", err)
}
if informer.EventHandlerCount() != 1 {
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
}
if err := informer.RemoveEventHandlerByHandle(handle1); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if informer.EventHandlerCount() != 0 {
t.Errorf("informer still has registered handlers after removing both handlers")
}
}
func TestSharedInformerRemoveNonComparableHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler1 := ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if informer.EventHandlerCount() != 2 {
t.Errorf("informer has %d registered handler(s), instead of 2", informer.EventHandlerCount())
}
if err := informer.RemoveEventHandlerByHandle(handle2); err != nil {
t.Errorf("removing of pointer handler failed: %s", err)
}
if informer.EventHandlerCount() != 1 {
t.Errorf("after removal informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
}
if err := informer.RemoveEventHandlerByHandle(handle1); err != nil {
t.Errorf("removing of non-pointer handler failed: %s", err)
}
if informer.EventHandlerCount() != 0 {
t.Errorf("after removal informer has %d registered handler(s), instead of 0", informer.EventHandlerCount())
}
}
func TestSharedInformerMultipleRegistration(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler1 := &ResourceEventHandlerFuncs{}
reg1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if !reg1.IsActive() {
t.Errorf("handle1 is not active after successful registration")
return
}
reg2, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the second: %s", err)
return
}
if !reg2.IsActive() {
t.Errorf("handle2 is not active after successful registration")
return
}
if informer.EventHandlerCount() != 2 {
t.Errorf("informer has %d registered handler(s), instead of 1", informer.EventHandlerCount())
}
if err := informer.RemoveEventHandlerByHandle(reg1); err != nil {
t.Errorf("removing of duplicate handler registration failed: %s", err)
}
if reg1.IsActive() {
t.Errorf("handle1 is still active after successful remove")
return
}
if !reg2.IsActive() {
t.Errorf("handle2 is not active after removing handle1")
return
}
if informer.EventHandlerCount() != 1 {
if informer.EventHandlerCount() == 0 {
t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
} else {
t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", informer.EventHandlerCount())
}
}
if err := informer.RemoveEventHandlerByHandle(reg2); err != nil {
t.Errorf("removing of second handler registration failed: %s", err)
}
if reg2.IsActive() {
t.Errorf("handle2 is still active after successful remove")
return
}
if informer.EventHandlerCount() != 0 {
t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", informer.EventHandlerCount())
}
}
func TestRemovingRemovedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler := &ResourceEventHandlerFuncs{}
reg, err := informer.AddEventHandler(handler)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if err := informer.RemoveEventHandlerByHandle(reg); err != nil {
t.Errorf("removing of handler registration failed: %s", err)
return
}
if reg.IsActive() {
t.Errorf("handle is still active after successful remove")
return
}
if err := informer.RemoveEventHandlerByHandle(reg); err != nil {
t.Errorf("removing of already removed registration yields unexpected error: %s", err)
}
if reg.IsActive() {
t.Errorf("handle is still active after second remove")
return
}
}
func TestStateSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if informer.IsStarted() {
t.Errorf("informer already started after creation")
return
}
if informer.IsStopped() {
t.Errorf("informer already stopped after creation")
return
}
stop := make(chan struct{})
go informer.Run(stop)
if !listener.ok() {
t.Errorf("informer did not report initial objects")
close(stop)
return
}
if !informer.IsStarted() {
t.Errorf("informer does not report to be started although handling events")
close(stop)
return
}
if informer.IsStopped() {
t.Errorf("informer reports to be stopped although stop channel not closed")
close(stop)
return
}
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
if !informer.IsStarted() {
t.Errorf("informer reports not to be started after it has been started and stopped")
return
}
}
func TestAddOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
stop := make(chan struct{})
go informer.Run(stop)
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err == nil {
t.Errorf("stopped informer did not reject add handler")
return
}
if !strings.HasSuffix(err.Error(), "is not added to shared informer because it has stopped already") {
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
return
}
if handle != nil {
t.Errorf("got handle for added handler on stopped informer")
return
}
}
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err != nil {
t.Errorf("informer did not add handler: %s", err)
return
}
stop := make(chan struct{})
go informer.Run(stop)
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
err = informer.RemoveEventHandlerByHandle(handle)
if err == nil {
t.Errorf("informer removes handler on stopped informer")
return
}
if !strings.HasSuffix(err.Error(), "is not removed from shared informer because it has stopped already") {
t.Errorf("removing handler from stopped informer yield unexpected error: %s", err)
return
}
}