mirror of
https://github.com/kubernetes/client-go.git
synced 2026-07-01 14:35:35 +00:00
Expose a way to wait for a controller listener to be fully shutdown
Kubernetes-commit: 338db9dde41f09aa094b61ef128e2d11327960ca
This commit is contained in:
committed by
Kubernetes Publisher
parent
f226f2f6e6
commit
ebfbc6844a
44
tools/cache/shared_informer.go
vendored
44
tools/cache/shared_informer.go
vendored
@@ -177,6 +177,15 @@ type SharedInformer interface {
|
||||
// RemoveEventHandler removes a formerly added event handler given by
|
||||
// its registration handle.
|
||||
// This function is guaranteed to be idempotent, and thread-safe.
|
||||
//
|
||||
// Note: RemoveEventHandler is asynchronous. It stops queueing new events
|
||||
// but does not wait for already-queued events to finish executing.
|
||||
// Goroutines processing the remaining events may still be running and
|
||||
// invoking callbacks after this function returns.
|
||||
//
|
||||
// If the caller needs to wait for all handlers to finish executing (for
|
||||
// example, to safely close channels or release resources used by the handler),
|
||||
// they should use [ShutDownEventHandler].
|
||||
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
|
||||
// GetStore returns the informer's local cache as a Store.
|
||||
GetStore() Store
|
||||
@@ -1014,6 +1023,24 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi
|
||||
return s.processor.removeListener(handle)
|
||||
}
|
||||
|
||||
// ShutDownEventHandler removes the event handler and blocks until it has fully
|
||||
// stopped processing events.
|
||||
//
|
||||
// Like RemoveEventHandler, it is idempotent and thread-safe. However, it MUST NOT
|
||||
// be called from within the event handler's own callbacks, as that will result
|
||||
// in a deadlock.
|
||||
func ShutDownEventHandler(informer SharedInformer, handle ResourceEventHandlerRegistration) error {
|
||||
if err := informer.RemoveEventHandler(handle); err != nil {
|
||||
return err
|
||||
}
|
||||
if s, ok := handle.(interface{ ShutdownChan() <-chan struct{} }); ok {
|
||||
<-s.ShutdownChan()
|
||||
} else {
|
||||
return fmt.Errorf("handle does not support ShutdownChan()")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sharedProcessor has a collection of processorListener and can
|
||||
// distribute a notification object to its listeners. There are two
|
||||
// kinds of distribute operations. The sync distributions go to a
|
||||
@@ -1086,6 +1113,8 @@ func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration
|
||||
|
||||
if p.listenersStarted {
|
||||
close(listener.addCh)
|
||||
} else {
|
||||
close(listener.runFinished)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1209,10 +1238,11 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
|
||||
// processorListener also keeps track of the adjusted requested resync
|
||||
// period of the listener.
|
||||
type processorListener struct {
|
||||
logger klog.Logger
|
||||
nextCh chan interface{}
|
||||
addCh chan interface{}
|
||||
done chan struct{}
|
||||
logger klog.Logger
|
||||
nextCh chan interface{}
|
||||
addCh chan interface{}
|
||||
done chan struct{}
|
||||
runFinished chan struct{}
|
||||
|
||||
handler ResourceEventHandler
|
||||
handlerName string
|
||||
@@ -1265,6 +1295,10 @@ func (p *processorListener) HasSyncedChecker() DoneChecker {
|
||||
return p.syncTracker
|
||||
}
|
||||
|
||||
func (p *processorListener) ShutdownChan() <-chan struct{} {
|
||||
return p.runFinished
|
||||
}
|
||||
|
||||
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
|
||||
handlerName := nameForHandler(handler)
|
||||
ret := &processorListener{
|
||||
@@ -1272,6 +1306,7 @@ func newProcessListener(logger klog.Logger, handler ResourceEventHandler, reques
|
||||
nextCh: make(chan interface{}),
|
||||
addCh: make(chan interface{}),
|
||||
done: make(chan struct{}),
|
||||
runFinished: make(chan struct{}),
|
||||
upstreamHasSynced: hasSynced,
|
||||
handler: handler,
|
||||
handlerName: handlerName,
|
||||
@@ -1328,6 +1363,7 @@ func (p *processorListener) pop() {
|
||||
}
|
||||
|
||||
func (p *processorListener) run() {
|
||||
defer close(p.runFinished)
|
||||
// this call blocks until the channel is closed. When a panic happens during the notification
|
||||
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
|
||||
// the next notification will be attempted. This is usually better than the alternative of never
|
||||
|
||||
218
tools/cache/shared_informer_test.go
vendored
218
tools/cache/shared_informer_test.go
vendored
@@ -1385,3 +1385,221 @@ func numOccurrences(hay, needle string) int {
|
||||
hay = hay[index+len(needle):]
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveEventHandler_AsynchronousPanic(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
stop := make(chan struct{})
|
||||
var wg wait.Group
|
||||
wg.StartWithChannel(stop, informer.Run)
|
||||
defer func() {
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
scheduledPods := make(chan *v1.Pod, 1)
|
||||
blockChan := make(chan struct{})
|
||||
handlerStarted := make(chan struct{})
|
||||
handlerFinished := make(chan struct{})
|
||||
var panicVal any
|
||||
var panicked bool
|
||||
|
||||
handler := ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
close(handlerStarted)
|
||||
<-blockChan
|
||||
defer close(handlerFinished)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
panicVal = r
|
||||
panicked = true
|
||||
}
|
||||
}()
|
||||
scheduledPods <- obj.(*v1.Pod)
|
||||
},
|
||||
}
|
||||
|
||||
handle, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add event handler: %v", err)
|
||||
}
|
||||
|
||||
// Trigger the event
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
// Wait for handler to start and block
|
||||
<-handlerStarted
|
||||
|
||||
// Now remove the handler. This should return immediately.
|
||||
err = informer.RemoveEventHandler(handle)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to remove event handler: %v", err)
|
||||
}
|
||||
|
||||
// Close the channel that the handler writes to
|
||||
close(scheduledPods)
|
||||
|
||||
// Unblock the handler
|
||||
close(blockChan)
|
||||
|
||||
// Wait for handler to finish
|
||||
<-handlerFinished
|
||||
|
||||
if !panicked {
|
||||
t.Fatalf("Expected handler to panic (send on closed channel), but it did not")
|
||||
}
|
||||
t.Logf("Caught expected panic: %v", panicVal)
|
||||
}
|
||||
|
||||
func TestRemoveEventHandler_SynchronousShutdown(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
stop := make(chan struct{})
|
||||
var wg wait.Group
|
||||
wg.StartWithChannel(stop, informer.Run)
|
||||
defer func() {
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
scheduledPods := make(chan *v1.Pod, 1)
|
||||
blockChan := make(chan struct{})
|
||||
handlerStarted := make(chan struct{})
|
||||
handlerFinished := make(chan struct{})
|
||||
var panicVal any
|
||||
var panicked bool
|
||||
|
||||
handler := ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
close(handlerStarted)
|
||||
<-blockChan
|
||||
defer close(handlerFinished)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
panicVal = r
|
||||
panicked = true
|
||||
}
|
||||
}()
|
||||
scheduledPods <- obj.(*v1.Pod)
|
||||
},
|
||||
}
|
||||
|
||||
handle, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add event handler: %v", err)
|
||||
}
|
||||
|
||||
// Trigger the event
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
// Wait for handler to start and block
|
||||
<-handlerStarted
|
||||
|
||||
// Unblock the handler so it can run and exit
|
||||
close(blockChan)
|
||||
|
||||
// Now remove the handler and wait for it to fully stop.
|
||||
err = ShutDownEventHandler(informer, handle)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to shutdown event handler: %v", err)
|
||||
}
|
||||
|
||||
// NOW it is safe to close the channel
|
||||
close(scheduledPods)
|
||||
|
||||
// Wait for handler function to finish (should already be done)
|
||||
<-handlerFinished
|
||||
|
||||
if panicked {
|
||||
t.Fatalf("Handler panicked unexpectedly: %v", panicVal)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShutDownEventHandler_Lifecycles(t *testing.T) {
|
||||
t.Run("InformerNeverStarted", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
handler := ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {},
|
||||
}
|
||||
handle, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add event handler: %v", err)
|
||||
}
|
||||
|
||||
// Shutdown should return immediately because the informer was never started.
|
||||
err = ShutDownEventHandler(informer, handle)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to shutdown event handler: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("StartedThenAddedThenShutDown", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
stop := make(chan struct{})
|
||||
var wg wait.Group
|
||||
wg.StartWithChannel(stop, informer.Run)
|
||||
defer func() {
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
// Wait deterministically for the informer to sync (proving it has started)
|
||||
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
if !WaitFor(syncCtx, "informer sync", informer.HasSyncedChecker()) {
|
||||
t.Fatalf("Informer did not sync")
|
||||
}
|
||||
|
||||
handler := ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {},
|
||||
}
|
||||
handle, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add event handler: %v", err)
|
||||
}
|
||||
|
||||
err = ShutDownEventHandler(informer, handle)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to shutdown event handler: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("AddedThenStartedThenShutDown", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
|
||||
handler := ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {},
|
||||
}
|
||||
handle, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add event handler: %v", err)
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
var wg wait.Group
|
||||
wg.StartWithChannel(stop, informer.Run)
|
||||
defer func() {
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
// Wait deterministically for the informer to sync (proving it has started)
|
||||
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
if !WaitFor(syncCtx, "informer sync", informer.HasSyncedChecker()) {
|
||||
t.Fatalf("Informer did not sync")
|
||||
}
|
||||
|
||||
err = ShutDownEventHandler(informer, handle)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to shutdown event handler: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user