controller/disruption: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka
2025-10-27 12:10:07 +01:00
parent 6e0a4da2f6
commit d9ba92ba3b

View File

@@ -19,6 +19,7 @@ package disruption
import (
"context"
"fmt"
"sync"
"time"
apps "k8s.io/api/apps/v1beta1"
@@ -461,21 +462,30 @@ func (dc *DisruptionController) Run(ctx context.Context) {
}
defer dc.broadcaster.Shutdown()
defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()
defer dc.stalePodDisruptionQueue.ShutDown()
logger.Info("Starting disruption controller")
defer logger.Info("Shutting down disruption controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down disruption controller")
dc.queue.ShutDown()
dc.recheckQueue.ShutDown()
dc.stalePodDisruptionQueue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return
}
go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)
wg.Go(func() {
wait.UntilWithContext(ctx, dc.worker, time.Second)
})
wg.Go(func() {
wait.Until(dc.recheckWorker, time.Second, ctx.Done())
})
wg.Go(func() {
wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)
})
<-ctx.Done()
}