From 3cf5b172fa8982fe205eae25aea7b8f11b82b809 Mon Sep 17 00:00:00 2001 From: Jesse Haka Date: Sun, 4 Feb 2018 15:11:17 +0200 Subject: [PATCH] add node shutdown taint shutdowned -> stopped use shutdown everywhere use patch in taints api call use notimplemented in clouds use AddOrUpdateTaintOnNode correct log text add fake cloud try to fix bazel add shutdown tests add context --- pkg/cloudprovider/cloud.go | 2 + pkg/cloudprovider/providers/aws/aws.go | 5 + .../providers/azure/azure_instances.go | 5 + .../cloudstack/cloudstack_instances.go | 5 + .../providers/cloudstack/metadata.go | 5 + pkg/cloudprovider/providers/fake/fake.go | 12 +- .../providers/gce/gce_instances.go | 5 + .../openstack/openstack_instances.go | 5 + pkg/cloudprovider/providers/ovirt/ovirt.go | 5 + pkg/cloudprovider/providers/photon/photon.go | 5 + .../providers/vsphere/vsphere.go | 5 + pkg/controller/cloud/node_controller.go | 44 ++++++- pkg/controller/cloud/node_controller_test.go | 109 ++++++++++++++++++ pkg/controller/testutil/test_utils.go | 4 + pkg/scheduler/algorithm/well_known_labels.go | 5 + pkg/volume/cinder/attacher_test.go | 4 + 16 files changed, 217 insertions(+), 8 deletions(-) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 83f743c1807..2f803e4a315 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -148,6 +148,8 @@ type Instances interface { // InstanceExistsByProviderID returns true if the instance for the given provider id still is running. // If false is returned with no error, the instance will be immediately deleted by the cloud controller manager. InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) + // InstanceShutdownByProviderID returns true if the instance is shutdown in cloudprovider + InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) } // Route is a representation of an advanced routing rule. diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index f2275acd915..79e7c628b37 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -1338,6 +1338,11 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin return true, nil } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the node with the specified nodeName. func (c *Cloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) { // In the future it is possible to also return an endpoint as: diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go index 1b85334e1aa..2f69f381794 100644 --- a/pkg/cloudprovider/providers/azure/azure_instances.go +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -103,6 +103,11 @@ func (az *Cloud) isCurrentInstance(name types.NodeName) (bool, error) { return (metadataName == nodeName), err } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (az *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the specified instance. // Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound) func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, error) { diff --git a/pkg/cloudprovider/providers/cloudstack/cloudstack_instances.go b/pkg/cloudprovider/providers/cloudstack/cloudstack_instances.go index cf8559a2b0b..20f98a31e25 100644 --- a/pkg/cloudprovider/providers/cloudstack/cloudstack_instances.go +++ b/pkg/cloudprovider/providers/cloudstack/cloudstack_instances.go @@ -158,3 +158,8 @@ func (cs *CSCloud) InstanceExistsByProviderID(ctx context.Context, providerID st return true, nil } + +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (cs *CSCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} diff --git a/pkg/cloudprovider/providers/cloudstack/metadata.go b/pkg/cloudprovider/providers/cloudstack/metadata.go index d1529912b7d..110027c3074 100644 --- a/pkg/cloudprovider/providers/cloudstack/metadata.go +++ b/pkg/cloudprovider/providers/cloudstack/metadata.go @@ -119,6 +119,11 @@ func (m *metadata) InstanceExistsByProviderID(ctx context.Context, providerID st return false, errors.New("InstanceExistsByProviderID not implemented") } +// InstanceShutdownByProviderID returns if the instance is shutdown. +func (m *metadata) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // GetZone returns the Zone containing the region that the program is running in. func (m *metadata) GetZone(ctx context.Context) (cloudprovider.Zone, error) { zone := cloudprovider.Zone{} diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index 6b4ee682562..582bb7c5d42 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -50,8 +50,10 @@ type FakeCloud struct { Exists bool Err error - ExistsByProviderID bool - ErrByProviderID error + ExistsByProviderID bool + ErrByProviderID error + NodeShutdown bool + ErrShutdownByProviderID error Calls []string Addresses []v1.NodeAddress @@ -241,6 +243,12 @@ func (f *FakeCloud) InstanceExistsByProviderID(ctx context.Context, providerID s return f.ExistsByProviderID, f.ErrByProviderID } +// InstanceShutdownByProviderID returns true if the instances is in safe state to detach volumes +func (f *FakeCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + f.addCall("instance-shutdown-by-provider-id") + return f.NodeShutdown, f.ErrShutdownByProviderID +} + // List is a test-spy implementation of Instances.List. // It adds an entry "list" into the internal method call record. func (f *FakeCloud) List(filter string) ([]types.NodeName, error) { diff --git a/pkg/cloudprovider/providers/gce/gce_instances.go b/pkg/cloudprovider/providers/gce/gce_instances.go index aa31f5e3a3e..d52fe06414f 100644 --- a/pkg/cloudprovider/providers/gce/gce_instances.go +++ b/pkg/cloudprovider/providers/gce/gce_instances.go @@ -190,6 +190,11 @@ func (gce *GCECloud) InstanceExistsByProviderID(ctx context.Context, providerID return true, nil } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (gce *GCECloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the node with the specified NodeName. func (gce *GCECloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) { instanceName := mapNodeNameToInstanceName(nodeName) diff --git a/pkg/cloudprovider/providers/openstack/openstack_instances.go b/pkg/cloudprovider/providers/openstack/openstack_instances.go index b8ea557e674..4cd0571ed86 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_instances.go +++ b/pkg/cloudprovider/providers/openstack/openstack_instances.go @@ -141,6 +141,11 @@ func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID s return true, nil } +// InstanceShutdownByProviderID returns true if the instances is in safe state to detach volumes +func (i *Instances) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the kubelet's cloud provider ID. func (os *OpenStack) InstanceID() (string, error) { if len(os.localInstanceID) == 0 { diff --git a/pkg/cloudprovider/providers/ovirt/ovirt.go b/pkg/cloudprovider/providers/ovirt/ovirt.go index 708358eec6d..2e4e35c62b4 100644 --- a/pkg/cloudprovider/providers/ovirt/ovirt.go +++ b/pkg/cloudprovider/providers/ovirt/ovirt.go @@ -212,6 +212,11 @@ func (v *OVirtCloud) InstanceExistsByProviderID(ctx context.Context, providerID return false, cloudprovider.NotImplemented } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (v *OVirtCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the node with the specified NodeName. func (v *OVirtCloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) { name := mapNodeNameToInstanceName(nodeName) diff --git a/pkg/cloudprovider/providers/photon/photon.go b/pkg/cloudprovider/providers/photon/photon.go index 0081b30711f..c1d87eba38b 100644 --- a/pkg/cloudprovider/providers/photon/photon.go +++ b/pkg/cloudprovider/providers/photon/photon.go @@ -477,6 +477,11 @@ func (pc *PCCloud) InstanceExistsByProviderID(ctx context.Context, providerID st return false, cloudprovider.NotImplemented } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (pc *PCCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the specified instance. func (pc *PCCloud) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) { name := string(nodeName) diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index a4e7f60d9b1..059a81a7437 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -584,6 +584,11 @@ func (vs *VSphere) InstanceExistsByProviderID(ctx context.Context, providerID st return false, err } +// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes +func (vs *VSphere) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, cloudprovider.NotImplemented +} + // InstanceID returns the cloud provider ID of the node with the specified Name. func (vs *VSphere) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) { diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go index 9f1c86e7b94..74c09d53db3 100644 --- a/pkg/controller/cloud/node_controller.go +++ b/pkg/controller/cloud/node_controller.go @@ -37,16 +37,23 @@ import ( clientretry "k8s.io/client-go/util/retry" nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/scheduler/algorithm" nodeutil "k8s.io/kubernetes/pkg/util/node" ) -var UpdateNodeSpecBackoff = wait.Backoff{ - Steps: 20, - Duration: 50 * time.Millisecond, - Jitter: 1.0, -} +var ( + UpdateNodeSpecBackoff = wait.Backoff{ + Steps: 20, + Duration: 50 * time.Millisecond, + Jitter: 1.0} + + ShutDownTaint = &v1.Taint{ + Key: algorithm.TaintNodeShutdown, + Effect: v1.TaintEffectNoSchedule, + } +) type CloudNodeController struct { nodeInformer coreinformers.NodeInformer @@ -240,9 +247,28 @@ func (cnc *CloudNodeController) MonitorNode() { // from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately if currentReadyCondition != nil { if currentReadyCondition.Status != v1.ConditionTrue { + // we need to check this first to get taint working in similar in all cloudproviders + // current problem is that shutdown nodes are not working in similar way ie. all cloudproviders + // does not delete node from kubernetes cluster when instance it is shutdown see issue #46442 + exists, err := instances.InstanceShutdownByProviderID(context.TODO(), node.Spec.ProviderID) + if err != nil && err != cloudprovider.NotImplemented { + glog.Errorf("Error getting data for node %s from cloud: %v", node.Name, err) + continue + } + + if exists { + // if node is shutdown add shutdown taint + err = controller.AddOrUpdateTaintOnNode(cnc.kubeClient, node.Name, ShutDownTaint) + if err != nil { + glog.Errorf("Error patching node taints: %v", err) + } + // Continue checking the remaining nodes since the current one is fine. + continue + } + // Check with the cloud provider to see if the node still exists. If it // doesn't, delete the node immediately. - exists, err := ensureNodeExistsByProviderIDOrExternalID(instances, node) + exists, err = ensureNodeExistsByProviderIDOrExternalID(instances, node) if err != nil { glog.Errorf("Error getting data for node %s from cloud: %v", node.Name, err) continue @@ -272,6 +298,12 @@ func (cnc *CloudNodeController) MonitorNode() { } }(node.Name) + } else { + // if taint exist remove taint + err = controller.RemoveTaintOffNode(cnc.kubeClient, node.Name, node, ShutDownTaint) + if err != nil { + glog.Errorf("Error patching node taints: %v", err) + } } } } diff --git a/pkg/controller/cloud/node_controller_test.go b/pkg/controller/cloud/node_controller_test.go index a8faf8352f4..805e7373fe9 100644 --- a/pkg/controller/cloud/node_controller_test.go +++ b/pkg/controller/cloud/node_controller_test.go @@ -148,6 +148,115 @@ func TestEnsureNodeExistsByProviderIDOrNodeName(t *testing.T) { } +func TestNodeShutdown(t *testing.T) { + + testCases := []struct { + testName string + node *v1.Node + existsByProviderID bool + shutdown bool + }{ + { + testName: "node shutdowned add taint", + existsByProviderID: true, + shutdown: true, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: v1.NodeSpec{ + ProviderID: "node0", + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + { + testName: "node started after shutdown remove taint", + existsByProviderID: true, + shutdown: false, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: v1.NodeSpec{ + ProviderID: "node0", + Taints: []v1.Taint{ + { + Key: algorithm.TaintNodeShutdown, + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + fc := &fakecloud.FakeCloud{ + ExistsByProviderID: tc.existsByProviderID, + NodeShutdown: tc.shutdown, + } + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{tc.node}, + Clientset: fake.NewSimpleClientset(), + PatchWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fc, + nodeMonitorPeriod: 1 * time.Second, + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), + nodeStatusUpdateFrequency: 1 * time.Second, + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.Run() + + select { + case <-fnh.PatchWaitChan: + case <-time.After(1 * time.Second): + t.Errorf("Timed out waiting %v for node to be updated", wait.ForeverTestTimeout) + } + + assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated") + if tc.shutdown { + assert.Equal(t, 1, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not added") + assert.Equal(t, "node.cloudprovider.kubernetes.io/shutdown", fnh.UpdatedNodes[0].Spec.Taints[0].Key, "Node Taint key is not correct") + } else { + assert.Equal(t, 0, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not removed after node is back in ready state") + } + + }) + } + +} + // This test checks that the node is deleted when kubelet stops reporting // and cloud provider says node is gone func TestNodeDeleted(t *testing.T) { diff --git a/pkg/controller/testutil/test_utils.go b/pkg/controller/testutil/test_utils.go index 1ecada1f6f2..11bfb66366c 100644 --- a/pkg/controller/testutil/test_utils.go +++ b/pkg/controller/testutil/test_utils.go @@ -67,6 +67,7 @@ type FakeNodeHandler struct { // Synchronization lock sync.Mutex DeleteWaitChan chan struct{} + PatchWaitChan chan struct{} } // FakeLegacyHandler is a fake implemtation of CoreV1Interface. @@ -270,6 +271,9 @@ func (m *FakeNodeHandler) Patch(name string, pt types.PatchType, data []byte, su m.lock.Lock() defer func() { m.RequestCount++ + if m.PatchWaitChan != nil { + m.PatchWaitChan <- struct{}{} + } m.lock.Unlock() }() var nodeCopy v1.Node diff --git a/pkg/scheduler/algorithm/well_known_labels.go b/pkg/scheduler/algorithm/well_known_labels.go index bffe40a0885..e916481ce30 100644 --- a/pkg/scheduler/algorithm/well_known_labels.go +++ b/pkg/scheduler/algorithm/well_known_labels.go @@ -61,4 +61,9 @@ const ( // from the cloud-controller-manager intitializes this node, and then removes // the taint TaintExternalCloudProvider = "node.cloudprovider.kubernetes.io/uninitialized" + + // When node is shutdown in external cloud provider + // shutdown flag is needed for immediately volume detach + // after node comes back, the taint is removed + TaintNodeShutdown = "node.cloudprovider.kubernetes.io/shutdown" ) diff --git a/pkg/volume/cinder/attacher_test.go b/pkg/volume/cinder/attacher_test.go index b75fdef561d..bef42dcaa24 100644 --- a/pkg/volume/cinder/attacher_test.go +++ b/pkg/volume/cinder/attacher_test.go @@ -732,6 +732,10 @@ func (instances *instances) InstanceExistsByProviderID(ctx context.Context, prov return false, errors.New("unimplemented") } +func (instances *instances) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) { + return false, errors.New("unimplemented") +} + func (instances *instances) List(filter string) ([]types.NodeName, error) { return []types.NodeName{}, errors.New("Not implemented") }