From ed74779a0f945d1d24183276d192cd50693cc041 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Wed, 29 Oct 2025 11:45:09 +0100 Subject: [PATCH] controller/volume/persistentvolume: Improve goroutine mgmt Make sure all threads are terminated when Run returns. --- .../persistentvolume/pv_controller_base.go | 26 +++++++++++++------ .../persistentvolume/pv_controller_test.go | 11 ++++++-- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index d4a2522b9b4..e8628a41be9 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -296,8 +297,6 @@ func (ctrl *PersistentVolumeController) deleteClaim(ctx context.Context, claim * // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(ctx context.Context) { defer utilruntime.HandleCrash() - defer ctrl.claimQueue.ShutDown() - defer ctrl.volumeQueue.ShutDown() // Start events processing pipeline. ctrl.eventBroadcaster.StartStructuredLogging(3) @@ -306,20 +305,31 @@ func (ctrl *PersistentVolumeController) Run(ctx context.Context) { logger := klog.FromContext(ctx) logger.Info("Starting persistent volume controller") - defer logger.Info("Shutting down persistent volume controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down persistent volume controller") + ctrl.claimQueue.ShutDown() + ctrl.volumeQueue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) { return } ctrl.initializeCaches(logger, ctrl.volumeLister, ctrl.claimLister) - - go wait.Until(func() { ctrl.resync(ctx) }, ctrl.resyncPeriod, ctx.Done()) - go wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second) - go wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second) - metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr) + wg.Go(func() { + wait.Until(func() { ctrl.resync(ctx) }, ctrl.resyncPeriod, ctx.Done()) + }) + wg.Go(func() { + wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second) + }) + wg.Go(func() { + wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second) + }) <-ctx.Done() } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 8414c28fc57..3618b1b11a0 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "reflect" + "sync" "testing" "time" @@ -361,10 +362,17 @@ func TestControllerSync(t *testing.T) { } // Start the controller + var wg sync.WaitGroup + defer wg.Wait() ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + informers.Start(ctx.Done()) informers.WaitForCacheSync(ctx.Done()) - go ctrl.Run(ctx) + + wg.Go(func() { + ctrl.Run(ctx) + }) // Wait for the controller to pass initial sync and fill its caches. err = wait.Poll(10*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { @@ -389,7 +397,6 @@ func TestControllerSync(t *testing.T) { if err != nil { t.Errorf("Failed to run test %s: %v", test.name, err) } - cancel() evaluateTestResults(ctx, ctrl, reactor.VolumeReactor, test, t) }