diff --git a/pkg/controller/node/controller_utils.go b/pkg/controller/node/controller_utils.go
index 9127afbf48c..d7a0a191cfa 100644
--- a/pkg/controller/node/controller_utils.go
+++ b/pkg/controller/node/controller_utils.go
@@ -316,3 +316,72 @@ func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
 	glog.V(4).Infof("Made sure that Node %v has no %v Taint", node.Name, taintToRemove)
 	return true
 }
+
+// createAddNodeHandler wraps f into an informer AddFunc that deep-copies the
+// incoming object before handing the *v1.Node to f.
+func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
+	return func(originalObj interface{}) {
+		obj, err := api.Scheme.DeepCopy(originalObj)
+		if err != nil {
+			utilruntime.HandleError(err)
+			return
+		}
+		node := obj.(*v1.Node)
+
+		if err := f(node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
+		}
+	}
+}
+
+// createUpdateNodeHandler wraps f into an informer UpdateFunc that deep-copies
+// both the old and the new object before handing the *v1.Nodes to f.
+func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
+	return func(origOldObj, origNewObj interface{}) {
+		oldObj, err := api.Scheme.DeepCopy(origOldObj)
+		if err != nil {
+			utilruntime.HandleError(err)
+			return
+		}
+		newObj, err := api.Scheme.DeepCopy(origNewObj)
+		if err != nil {
+			utilruntime.HandleError(err)
+			return
+		}
+		node := newObj.(*v1.Node)
+		prevNode := oldObj.(*v1.Node)
+
+		if err := f(prevNode, node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Update: %v", err))
+		}
+	}
+}
+
+// createDeleteNodeHandler wraps f into an informer DeleteFunc that deep-copies
+// the incoming object, unwrapping cache.DeletedFinalStateUnknown tombstones.
+func createDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
+	return func(originalObj interface{}) {
+		obj, err := api.Scheme.DeepCopy(originalObj)
+		if err != nil {
+			utilruntime.HandleError(err)
+			return
+		}
+
+		node, isNode := obj.(*v1.Node)
+		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
+		// we need to handle that correctly. #34692
+		if !isNode {
+			deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				glog.Errorf("Received unexpected object: %v", obj)
+				return
+			}
+			node, ok = deletedState.Obj.(*v1.Node)
+			if !ok {
+				glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
+				return
+			}
+		}
+
+		if err := f(node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Delete: %v", err))
+		}
+	}
+}
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index f632e6ef6c3..9df927ee234 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -348,13 +348,51 @@ func NewNodeController(
 		}
 
 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc:    nc.onNodeAdd,
-			UpdateFunc: nc.onNodeUpdate,
-			DeleteFunc: nc.onNodeDelete,
+			AddFunc: createAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
+			UpdateFunc: createUpdateNodeHandler(func(_, newNode *v1.Node) error {
+				// If the PodCIDR is not empty we either:
+				// - already processed a Node that already had a CIDR after NC restarted
+				//   (cidr is marked as used),
+				// - already processed a Node successfully and allocated a CIDR for it
+				//   (cidr is marked as used),
+				// - already processed a Node but we saw a "timeout" response and
+				//   the request eventually got through; in this case we haven't released
+				//   the allocated CIDR (cidr is still marked as used).
+				// There's a possible error here:
+				// - NC sees a new Node and assigns a CIDR X to it,
+				// - Update Node call fails with a timeout,
+				// - Node is updated by some other component, NC sees an update and
+				//   assigns CIDR Y to the Node,
+				// - Both CIDR X and CIDR Y are marked as used in the local cache,
+				//   even though Node sees only CIDR Y
+				// The problem here is that in the in-memory cache we see CIDR X as marked,
+				// which prevents it from being assigned to any new node. The cluster
+				// state is correct.
+				// Restart of NC fixes the issue.
+				if newNode.Spec.PodCIDR == "" {
+					return nc.cidrAllocator.AllocateOrOccupyCIDR(newNode)
+				}
+				return nil
+			}),
+			DeleteFunc: createDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
 		})
 	}
 
 	if nc.runTaintManager {
+		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+			AddFunc: createAddNodeHandler(func(node *v1.Node) error {
+				nc.taintManager.NodeUpdated(nil, node)
+				return nil
+			}),
+			UpdateFunc: createUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
+				nc.taintManager.NodeUpdated(oldNode, newNode)
+				return nil
+			}),
+			DeleteFunc: createDeleteNodeHandler(func(node *v1.Node) error {
+				nc.taintManager.NodeUpdated(node, nil)
+				return nil
+			}),
+		})
 		nc.taintManager = NewNoExecuteTaintManager(kubeClient)
 	}
 
