mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Make taint manager harder to break before we move it out of NC
This commit is contained in:
parent
5baa947c8c
commit
51c07147c8
@ -316,3 +316,69 @@ func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
|
||||
glog.V(4).Infof("Made sure that Node %v has no %v Taint", node.Name, taintToRemove)
|
||||
return true
|
||||
}
|
||||
|
||||
func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
|
||||
return func(originalObj interface{}) {
|
||||
obj, err := api.Scheme.DeepCopy(originalObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
node := obj.(*v1.Node)
|
||||
|
||||
if err := f(node); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error while processing Node Delete: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
|
||||
return func(origOldObj, origNewObj interface{}) {
|
||||
oldObj, err := api.Scheme.DeepCopy(origOldObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
newObj, err := api.Scheme.DeepCopy(origNewObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
node := newObj.(*v1.Node)
|
||||
prevNode := oldObj.(*v1.Node)
|
||||
|
||||
if err := f(prevNode, node); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
|
||||
return func(originalObj interface{}) {
|
||||
obj, err := api.Scheme.DeepCopy(originalObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
node, isNode := obj.(*v1.Node)
|
||||
// We can get DeletedFinalStateUnknown instead of *v1.Node here and
|
||||
// we need to handle that correctly. #34692
|
||||
if !isNode {
|
||||
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Received unexpected object: %v", obj)
|
||||
return
|
||||
}
|
||||
node, ok = deletedState.Obj.(*v1.Node)
|
||||
if !ok {
|
||||
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := f(node); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -348,13 +348,51 @@ func NewNodeController(
|
||||
}
|
||||
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: nc.onNodeAdd,
|
||||
UpdateFunc: nc.onNodeUpdate,
|
||||
DeleteFunc: nc.onNodeDelete,
|
||||
AddFunc: createAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
|
||||
UpdateFunc: createUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
// If the PodCIDR is not empty we either:
|
||||
// - already processed a Node that already had a CIDR after NC restarted
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node successfully and allocated a CIDR for it
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node but we did saw a "timeout" response and
|
||||
// request eventually got through in this case we haven't released
|
||||
// the allocated CIDR (cidr is still marked as used).
|
||||
// There's a possible error here:
|
||||
// - NC sees a new Node and assigns a CIDR X to it,
|
||||
// - Update Node call fails with a timeout,
|
||||
// - Node is updated by some other component, NC sees an update and
|
||||
// assigns CIDR Y to the Node,
|
||||
// - Both CIDR X and CIDR Y are marked as used in the local cache,
|
||||
// even though Node sees only CIDR Y
|
||||
// The problem here is that in in-memory cache we see CIDR X as marked,
|
||||
// which prevents it from being assigned to any new node. The cluster
|
||||
// state is correct.
|
||||
// Restart of NC fixes the issue.
|
||||
if newNode.Spec.PodCIDR == "" {
|
||||
return nc.cidrAllocator.AllocateOrOccupyCIDR(newNode)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: createDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
|
||||
})
|
||||
}
|
||||
|
||||
if nc.runTaintManager {
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: createAddNodeHandler(func(node *v1.Node) error {
|
||||
nc.taintManager.NodeUpdated(nil, node)
|
||||
return nil
|
||||
}),
|
||||
UpdateFunc: createUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
|
||||
nc.taintManager.NodeUpdated(oldNode, newNode)
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: createDeleteNodeHandler(func(node *v1.Node) error {
|
||||
nc.taintManager.NodeUpdated(node, nil)
|
||||
return nil
|
||||
}),
|
||||
})
|
||||
nc.taintManager = NewNoExecuteTaintManager(kubeClient)
|
||||
}
|
||||
|
||||
@ -435,90 +473,6 @@ func (nc *NodeController) doTaintingPass() {
|
||||
}
|
||||
}
|
||||
|
||||
func (nc *NodeController) onNodeAdd(originalObj interface{}) {
|
||||
obj, err := api.Scheme.DeepCopy(originalObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
node := obj.(*v1.Node)
|
||||
|
||||
if err := nc.cidrAllocator.AllocateOrOccupyCIDR(node); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
|
||||
}
|
||||
if nc.taintManager != nil {
|
||||
nc.taintManager.NodeUpdated(nil, node)
|
||||
}
|
||||
}
|
||||
|
||||
func (nc *NodeController) onNodeUpdate(oldNode, newNode interface{}) {
|
||||
node := newNode.(*v1.Node)
|
||||
prevNode := oldNode.(*v1.Node)
|
||||
// If the PodCIDR is not empty we either:
|
||||
// - already processed a Node that already had a CIDR after NC restarted
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node successfully and allocated a CIDR for it
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node but we did saw a "timeout" response and
|
||||
// request eventually got through in this case we haven't released
|
||||
// the allocated CIDR (cidr is still marked as used).
|
||||
// There's a possible error here:
|
||||
// - NC sees a new Node and assigns a CIDR X to it,
|
||||
// - Update Node call fails with a timeout,
|
||||
// - Node is updated by some other component, NC sees an update and
|
||||
// assigns CIDR Y to the Node,
|
||||
// - Both CIDR X and CIDR Y are marked as used in the local cache,
|
||||
// even though Node sees only CIDR Y
|
||||
// The problem here is that in in-memory cache we see CIDR X as marked,
|
||||
// which prevents it from being assigned to any new node. The cluster
|
||||
// state is correct.
|
||||
// Restart of NC fixes the issue.
|
||||
if node.Spec.PodCIDR == "" {
|
||||
nodeCopy, err := api.Scheme.Copy(node)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := nc.cidrAllocator.AllocateOrOccupyCIDR(nodeCopy.(*v1.Node)); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error allocating CIDR: %v", err))
|
||||
}
|
||||
}
|
||||
if nc.taintManager != nil {
|
||||
nc.taintManager.NodeUpdated(prevNode, node)
|
||||
}
|
||||
}
|
||||
|
||||
func (nc *NodeController) onNodeDelete(originalObj interface{}) {
|
||||
obj, err := api.Scheme.DeepCopy(originalObj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
node, isNode := obj.(*v1.Node)
|
||||
// We can get DeletedFinalStateUnknown instead of *v1.Node here and
|
||||
// we need to handle that correctly. #34692
|
||||
if !isNode {
|
||||
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Received unexpected object: %v", obj)
|
||||
return
|
||||
}
|
||||
node, ok = deletedState.Obj.(*v1.Node)
|
||||
if !ok {
|
||||
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if nc.taintManager != nil {
|
||||
nc.taintManager.NodeUpdated(node, nil)
|
||||
}
|
||||
if err := nc.cidrAllocator.ReleaseCIDR(node); err != nil {
|
||||
glog.Errorf("Error releasing CIDR: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts an asynchronous loop that monitors the status of cluster nodes.
|
||||
func (nc *NodeController) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
Loading…
Reference in New Issue
Block a user