Clean shutdown of nodecontroller integration tests

parent 2af8d0bbd7
commit 006ff4510b
@@ -300,7 +300,8 @@ type Controller struct {
 	getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
 
+	broadcaster record.EventBroadcaster
 	recorder record.EventRecorder
 
 	// Value controlling Controller monitoring period, i.e. how often does Controller
 	// check node health signal posted from kubelet. This value should be lower than
@@ -372,13 +373,6 @@ func NewNodeLifecycleController(
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
-	eventBroadcaster.StartStructuredLogging(0)
-
-	klog.Infof("Sending events to api server.")
-	eventBroadcaster.StartRecordingToSink(
-		&v1core.EventSinkImpl{
-			Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
-		})
 
 	if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -390,6 +384,7 @@ func NewNodeLifecycleController(
 		knownNodeSet: make(map[string]*v1.Node),
 		nodeHealthMap: newNodeHealthMap(),
 		nodeEvictionMap: newNodeEvictionMap(),
+		broadcaster: eventBroadcaster,
 		recorder: recorder,
 		nodeMonitorPeriod: nodeMonitorPeriod,
 		nodeStartupGracePeriod: nodeStartupGracePeriod,
@@ -536,6 +531,19 @@ func NewNodeLifecycleController(
 func (nc *Controller) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 
+	// Start events processing pipeline.
+	nc.broadcaster.StartStructuredLogging(0)
+	klog.Infof("Sending events to api server.")
+	nc.broadcaster.StartRecordingToSink(
+		&v1core.EventSinkImpl{
+			Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
+		})
+	defer nc.broadcaster.Shutdown()
+
+	// Close node update queue to cleanup go routine.
+	defer nc.nodeUpdateQueue.ShutDown()
+	defer nc.podUpdateQueue.ShutDown()
+
 	klog.Infof("Starting node controller")
 	defer klog.Infof("Shutting down node controller")
@@ -547,10 +555,6 @@ func (nc *Controller) Run(ctx context.Context) {
 		go nc.taintManager.Run(ctx)
 	}
 
-	// Close node update queue to cleanup go routine.
-	defer nc.nodeUpdateQueue.ShutDown()
-	defer nc.podUpdateQueue.ShutDown()
-
 	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
 	for i := 0; i < scheduler.UpdateWorkerSize; i++ {
 		// Thanks to "workqueue", each worker just need to get item from queue, because
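The hunks above move the event pipeline start and the queue teardown into Run, so everything the controller starts is also stopped when Run returns. A minimal sketch of that pattern, assuming a hypothetical miniController (none of these names come from the commit): the work queue is closed by a defer inside Run, workers blocked in Get() return, and cancelling the context is enough for a clean shutdown, which is what integration tests need.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

type miniController struct {
	queue workqueue.Interface
}

func (c *miniController) Run(ctx context.Context) {
	// Close the queue on exit so workers blocked in Get() return
	// (same idea as the deferred ShutDown calls in the diff above).
	defer c.queue.ShutDown()

	go c.worker()

	<-ctx.Done() // run until the caller cancels
}

func (c *miniController) worker() {
	for {
		item, shutdown := c.queue.Get()
		if shutdown {
			return // the deferred ShutDown in Run unblocked us
		}
		fmt.Println("processing", item)
		c.queue.Done(item)
	}
}

func main() {
	c := &miniController{queue: workqueue.New()}
	ctx, cancel := context.WithCancel(context.Background())

	done := make(chan struct{})
	go func() { c.Run(ctx); close(done) }()

	c.queue.Add("node-1")
	time.Sleep(50 * time.Millisecond) // give the worker a moment to process

	cancel() // Run returns, its defer shuts the queue, the worker exits
	<-done
}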
@@ -82,6 +82,7 @@ type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
 // from Nodes tainted with NoExecute Taints.
 type NoExecuteTaintManager struct {
 	client clientset.Interface
+	broadcaster record.EventBroadcaster
 	recorder record.EventRecorder
 	getPod GetPodFunc
 	getNode GetNodeFunc
@@ -158,16 +159,10 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
 func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
-	eventBroadcaster.StartStructuredLogging(0)
-	if c != nil {
-		klog.InfoS("Sending events to api server")
-		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
-	} else {
-		klog.Fatalf("kubeClient is nil when starting NodeController")
-	}
 
 	tm := &NoExecuteTaintManager{
 		client: c,
+		broadcaster: eventBroadcaster,
 		recorder: recorder,
 		getPod: getPod,
 		getNode: getNode,
@@ -184,8 +179,23 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod
 
 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
 func (tc *NoExecuteTaintManager) Run(ctx context.Context) {
+	defer utilruntime.HandleCrash()
+
 	klog.InfoS("Starting NoExecuteTaintManager")
+
+	// Start events processing pipeline.
+	tc.broadcaster.StartStructuredLogging(0)
+	if tc.client != nil {
+		klog.InfoS("Sending events to api server")
+		tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
+	} else {
+		klog.Fatalf("kubeClient is nil when starting NodeController")
+	}
+	defer tc.broadcaster.Shutdown()
+
+	defer tc.nodeUpdateQueue.ShutDown()
+	defer tc.podUpdateQueue.ShutDown()
 
 	for i := 0; i < UpdateWorkerSize; i++ {
 		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
 		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
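Both files now follow the same event-broadcaster lifecycle: the broadcaster and recorder are created in the constructor, but recording only starts inside Run, and a deferred Shutdown stops the pipeline when Run returns, so nothing keeps emitting events after the controller has been asked to stop. A hedged, self-contained sketch of that lifecycle, written for this page rather than taken from the commit; miniTaintManager is a hypothetical stand-in and the fake clientset stands in for a real API server.

package main

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

type miniTaintManager struct {
	client      *fake.Clientset
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
}

func newMiniTaintManager() *miniTaintManager {
	broadcaster := record.NewBroadcaster()
	return &miniTaintManager{
		client:      fake.NewSimpleClientset(),
		broadcaster: broadcaster,
		// The recorder can be created eagerly; only the sinks are started in Run.
		recorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}),
	}
}

func (m *miniTaintManager) Run(ctx context.Context) {
	// Start events processing pipeline only once the manager actually runs.
	m.broadcaster.StartStructuredLogging(0)
	m.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: m.client.CoreV1().Events("")})
	defer m.broadcaster.Shutdown() // flush and stop the pipeline on exit

	<-ctx.Done()
}

func main() {
	m := newMiniTaintManager()
	ctx, cancel := context.WithCancel(context.Background())

	done := make(chan struct{})
	go func() { m.Run(ctx); close(done) }()

	cancel() // request shutdown
	<-done   // Run has returned and its deferred Shutdown has completed
}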