diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index cfb07d1872d..4394f45ff80 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -300,7 +300,8 @@ type Controller struct {
 
 	getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
 
-	recorder record.EventRecorder
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
 
 	// Value controlling Controller monitoring period, i.e. how often does Controller
 	// check node health signal posted from kubelet. This value should be lower than
@@ -372,13 +373,6 @@ func NewNodeLifecycleController(
 
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
-	eventBroadcaster.StartStructuredLogging(0)
-
-	klog.Infof("Sending events to api server.")
-	eventBroadcaster.StartRecordingToSink(
-		&v1core.EventSinkImpl{
-			Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
-		})
 
 	if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -390,6 +384,7 @@ func NewNodeLifecycleController(
 		knownNodeSet:                make(map[string]*v1.Node),
 		nodeHealthMap:               newNodeHealthMap(),
 		nodeEvictionMap:             newNodeEvictionMap(),
+		broadcaster:                 eventBroadcaster,
 		recorder:                    recorder,
 		nodeMonitorPeriod:           nodeMonitorPeriod,
 		nodeStartupGracePeriod:      nodeStartupGracePeriod,
@@ -536,6 +531,19 @@ func NewNodeLifecycleController(
 func (nc *Controller) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 
+	// Start events processing pipeline.
+	nc.broadcaster.StartStructuredLogging(0)
+	klog.Infof("Sending events to api server.")
+	nc.broadcaster.StartRecordingToSink(
+		&v1core.EventSinkImpl{
+			Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
+		})
+	defer nc.broadcaster.Shutdown()
+
+	// Close node update queue to cleanup go routine.
+	defer nc.nodeUpdateQueue.ShutDown()
+	defer nc.podUpdateQueue.ShutDown()
+
 	klog.Infof("Starting node controller")
 	defer klog.Infof("Shutting down node controller")
 
@@ -547,10 +555,6 @@ func (nc *Controller) Run(ctx context.Context) {
 		go nc.taintManager.Run(ctx)
 	}
 
-	// Close node update queue to cleanup go routine.
-	defer nc.nodeUpdateQueue.ShutDown()
-	defer nc.podUpdateQueue.ShutDown()
-
 	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
 	for i := 0; i < scheduler.UpdateWorkerSize; i++ {
 		// Thanks to "workqueue", each worker just need to get item from queue, because
diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
index 129a61c7df0..750a9390819 100644
--- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go
+++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -82,6 +82,7 @@ type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
 // from Nodes tainted with NoExecute Taints.
 type NoExecuteTaintManager struct {
 	client      clientset.Interface
+	broadcaster record.EventBroadcaster
 	recorder    record.EventRecorder
 	getPod      GetPodFunc
 	getNode     GetNodeFunc
@@ -158,16 +159,10 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
 func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
-	eventBroadcaster.StartStructuredLogging(0)
-	if c != nil {
-		klog.InfoS("Sending events to api server")
-		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
-	} else {
-		klog.Fatalf("kubeClient is nil when starting NodeController")
-	}
 
 	tm := &NoExecuteTaintManager{
 		client:      c,
+		broadcaster: eventBroadcaster,
 		recorder:    recorder,
 		getPod:      getPod,
 		getNode:     getNode,
@@ -184,8 +179,23 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod
 
 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
 func (tc *NoExecuteTaintManager) Run(ctx context.Context) {
+	defer utilruntime.HandleCrash()
+
 	klog.InfoS("Starting NoExecuteTaintManager")
 
+	// Start events processing pipeline.
+	tc.broadcaster.StartStructuredLogging(0)
+	if tc.client != nil {
+		klog.InfoS("Sending events to api server")
+		tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
+	} else {
+		klog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+	defer tc.broadcaster.Shutdown()
+
+	defer tc.nodeUpdateQueue.ShutDown()
+	defer tc.podUpdateQueue.ShutDown()
+
 	for i := 0; i < UpdateWorkerSize; i++ {
 		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
 		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
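
Both files follow the same pattern: the constructor only wires up the broadcaster and recorder, while Run() owns the event pipeline's lifetime, starting it on entry and deferring Shutdown() so queued events are flushed and the broadcaster's goroutines stop when the controller does. Below is a minimal, self-contained sketch of that pattern using client-go's record package; demoController, newDemoController, and the component name are hypothetical stand-ins, not part of this change.

package main

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

type demoController struct {
	kubeClient  kubernetes.Interface
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
}

// The constructor only wires things up; nothing is started here, so merely
// constructing the controller spawns no event-delivery goroutines.
func newDemoController(kubeClient kubernetes.Interface) *demoController {
	broadcaster := record.NewBroadcaster()
	return &demoController{
		kubeClient:  kubeClient,
		broadcaster: broadcaster,
		recorder:    broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo-controller"}),
	}
}

// Run owns the pipeline's lifetime: start logging and the API-server sink on
// entry, and defer Shutdown so queued events are flushed when Run returns.
func (c *demoController) Run(ctx context.Context) {
	c.broadcaster.StartStructuredLogging(0)
	c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: c.kubeClient.CoreV1().Events(""),
	})
	defer c.broadcaster.Shutdown()

	// Stand-in for the controller's real worker loops.
	<-ctx.Done()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// A fake clientset keeps the sketch runnable without a cluster.
	newDemoController(fake.NewSimpleClientset()).Run(ctx)
}

The payoff of this split is that constructing a controller without ever running it, as tests commonly do, no longer leaks broadcaster goroutines, and the new defer tc.broadcaster.Shutdown() / nc.broadcaster.Shutdown() calls guarantee cleanup on every exit path. Moving the workqueue ShutDown() defers to the top of Run() serves the same goal: they now run even if Run() returns early, before the workers are started.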