From ccd35f7c5eed4e22dbaef43b9761a83b18120f24 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Mon, 27 Oct 2025 12:24:51 +0100 Subject: [PATCH] controller/endpoint: Improve goroutine mgmt Make sure all threads are terminated when Run returns. --- .../endpoint/endpoints_controller.go | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 08882a32aaf..fe1c5f40832 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -188,27 +189,32 @@ func (e *Controller) Run(ctx context.Context, workers int) { e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")}) defer e.eventBroadcaster.Shutdown() - defer e.queue.ShutDown() - defer e.podQueue.ShutDown() - logger := klog.FromContext(ctx) logger.Info("Starting endpoint controller") - defer logger.Info("Shutting down endpoint controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down endpoint controller") + e.queue.ShutDown() + e.podQueue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, e.podsSynced, e.servicesSynced, e.endpointsSynced) { return } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod) - go wait.UntilWithContext(ctx, e.podWorker, e.workerLoopPeriod) + wg.Go(func() { + wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod) + }) + wg.Go(func() { + wait.UntilWithContext(ctx, e.podWorker, e.workerLoopPeriod) + }) } - - go func() { - defer utilruntime.HandleCrash() + wg.Go(func() { e.checkLeftoverEndpoints() - }() - + }) <-ctx.Done() }