diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go index b0ea2240fea..f60dcc17663 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go @@ -93,7 +93,9 @@ var UpdateNodeSpecBackoff = wait.Backoff{ type CloudNodeController struct { nodeInformer coreinformers.NodeInformer kubeClient clientset.Interface - recorder record.EventRecorder + + broadcaster record.EventBroadcaster + recorder record.EventRecorder cloud cloudprovider.Interface @@ -113,10 +115,6 @@ func NewCloudNodeController( eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}) - eventBroadcaster.StartStructuredLogging(0) - - klog.Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) _, instancesSupported := cloud.Instances() _, instancesV2Supported := cloud.InstancesV2() @@ -127,6 +125,7 @@ func NewCloudNodeController( cnc := &CloudNodeController{ nodeInformer: nodeInformer, kubeClient: kubeClient, + broadcaster: eventBroadcaster, recorder: recorder, cloud: cloud, nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, @@ -153,6 +152,12 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer cnc.workqueue.ShutDown() + // Start event processing pipeline. + klog.Infof("Sending events to api server.") + cnc.broadcaster.StartStructuredLogging(0) + cnc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cnc.kubeClient.CoreV1().Events("")}) + defer cnc.broadcaster.Shutdown() + // Wait for the caches to be synced before starting workers klog.Info("Waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok { diff --git a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go index 17095b2b78f..aa6601facd7 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go @@ -55,7 +55,9 @@ var ShutdownTaint = &v1.Taint{ type CloudNodeLifecycleController struct { kubeClient clientset.Interface nodeLister v1lister.NodeLister - recorder record.EventRecorder + + broadcaster record.EventBroadcaster + recorder record.EventRecorder cloud cloudprovider.Interface @@ -73,10 +75,6 @@ func NewCloudNodeLifecycleController( eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"}) - eventBroadcaster.StartStructuredLogging(0) - - klog.Info("Sending events to api server") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) if kubeClient == nil { return nil, errors.New("kubernetes client is nil") @@ -95,6 +93,7 @@ func NewCloudNodeLifecycleController( c := &CloudNodeLifecycleController{ kubeClient: kubeClient, nodeLister: nodeInformer.Lister(), + broadcaster: eventBroadcaster, recorder: recorder, cloud: cloud, nodeMonitorPeriod: nodeMonitorPeriod, @@ -110,6 +109,12 @@ func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManage controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle") defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle") + // Start event processing pipeline. + klog.Info("Sending events to api server") + c.broadcaster.StartStructuredLogging(0) + c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}) + defer c.broadcaster.Shutdown() + // The following loops run communicate with the APIServer with a worst case complexity // of O(num_nodes) per cycle. These functions are justified here because these events fire // very infrequently. DO NOT MODIFY this to perform frequent operations. diff --git a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go index 65f18fc41b0..a98881e2a31 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go @@ -78,7 +78,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform } eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"}) rc := &RouteController{ @@ -98,6 +97,13 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + if rc.broadcaster != nil { + rc.broadcaster.StartStructuredLogging(0) + rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")}) + defer rc.broadcaster.Shutdown() + } + klog.Info("Starting route controller") defer klog.Info("Shutting down route controller") @@ -105,10 +111,6 @@ func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) { return } - if rc.broadcaster != nil { - rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")}) - } - // TODO: If we do just the full Resync every 5 minutes (default value) // that means that we may wait up to 5 minutes before even starting // creating a route for it. This is bad. diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index ae717e6bd50..0a403c9fae9 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -110,8 +110,6 @@ func New( featureGate featuregate.FeatureGate, ) (*Controller, error) { broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -230,6 +228,11 @@ func (s *Controller) Run(ctx context.Context, workers int) { defer runtime.HandleCrash() defer s.queue.ShutDown() + // Start event processing pipeline. + s.eventBroadcaster.StartStructuredLogging(0) + s.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.kubeClient.CoreV1().Events("")}) + defer s.eventBroadcaster.Shutdown() + klog.Info("Starting service controller") defer klog.Info("Shutting down service controller") @@ -287,11 +290,15 @@ func (s *Controller) worker(ctx context.Context) { // nodeSyncLoop takes nodeSync signal and triggers nodeSync func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) { klog.V(4).Info("nodeSyncLoop Started") - for range s.nodeSyncCh { - klog.V(4).Info("nodeSync has been triggered") - s.nodeSyncInternal(ctx, workers) + for { + select { + case <-s.nodeSyncCh: + klog.V(4).Info("nodeSync has been triggered") + s.nodeSyncInternal(ctx, workers) + case <-ctx.Done(): + return + } } - klog.V(2).Info("s.nodeSyncCh is closed. Exiting nodeSyncLoop") } func (s *Controller) processNextWorkItem(ctx context.Context) bool {