mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
Merge pull request #115798 from pohly/dra-event-broadcaster
dra: avoid goroutine leaks from event broadcaster
This commit is contained in:
commit
ebae41641f
@ -118,11 +118,6 @@ func NewController(
|
|||||||
|
|
||||||
metrics.RegisterMetrics()
|
metrics.RegisterMetrics()
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
eventBroadcaster.StartLogging(klog.Infof)
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
|
||||||
ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
|
|
||||||
|
|
||||||
if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
ec.enqueuePod(obj, false)
|
ec.enqueuePod(obj, false)
|
||||||
@ -235,6 +230,12 @@ func (ec *Controller) Run(ctx context.Context, workers int) {
|
|||||||
klog.Infof("Starting ephemeral volume controller")
|
klog.Infof("Starting ephemeral volume controller")
|
||||||
defer klog.Infof("Shutting down ephemeral volume controller")
|
defer klog.Infof("Shutting down ephemeral volume controller")
|
||||||
|
|
||||||
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
eventBroadcaster.StartLogging(klog.Infof)
|
||||||
|
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")})
|
||||||
|
ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
|
||||||
|
defer eventBroadcaster.Shutdown()
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) {
|
if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -314,7 +315,9 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error
|
|||||||
|
|
||||||
for _, podClaim := range pod.Spec.ResourceClaims {
|
for _, podClaim := range pod.Spec.ResourceClaims {
|
||||||
if err := ec.handleClaim(ctx, pod, podClaim); err != nil {
|
if err := ec.handleClaim(ctx, pod, podClaim); err != nil {
|
||||||
ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
|
if ec.recorder != nil {
|
||||||
|
ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
|
||||||
|
}
|
||||||
return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
|
return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,6 +160,10 @@ func New(
|
|||||||
podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
|
podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
eventBroadcaster.Shutdown()
|
||||||
|
}()
|
||||||
// TODO: use contextual logging in eventBroadcaster once it
|
// TODO: use contextual logging in eventBroadcaster once it
|
||||||
// supports it. There is a StartStructuredLogging API, but it
|
// supports it. There is a StartStructuredLogging API, but it
|
||||||
// uses the global klog, which is worse than redirecting an unstructured
|
// uses the global klog, which is worse than redirecting an unstructured
|
||||||
|
Loading…
Reference in New Issue
Block a user