Merge pull request #31730 from pmorie/kubelet-attach-detach-update

Automatic merge from submit-queue

Make it possible to enable controller-managed attach-detach on existing nodes

Fixes #31673.  Now, if a node already exists with the given name on Kubelet startup, the Kubelet will reconcile the value of the controller-managed-attach-detach annotation so that existing nodes can have this feature turned on and off by changing the Kubelet configuration.

cc @kubernetes/sig-storage @kubernetes/rh-cluster-infra
This commit is contained in:
Kubernetes Submit Queue 2016-09-01 07:31:18 -07:00 committed by GitHub
commit 4e1ff53bb2
3 changed files with 290 additions and 46 deletions

View File

@ -189,7 +189,7 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
// GetNode returns the node info for the configured node name of this Kubelet.
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return kl.initialNodeStatus()
return kl.initialNode()
}
return kl.nodeInfo.GetNodeInfo(kl.nodeName)
}
@ -205,7 +205,7 @@ func (kl *Kubelet) getNodeAnyWay() (*api.Node, error) {
return n, nil
}
}
return kl.initialNodeStatus()
return kl.initialNode()
}
// GetNodeConfig returns the container manager node config.
@ -222,7 +222,8 @@ func (kl *Kubelet) GetHostIP() (net.IP, error) {
return nodeutil.GetNodeHostIP(node)
}
// getHostIPAnyway attempts to return the host IP from kubelet's nodeInfo, or the initialNodeStatus
// getHostIPAnyway attempts to return the host IP from kubelet's nodeInfo, or
// the initialNode.
func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) {
node, err := kl.getNodeAnyWay()
if err != nil {

View File

@ -39,14 +39,15 @@ import (
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// registerWithApiserver registers the node with the cluster master. It is safe
// registerWithApiServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
func (kl *Kubelet) registerWithApiServer() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
@ -54,52 +55,113 @@ func (kl *Kubelet) registerWithApiserver() {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
node, err := kl.initialNode()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
glog.Infof("Attempting to register node %s", node.Name)
registered := kl.tryRegisterWithApiServer(node)
if registered {
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
// initialNodeStatus determines the initial node status, incorporating node
// labels and information from the cloud provider.
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
// tryRegisterWithApiServer makes an attempt to register the given node with
// the API server, returning a boolean indicating whether the attempt was
// successful. If a node with the same name already exists, it reconciles the
// value of the annotation for controller-managed attach-detach of attachable
// persistent volumes for the node. If a node of the same name exists but has
// a different externalID value, it attempts to delete that node so that a
// later attempt can recreate it.
func (kl *Kubelet) tryRegisterWithApiServer(node *api.Node) bool {
_, err := kl.kubeClient.Core().Nodes().Create(node)
if err == nil {
return true
}
if !apierrors.IsAlreadyExists(err) {
glog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err)
return false
}
existingNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
return false
}
if existingNode == nil {
glog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName)
return false
}
if existingNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", kl.nodeName)
// Edge case: the node was previously registered; reconcile
// the value of the controller-managed attach-detach
// annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
if requiresUpdate {
if _, err := kl.kubeClient.Core().Nodes().Update(existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false
}
}
return true
}
glog.Errorf(
"Previously node %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID,
)
if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to register node %q with API server: error deleting old node: %v", kl.nodeName, err)
} else {
glog.Info("Deleted old node object %q", kl.nodeName)
}
return false
}
// reconcileCMADAnnotationWithExistingNode reconciles the controller-managed
// attach-detach annotation on a new node and the existing node, returning
// whether the existing node must be updated.
func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *api.Node) bool {
var (
existingCMAAnnotation = existingNode.Annotations[volumehelper.ControllerManagedAttachAnnotation]
newCMAAnnotation, newSet = node.Annotations[volumehelper.ControllerManagedAttachAnnotation]
)
if newCMAAnnotation == existingCMAAnnotation {
return false
}
// If the just-constructed node and the existing node do
// not have the same value, update the existing node with
// the correct value of the annotation.
if !newSet {
glog.Info("Controller attach-detach setting changed to false; updating existing Node")
delete(existingNode.Annotations, volumehelper.ControllerManagedAttachAnnotation)
} else {
glog.Info("Controller attach-detach setting changed to true; updating existing Node")
if existingNode.Annotations == nil {
existingNode.Annotations = make(map[string]string)
}
existingNode.Annotations[volumehelper.ControllerManagedAttachAnnotation] = newCMAAnnotation
}
return true
}
// initialNode constructs the initial api.Node for this Kubelet, incorporating node
// labels, information from the cloud provider, and Kubelet configuration.
func (kl *Kubelet) initialNode() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
@ -216,7 +278,7 @@ func (kl *Kubelet) syncNodeStatus() {
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
kl.registerWithApiServer()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
@ -288,7 +350,6 @@ func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
// Set IP addresses for the node.
func (kl *Kubelet) setNodeAddress(node *api.Node) error {
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// generateTestingImageList generate randomly generated image list and corresponding expectedImageList.
@ -839,7 +840,7 @@ func TestUpdateNodeStatusError(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
func TestRegisterWithApiServer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
@ -886,7 +887,7 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) {
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
kubelet.registerWithApiServer()
done <- struct{}{}
}()
select {
@ -896,3 +897,184 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) {
return
}
}
func TestTryRegisterWithApiServer(t *testing.T) {
alreadyExists := &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
}
conflict := &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonConflict},
}
newNode := func(cmad bool, externalID string) *api.Node {
node := &api.Node{
ObjectMeta: api.ObjectMeta{},
Spec: api.NodeSpec{
ExternalID: externalID,
},
}
if cmad {
node.Annotations = make(map[string]string)
node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true"
}
return node
}
cases := []struct {
name string
newNode *api.Node
existingNode *api.Node
createError error
getError error
updateError error
deleteError error
expectedResult bool
expectedActions int
testSavedNode bool
savedNodeIndex int
savedNodeCMAD bool
}{
{
name: "success case - new node",
newNode: &api.Node{},
expectedResult: true,
expectedActions: 1,
},
{
name: "success case - existing node - no change in CMAD",
newNode: newNode(true, "a"),
createError: alreadyExists,
existingNode: newNode(true, "a"),
expectedResult: true,
expectedActions: 2,
},
{
name: "success case - existing node - CMAD disabled",
newNode: newNode(false, "a"),
createError: alreadyExists,
existingNode: newNode(true, "a"),
expectedResult: true,
expectedActions: 3,
testSavedNode: true,
savedNodeIndex: 2,
savedNodeCMAD: false,
},
{
name: "success case - existing node - CMAD enabled",
newNode: newNode(true, "a"),
createError: alreadyExists,
existingNode: newNode(false, "a"),
expectedResult: true,
expectedActions: 3,
testSavedNode: true,
savedNodeIndex: 2,
savedNodeCMAD: true,
},
{
name: "success case - external ID changed",
newNode: newNode(false, "b"),
createError: alreadyExists,
existingNode: newNode(false, "a"),
expectedResult: false,
expectedActions: 3,
},
{
name: "create failed",
newNode: newNode(false, "b"),
createError: conflict,
expectedResult: false,
expectedActions: 1,
},
{
name: "get existing node failed",
newNode: newNode(false, "a"),
createError: alreadyExists,
getError: conflict,
expectedResult: false,
expectedActions: 2,
},
{
name: "update existing node failed",
newNode: newNode(false, "a"),
createError: alreadyExists,
existingNode: newNode(true, "a"),
updateError: conflict,
expectedResult: false,
expectedActions: 3,
},
{
name: "delete existing node failed",
newNode: newNode(false, "b"),
createError: alreadyExists,
existingNode: newNode(false, "a"),
deleteError: conflict,
expectedResult: false,
expectedActions: 3,
},
}
for _, tc := range cases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.createError
})
kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, tc.existingNode, tc.getError
})
kubeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.updateError
})
kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.deleteError
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
result := kubelet.tryRegisterWithApiServer(tc.newNode)
if e, a := tc.expectedResult, result; e != a {
t.Errorf("%v: unexpected result; expected %v got %v", tc.name, e, a)
continue
}
actions := kubeClient.Actions()
if e, a := tc.expectedActions, len(actions); e != a {
t.Errorf("%v: unexpected number of actions, expected %v, got %v", tc.name, e, a)
}
if tc.testSavedNode {
var savedNode *api.Node
var ok bool
t.Logf("actions: %v: %+v", len(actions), actions)
action := actions[tc.savedNodeIndex]
if action.GetVerb() == "create" {
createAction := action.(core.CreateAction)
savedNode, ok = createAction.GetObject().(*api.Node)
if !ok {
t.Errorf("%v: unexpected type; couldn't convert to *api.Node: %+v", tc.name, createAction.GetObject())
continue
}
} else if action.GetVerb() == "update" {
updateAction := action.(core.UpdateAction)
savedNode, ok = updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("%v: unexpected type; couldn't convert to *api.Node: %+v", tc.name, updateAction.GetObject())
continue
}
}
actualCMAD, _ := strconv.ParseBool(savedNode.Annotations[volumehelper.ControllerManagedAttachAnnotation])
if e, a := tc.savedNodeCMAD, actualCMAD; e != a {
t.Errorf("%v: unexpected attach-detach value on saved node; expected %v got %v", tc.name, e, a)
}
}
}
}