From 0e1139d027067af47f7121a45bf1d613f316e1ed Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 14 Feb 2023 19:35:44 +0100 Subject: [PATCH] dra: avoid goroutine leaks from event broadcaster When using these controllers in test/integration/scheduler_perf, the goroutine leak check there pointed out that broadcaster.Shutdown function wasn't called and thus goroutines leaked during a test. --- pkg/controller/resourceclaim/controller.go | 15 +++++++++------ .../controller/controller.go | 4 ++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 6a484b90037..ee2321051b0 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -118,11 +118,6 @@ func NewController( metrics.RegisterMetrics() - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"}) - if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ec.enqueuePod(obj, false) @@ -235,6 +230,12 @@ func (ec *Controller) Run(ctx context.Context, workers int) { klog.Infof("Starting ephemeral volume controller") defer klog.Infof("Shutting down ephemeral volume controller") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")}) + ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"}) + defer eventBroadcaster.Shutdown() + if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) { return } @@ -314,7 +315,9 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error for _, podClaim := range pod.Spec.ResourceClaims { if err := ec.handleClaim(ctx, pod, podClaim); err != nil { - ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err)) + if ec.recorder != nil { + ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err)) + } return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err) } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go index c59724123a6..681fcb798a7 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go @@ -160,6 +160,10 @@ func New( podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings() eventBroadcaster := record.NewBroadcaster() + go func() { + <-ctx.Done() + eventBroadcaster.Shutdown() + }() // TODO: use contextual logging in eventBroadcaster once it // supports it. There is a StartStructuredLogging API, but it // uses the global klog, which is worse than redirecting an unstructured