diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index e58c4adfe2c..4475f3cbf7a 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -19,6 +19,7 @@ package disruption import ( "context" "fmt" + "sync" "time" apps "k8s.io/api/apps/v1beta1" @@ -461,21 +462,30 @@ func (dc *DisruptionController) Run(ctx context.Context) { } defer dc.broadcaster.Shutdown() - defer dc.queue.ShutDown() - defer dc.recheckQueue.ShutDown() - defer dc.stalePodDisruptionQueue.ShutDown() - logger.Info("Starting disruption controller") - defer logger.Info("Shutting down disruption controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down disruption controller") + dc.queue.ShutDown() + dc.recheckQueue.ShutDown() + dc.stalePodDisruptionQueue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { return } - go wait.UntilWithContext(ctx, dc.worker, time.Second) - go wait.Until(dc.recheckWorker, time.Second, ctx.Done()) - go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second) - + wg.Go(func() { + wait.UntilWithContext(ctx, dc.worker, time.Second) + }) + wg.Go(func() { + wait.Until(dc.recheckWorker, time.Second, ctx.Done()) + }) + wg.Go(func() { + wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second) + }) <-ctx.Done() }