diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 38026459743..b7fa74e961c 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -222,8 +222,7 @@ type Controller struct { // 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'. taintNodeByCondition bool - nodeUpdateChannels []chan *v1.Node - nodeUpdateQueue workqueue.Interface + nodeUpdateQueue workqueue.Interface } // NewNodeLifecycleController returns a new taint controller. @@ -350,11 +349,11 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer, glog.Infof("Controller will taint node by condition.") nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { - nc.nodeUpdateQueue.Add(node) + nc.nodeUpdateQueue.Add(node.Name) return nil }), UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { - nc.nodeUpdateQueue.Add(newNode) + nc.nodeUpdateQueue.Add(newNode.Name) return nil }), }) @@ -396,36 +395,16 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { } if nc.taintNodeByCondition { - for i := 0; i < scheduler.UpdateWorkerSize; i++ { - nc.nodeUpdateChannels = append(nc.nodeUpdateChannels, make(chan *v1.Node, scheduler.NodeUpdateChannelSize)) - } - - // Dispatcher - go func(stopCh <-chan struct{}) { - for { - obj, shutdown := nc.nodeUpdateQueue.Get() - if shutdown { - break - } - - node := obj.(*v1.Node) - hash := hash(node.Name, scheduler.UpdateWorkerSize) - - select { - case <-stopCh: - nc.nodeUpdateQueue.Done(node) - return - case nc.nodeUpdateChannels[hash] <- node: - } - nc.nodeUpdateQueue.Done(node) - } - }(stopCh) // Close node update queue to cleanup go routine. defer nc.nodeUpdateQueue.ShutDown() // Start workers to update NoSchedule taint for nodes. for i := 0; i < scheduler.UpdateWorkerSize; i++ { - go nc.doNoScheduleTaintingPassWorker(i, stopCh) + // Thanks to "workqueue", each worker just need to get item from queue, because + // the item is flagged when got from queue: if new event come, the new item will + // be re-queued until "Done", so no more than one worker handle the same item and + // no event missed. + go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh) } } @@ -488,20 +467,34 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error { return nil } -func (nc *Controller) doNoScheduleTaintingPassWorker(i int, stopCh <-chan struct{}) { +func (nc *Controller) doNoScheduleTaintingPassWorker() { for { - select { - case <-stopCh: + obj, shutdown := nc.nodeUpdateQueue.Get() + // "nodeUpdateQueue" will be shutdown when "stopCh" closed; + // we do not need to re-check "stopCh" again. + if shutdown { return - case node := <-nc.nodeUpdateChannels[i]: - if err := nc.doNoScheduleTaintingPass(node); err != nil { - glog.Errorf("Failed to taint NoSchedule on node <%s>: %v", node.Name, err) - } } + nodeName := obj.(string) + + if err := nc.doNoScheduleTaintingPass(nodeName); err != nil { + // TODO (k82cn): Add nodeName back to the queue. + glog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err) + } + nc.nodeUpdateQueue.Done(nodeName) } } -func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { +func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error { + node, err := nc.nodeLister.Get(nodeName) + if err != nil { + // If node not found, just ignore it. + if apierrors.IsNotFound(err) { + return nil + } + return err + } + // Map node's condition to Taints. var taints []v1.Taint for _, condition := range node.Status.Conditions { diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index c6e824221b1..cd581bc0c08 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -2334,7 +2334,7 @@ func TestTaintsNodeByCondition(t *testing.T) { if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } - nodeController.doNoScheduleTaintingPass(test.Node) + nodeController.doNoScheduleTaintingPass(test.Node.Name) if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) }