From 1805d30b679108840c686990b612b656e5c9a01a Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Tue, 30 Aug 2016 13:40:25 -0400 Subject: [PATCH] Reconcile value of controller-managed attach-detach annotation on existing nodes in Kubelet startup --- pkg/kubelet/kubelet_getters.go | 7 +- pkg/kubelet/kubelet_node_status.go | 143 ++++++++++++------ pkg/kubelet/kubelet_node_status_test.go | 186 +++++++++++++++++++++++- 3 files changed, 290 insertions(+), 46 deletions(-) diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index 8f9194c07e6..8b3dbbb1f8f 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -189,7 +189,7 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { // GetNode returns the node info for the configured node name of this Kubelet. func (kl *Kubelet) GetNode() (*api.Node, error) { if kl.standaloneMode { - return kl.initialNodeStatus() + return kl.initialNode() } return kl.nodeInfo.GetNodeInfo(kl.nodeName) } @@ -205,7 +205,7 @@ func (kl *Kubelet) getNodeAnyWay() (*api.Node, error) { return n, nil } } - return kl.initialNodeStatus() + return kl.initialNode() } // GetNodeConfig returns the container manager node config. @@ -222,7 +222,8 @@ func (kl *Kubelet) GetHostIP() (net.IP, error) { return nodeutil.GetNodeHostIP(node) } -// getHostIPAnyway attempts to return the host IP from kubelet's nodeInfo, or the initialNodeStatus +// getHostIPAnyway attempts to return the host IP from kubelet's nodeInfo, or +// the initialNode. func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) { node, err := kl.getNodeAnyWay() if err != nil { diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 6d8bc25a121..cc0e50a34f1 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -39,14 +39,15 @@ import ( "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) -// registerWithApiserver registers the node with the cluster master. It is safe +// registerWithApiServer registers the node with the cluster master. It is safe // to call multiple times, but not concurrently (kl.registrationCompleted is // not locked). -func (kl *Kubelet) registerWithApiserver() { +func (kl *Kubelet) registerWithApiServer() { if kl.registrationCompleted { return } step := 100 * time.Millisecond + for { time.Sleep(step) step = step * 2 @@ -54,52 +55,113 @@ func (kl *Kubelet) registerWithApiserver() { step = 7 * time.Second } - node, err := kl.initialNodeStatus() + node, err := kl.initialNode() if err != nil { glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) continue } - glog.V(2).Infof("Attempting to register node %s", node.Name) - if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil { - if !apierrors.IsAlreadyExists(err) { - glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) - continue - } - currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) - if err != nil { - glog.Errorf("error getting node %q: %v", kl.nodeName, err) - continue - } - if currentNode == nil { - glog.Errorf("no node instance returned for %q", kl.nodeName) - continue - } - if currentNode.Spec.ExternalID == node.Spec.ExternalID { - glog.Infof("Node %s was previously registered", node.Name) - kl.registrationCompleted = true - return - } - glog.Errorf( - "Previously %q had externalID %q; now it is %q; will delete and recreate.", - kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, - ) - if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { - glog.Errorf("Unable to delete old node: %v", err) - } else { - glog.Errorf("Deleted old node object %q", kl.nodeName) - } - continue + glog.Infof("Attempting to register node %s", node.Name) + registered := kl.tryRegisterWithApiServer(node) + if registered { + glog.Infof("Successfully registered node %s", node.Name) + kl.registrationCompleted = true + return } - glog.Infof("Successfully registered node %s", node.Name) - kl.registrationCompleted = true - return } } -// initialNodeStatus determines the initial node status, incorporating node -// labels and information from the cloud provider. -func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { +// tryRegisterWithApiServer makes an attempt to register the given node with +// the API server, returning a boolean indicating whether the attempt was +// successful. If a node with the same name already exists, it reconciles the +// value of the annotation for controller-managed attach-detach of attachable +// persistent volumes for the node. If a node of the same name exists but has +// a different externalID value, it attempts to delete that node so that a +// later attempt can recreate it. +func (kl *Kubelet) tryRegisterWithApiServer(node *api.Node) bool { + _, err := kl.kubeClient.Core().Nodes().Create(node) + if err == nil { + return true + } + + if !apierrors.IsAlreadyExists(err) { + glog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err) + return false + } + + existingNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) + if err != nil { + glog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err) + return false + } + if existingNode == nil { + glog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName) + return false + } + + if existingNode.Spec.ExternalID == node.Spec.ExternalID { + glog.Infof("Node %s was previously registered", kl.nodeName) + + // Edge case: the node was previously registered; reconcile + // the value of the controller-managed attach-detach + // annotation. + requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) + if requiresUpdate { + if _, err := kl.kubeClient.Core().Nodes().Update(existingNode); err != nil { + glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) + return false + } + } + + return true + } + + glog.Errorf( + "Previously node %q had externalID %q; now it is %q; will delete and recreate.", + kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID, + ) + if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { + glog.Errorf("Unable to register node %q with API server: error deleting old node: %v", kl.nodeName, err) + } else { + glog.Info("Deleted old node object %q", kl.nodeName) + } + + return false +} + +// reconcileCMADAnnotationWithExistingNode reconciles the controller-managed +// attach-detach annotation on a new node and the existing node, returning +// whether the existing node must be updated. +func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *api.Node) bool { + var ( + existingCMAAnnotation = existingNode.Annotations[volumehelper.ControllerManagedAttachAnnotation] + newCMAAnnotation, newSet = node.Annotations[volumehelper.ControllerManagedAttachAnnotation] + ) + + if newCMAAnnotation == existingCMAAnnotation { + return false + } + + // If the just-constructed node and the existing node do + // not have the same value, update the existing node with + // the correct value of the annotation. + if !newSet { + glog.Info("Controller attach-detach setting changed to false; updating existing Node") + delete(existingNode.Annotations, volumehelper.ControllerManagedAttachAnnotation) + } else { + glog.Info("Controller attach-detach setting changed to true; updating existing Node") + if existingNode.Annotations == nil { + existingNode.Annotations = make(map[string]string) + } + existingNode.Annotations[volumehelper.ControllerManagedAttachAnnotation] = newCMAAnnotation + } + + return true +} + +// initialNode constructs the initial api.Node for this Kubelet, incorporating node +// labels, information from the cloud provider, and Kubelet configuration. +func (kl *Kubelet) initialNode() (*api.Node, error) { node := &api.Node{ ObjectMeta: api.ObjectMeta{ Name: kl.nodeName, @@ -216,7 +278,7 @@ func (kl *Kubelet) syncNodeStatus() { } if kl.registerNode { // This will exit immediately if it doesn't need to do anything. - kl.registerWithApiserver() + kl.registerWithApiServer() } if err := kl.updateNodeStatus(); err != nil { glog.Errorf("Unable to update node status: %v", err) @@ -288,7 +350,6 @@ func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) { // Set IP addresses for the node. func (kl *Kubelet) setNodeAddress(node *api.Node) error { - if kl.cloud != nil { instances, ok := kl.cloud.Instances() if !ok { diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 67ecde92947..3b2d1d6cac5 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) // generateTestingImageList generate randomly generated image list and corresponding expectedImageList. @@ -839,7 +840,7 @@ func TestUpdateNodeStatusError(t *testing.T) { } } -func TestRegisterExistingNodeWithApiserver(t *testing.T) { +func TestRegisterWithApiServer(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient @@ -886,7 +887,7 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) { done := make(chan struct{}) go func() { - kubelet.registerWithApiserver() + kubelet.registerWithApiServer() done <- struct{}{} }() select { @@ -896,3 +897,184 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) { return } } + +func TestTryRegisterWithApiServer(t *testing.T) { + alreadyExists := &apierrors.StatusError{ + ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists}, + } + + conflict := &apierrors.StatusError{ + ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonConflict}, + } + + newNode := func(cmad bool, externalID string) *api.Node { + node := &api.Node{ + ObjectMeta: api.ObjectMeta{}, + Spec: api.NodeSpec{ + ExternalID: externalID, + }, + } + + if cmad { + node.Annotations = make(map[string]string) + node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true" + } + + return node + } + + cases := []struct { + name string + newNode *api.Node + existingNode *api.Node + createError error + getError error + updateError error + deleteError error + expectedResult bool + expectedActions int + testSavedNode bool + savedNodeIndex int + savedNodeCMAD bool + }{ + { + name: "success case - new node", + newNode: &api.Node{}, + expectedResult: true, + expectedActions: 1, + }, + { + name: "success case - existing node - no change in CMAD", + newNode: newNode(true, "a"), + createError: alreadyExists, + existingNode: newNode(true, "a"), + expectedResult: true, + expectedActions: 2, + }, + { + name: "success case - existing node - CMAD disabled", + newNode: newNode(false, "a"), + createError: alreadyExists, + existingNode: newNode(true, "a"), + expectedResult: true, + expectedActions: 3, + testSavedNode: true, + savedNodeIndex: 2, + savedNodeCMAD: false, + }, + { + name: "success case - existing node - CMAD enabled", + newNode: newNode(true, "a"), + createError: alreadyExists, + existingNode: newNode(false, "a"), + expectedResult: true, + expectedActions: 3, + testSavedNode: true, + savedNodeIndex: 2, + savedNodeCMAD: true, + }, + { + name: "success case - external ID changed", + newNode: newNode(false, "b"), + createError: alreadyExists, + existingNode: newNode(false, "a"), + expectedResult: false, + expectedActions: 3, + }, + { + name: "create failed", + newNode: newNode(false, "b"), + createError: conflict, + expectedResult: false, + expectedActions: 1, + }, + { + name: "get existing node failed", + newNode: newNode(false, "a"), + createError: alreadyExists, + getError: conflict, + expectedResult: false, + expectedActions: 2, + }, + { + name: "update existing node failed", + newNode: newNode(false, "a"), + createError: alreadyExists, + existingNode: newNode(true, "a"), + updateError: conflict, + expectedResult: false, + expectedActions: 3, + }, + { + name: "delete existing node failed", + newNode: newNode(false, "b"), + createError: alreadyExists, + existingNode: newNode(false, "a"), + deleteError: conflict, + expectedResult: false, + expectedActions: 3, + }, + } + + for _, tc := range cases { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + + kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, tc.createError + }) + kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + // Return an existing (matching) node on get. + return true, tc.existingNode, tc.getError + }) + kubeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, tc.updateError + }) + kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, tc.deleteError + }) + kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + + result := kubelet.tryRegisterWithApiServer(tc.newNode) + if e, a := tc.expectedResult, result; e != a { + t.Errorf("%v: unexpected result; expected %v got %v", tc.name, e, a) + continue + } + + actions := kubeClient.Actions() + if e, a := tc.expectedActions, len(actions); e != a { + t.Errorf("%v: unexpected number of actions, expected %v, got %v", tc.name, e, a) + } + + if tc.testSavedNode { + var savedNode *api.Node + var ok bool + + t.Logf("actions: %v: %+v", len(actions), actions) + action := actions[tc.savedNodeIndex] + if action.GetVerb() == "create" { + createAction := action.(core.CreateAction) + savedNode, ok = createAction.GetObject().(*api.Node) + if !ok { + t.Errorf("%v: unexpected type; couldn't convert to *api.Node: %+v", tc.name, createAction.GetObject()) + continue + } + } else if action.GetVerb() == "update" { + updateAction := action.(core.UpdateAction) + savedNode, ok = updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("%v: unexpected type; couldn't convert to *api.Node: %+v", tc.name, updateAction.GetObject()) + continue + } + } + + actualCMAD, _ := strconv.ParseBool(savedNode.Annotations[volumehelper.ControllerManagedAttachAnnotation]) + if e, a := tc.savedNodeCMAD, actualCMAD; e != a { + t.Errorf("%v: unexpected attach-detach value on saved node; expected %v got %v", tc.name, e, a) + } + } + } +}