diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 4cec4283f..efbf5179e 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -177,6 +177,15 @@ type SharedInformer interface { // RemoveEventHandler removes a formerly added event handler given by // its registration handle. // This function is guaranteed to be idempotent, and thread-safe. + // + // Note: RemoveEventHandler is asynchronous. It stops queueing new events + // but does not wait for already-queued events to finish executing. + // Goroutines processing the remaining events may still be running and + // invoking callbacks after this function returns. + // + // If the caller needs to wait for all handlers to finish executing (for + // example, to safely close channels or release resources used by the handler), + // they should use [ShutDownEventHandler]. RemoveEventHandler(handle ResourceEventHandlerRegistration) error // GetStore returns the informer's local cache as a Store. GetStore() Store @@ -1014,6 +1023,24 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi return s.processor.removeListener(handle) } +// ShutDownEventHandler removes the event handler and blocks until it has fully +// stopped processing events. +// +// Like RemoveEventHandler, it is idempotent and thread-safe. However, it MUST NOT +// be called from within the event handler's own callbacks, as that will result +// in a deadlock. +func ShutDownEventHandler(informer SharedInformer, handle ResourceEventHandlerRegistration) error { + if err := informer.RemoveEventHandler(handle); err != nil { + return err + } + if s, ok := handle.(interface{ ShutdownChan() <-chan struct{} }); ok { + <-s.ShutdownChan() + } else { + return fmt.Errorf("handle does not support ShutdownChan()") + } + return nil +} + // sharedProcessor has a collection of processorListener and can // distribute a notification object to its listeners. There are two // kinds of distribute operations. The sync distributions go to a @@ -1086,6 +1113,8 @@ func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration if p.listenersStarted { close(listener.addCh) + } else { + close(listener.runFinished) } return nil @@ -1209,10 +1238,11 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe // processorListener also keeps track of the adjusted requested resync // period of the listener. type processorListener struct { - logger klog.Logger - nextCh chan interface{} - addCh chan interface{} - done chan struct{} + logger klog.Logger + nextCh chan interface{} + addCh chan interface{} + done chan struct{} + runFinished chan struct{} handler ResourceEventHandler handlerName string @@ -1265,6 +1295,10 @@ func (p *processorListener) HasSyncedChecker() DoneChecker { return p.syncTracker } +func (p *processorListener) ShutdownChan() <-chan struct{} { + return p.runFinished +} + func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener { handlerName := nameForHandler(handler) ret := &processorListener{ @@ -1272,6 +1306,7 @@ func newProcessListener(logger klog.Logger, handler ResourceEventHandler, reques nextCh: make(chan interface{}), addCh: make(chan interface{}), done: make(chan struct{}), + runFinished: make(chan struct{}), upstreamHasSynced: hasSynced, handler: handler, handlerName: handlerName, @@ -1328,6 +1363,7 @@ func (p *processorListener) pop() { } func (p *processorListener) run() { + defer close(p.runFinished) // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 835e6debd..a603f3667 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -1385,3 +1385,221 @@ func numOccurrences(hay, needle string) int { hay = hay[index+len(needle):] } } + +func TestRemoveEventHandler_AsynchronousPanic(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + stop := make(chan struct{}) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + + scheduledPods := make(chan *v1.Pod, 1) + blockChan := make(chan struct{}) + handlerStarted := make(chan struct{}) + handlerFinished := make(chan struct{}) + var panicVal any + var panicked bool + + handler := ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + close(handlerStarted) + <-blockChan + defer close(handlerFinished) + defer func() { + if r := recover(); r != nil { + panicVal = r + panicked = true + } + }() + scheduledPods <- obj.(*v1.Pod) + }, + } + + handle, err := informer.AddEventHandler(handler) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + + // Trigger the event + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + // Wait for handler to start and block + <-handlerStarted + + // Now remove the handler. This should return immediately. + err = informer.RemoveEventHandler(handle) + if err != nil { + t.Fatalf("Failed to remove event handler: %v", err) + } + + // Close the channel that the handler writes to + close(scheduledPods) + + // Unblock the handler + close(blockChan) + + // Wait for handler to finish + <-handlerFinished + + if !panicked { + t.Fatalf("Expected handler to panic (send on closed channel), but it did not") + } + t.Logf("Caught expected panic: %v", panicVal) +} + +func TestRemoveEventHandler_SynchronousShutdown(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + stop := make(chan struct{}) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + + scheduledPods := make(chan *v1.Pod, 1) + blockChan := make(chan struct{}) + handlerStarted := make(chan struct{}) + handlerFinished := make(chan struct{}) + var panicVal any + var panicked bool + + handler := ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + close(handlerStarted) + <-blockChan + defer close(handlerFinished) + defer func() { + if r := recover(); r != nil { + panicVal = r + panicked = true + } + }() + scheduledPods <- obj.(*v1.Pod) + }, + } + + handle, err := informer.AddEventHandler(handler) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + + // Trigger the event + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + + // Wait for handler to start and block + <-handlerStarted + + // Unblock the handler so it can run and exit + close(blockChan) + + // Now remove the handler and wait for it to fully stop. + err = ShutDownEventHandler(informer, handle) + if err != nil { + t.Fatalf("Failed to shutdown event handler: %v", err) + } + + // NOW it is safe to close the channel + close(scheduledPods) + + // Wait for handler function to finish (should already be done) + <-handlerFinished + + if panicked { + t.Fatalf("Handler panicked unexpectedly: %v", panicVal) + } +} + +func TestShutDownEventHandler_Lifecycles(t *testing.T) { + t.Run("InformerNeverStarted", func(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + handler := ResourceEventHandlerFuncs{ + AddFunc: func(obj any) {}, + } + handle, err := informer.AddEventHandler(handler) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + + // Shutdown should return immediately because the informer was never started. + err = ShutDownEventHandler(informer, handle) + if err != nil { + t.Fatalf("Failed to shutdown event handler: %v", err) + } + }) + + t.Run("StartedThenAddedThenShutDown", func(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + stop := make(chan struct{}) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + + // Wait deterministically for the informer to sync (proving it has started) + syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if !WaitFor(syncCtx, "informer sync", informer.HasSyncedChecker()) { + t.Fatalf("Informer did not sync") + } + + handler := ResourceEventHandlerFuncs{ + AddFunc: func(obj any) {}, + } + handle, err := informer.AddEventHandler(handler) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + + err = ShutDownEventHandler(informer, handle) + if err != nil { + t.Fatalf("Failed to shutdown event handler: %v", err) + } + }) + + t.Run("AddedThenStartedThenShutDown", func(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) + + handler := ResourceEventHandlerFuncs{ + AddFunc: func(obj any) {}, + } + handle, err := informer.AddEventHandler(handler) + if err != nil { + t.Fatalf("Failed to add event handler: %v", err) + } + + stop := make(chan struct{}) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + + // Wait deterministically for the informer to sync (proving it has started) + syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if !WaitFor(syncCtx, "informer sync", informer.HasSyncedChecker()) { + t.Fatalf("Informer did not sync") + } + + err = ShutDownEventHandler(informer, handle) + if err != nil { + t.Fatalf("Failed to shutdown event handler: %v", err) + } + }) +}