@@ -435,90 +473,6 @@ func (nc *NodeController) doTaintingPass() {
 	}
 }
 
-func (nc *NodeController) onNodeAdd(originalObj interface{}) {
-	obj, err := api.Scheme.DeepCopy(originalObj)
-	if err != nil {
-		utilruntime.HandleError(err)
-		return
-	}
-	node := obj.(*v1.Node)
-
-	if err := nc.cidrAllocator.AllocateOrOccupyCIDR(node); err != nil {
-		utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
-	}
-	if nc.taintManager != nil {
-		nc.taintManager.NodeUpdated(nil, node)
-	}
-}
-
-func (nc *NodeController) onNodeUpdate(oldNode, newNode interface{}) {
-	node := newNode.(*v1.Node)
-	prevNode := oldNode.(*v1.Node)
-	// If the PodCIDR is not empty we either:
-	// - already processed a Node that already had a CIDR after NC restarted
-	//   (cidr is marked as used),
-	// - already processed a Node successfully and allocated a CIDR for it
-	//   (cidr is marked as used),
-	// - already processed a Node but we did saw a "timeout" response and
-	//   request eventually got through in this case we haven't released
-	//   the allocated CIDR (cidr is still marked as used).
-	// There's a possible error here:
-	// - NC sees a new Node and assigns a CIDR X to it,
-	// - Update Node call fails with a timeout,
-	// - Node is updated by some other component, NC sees an update and
-	//   assigns CIDR Y to the Node,
-	// - Both CIDR X and CIDR Y are marked as used in the local cache,
-	//   even though Node sees only CIDR Y
-	// The problem here is that in in-memory cache we see CIDR X as marked,
-	// which prevents it from being assigned to any new node. The cluster
-	// state is correct.
-	// Restart of NC fixes the issue.
-	if node.Spec.PodCIDR == "" {
-		nodeCopy, err := api.Scheme.Copy(node)
-		if err != nil {
-			utilruntime.HandleError(err)
-			return
-		}
-
-		if err := nc.cidrAllocator.AllocateOrOccupyCIDR(nodeCopy.(*v1.Node)); err != nil {
-			utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
-		}
-	}
-	if nc.taintManager != nil {
-		nc.taintManager.NodeUpdated(prevNode, node)
-	}
-}
-
-func (nc *NodeController) onNodeDelete(originalObj interface{}) {
-	obj, err := api.Scheme.DeepCopy(originalObj)
-	if err != nil {
-		utilruntime.HandleError(err)
-		return
-	}
-
-	node, isNode := obj.(*v1.Node)
-	// We can get DeletedFinalStateUnknown instead of *v1.Node here and
-	// we need to handle that correctly. #34692
-	if !isNode {
-		deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
-		if !ok {
-			glog.Errorf("Received unexpected object: %v", obj)
-			return
-		}
-		node, ok = deletedState.Obj.(*v1.Node)
-		if !ok {
-			glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
-			return
-		}
-	}
-	if nc.taintManager != nil {
-		nc.taintManager.NodeUpdated(node, nil)
-	}
-	if err := nc.cidrAllocator.ReleaseCIDR(node); err != nil {
-		glog.Errorf("Error releasing CIDR: %v", err)
-	}
-}
-
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()