controller/endpoint: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka
2025-10-27 12:24:51 +01:00
parent d9ba92ba3b
commit ccd35f7c5e

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@@ -188,27 +189,32 @@ func (e *Controller) Run(ctx context.Context, workers int) {
e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
defer e.eventBroadcaster.Shutdown()
defer e.queue.ShutDown()
defer e.podQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting endpoint controller")
defer logger.Info("Shutting down endpoint controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down endpoint controller")
e.queue.ShutDown()
e.podQueue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
go wait.UntilWithContext(ctx, e.podWorker, e.workerLoopPeriod)
wg.Go(func() {
wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
})
wg.Go(func() {
wait.UntilWithContext(ctx, e.podWorker, e.workerLoopPeriod)
})
}
go func() {
defer utilruntime.HandleCrash()
wg.Go(func() {
e.checkLeftoverEndpoints()
}()
})
<-ctx.Done()
}