controller/volume/persistentvolume: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka
2025-10-29 11:45:09 +01:00
parent 8eab454e38
commit ed74779a0f
2 changed files with 27 additions and 10 deletions

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@@ -296,8 +297,6 @@ func (ctrl *PersistentVolumeController) deleteClaim(ctx context.Context, claim *
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer ctrl.claimQueue.ShutDown()
defer ctrl.volumeQueue.ShutDown()
// Start events processing pipeline.
ctrl.eventBroadcaster.StartStructuredLogging(3)
@@ -306,20 +305,31 @@ func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.Info("Starting persistent volume controller")
defer logger.Info("Shutting down persistent volume controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down persistent volume controller")
ctrl.claimQueue.ShutDown()
ctrl.volumeQueue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
return
}
ctrl.initializeCaches(logger, ctrl.volumeLister, ctrl.claimLister)
go wait.Until(func() { ctrl.resync(ctx) }, ctrl.resyncPeriod, ctx.Done())
go wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second)
go wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second)
metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)
wg.Go(func() {
wait.Until(func() { ctrl.resync(ctx) }, ctrl.resyncPeriod, ctx.Done())
})
wg.Go(func() {
wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second)
})
wg.Go(func() {
wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second)
})
<-ctx.Done()
}

View File

@@ -20,6 +20,7 @@ import (
"context"
"errors"
"reflect"
"sync"
"testing"
"time"
@@ -361,10 +362,17 @@ func TestControllerSync(t *testing.T) {
}
// Start the controller
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
informers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done())
go ctrl.Run(ctx)
wg.Go(func() {
ctrl.Run(ctx)
})
// Wait for the controller to pass initial sync and fill its caches.
err = wait.Poll(10*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
@@ -389,7 +397,6 @@ func TestControllerSync(t *testing.T) {
if err != nil {
t.Errorf("Failed to run test %s: %v", test.name, err)
}
cancel()
evaluateTestResults(ctx, ctrl, reactor.VolumeReactor, test, t)
}