controller/volume/expand: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka
2025-10-28 20:06:01 +01:00
parent 27774052ab
commit 8eab454e38

View File

@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"k8s.io/klog/v2"
@@ -322,19 +323,26 @@ func (expc *expandController) expand(logger klog.Logger, pvc *v1.PersistentVolum
// TODO make concurrency configurable (workers argument). previously, nestedpendingoperations spawned unlimited goroutines
func (expc *expandController) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer expc.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting expand controller")
defer logger.Info("Shutting down expand controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down expand controller")
expc.queue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, expc.pvcsSynced) {
return
}
for i := 0; i < defaultWorkerCount; i++ {
go wait.UntilWithContext(ctx, expc.runWorker, time.Second)
wg.Go(func() {
wait.UntilWithContext(ctx, expc.runWorker, time.Second)
})
}
<-ctx.Done()
}