add node shutdown taint
shutdowned -> stopped
use shutdown everywhere
use patch in taints api call
use notimplemented in clouds
use AddOrUpdateTaintOnNode
correct log text
add fake cloud
try to fix bazel
add shutdown tests
add context
parent 6827c3cf47
commit 3cf5b172fa
@@ -148,6 +148,8 @@ type Instances interface {
 	// InstanceExistsByProviderID returns true if the instance for the given provider id still is running.
 	// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
 	InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error)
+	// InstanceShutdownByProviderID returns true if the instance is shutdown in cloudprovider
+	InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error)
 }

 // Route is a representation of an advanced routing rule.
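A provider that does track power state could satisfy the new interface method along these lines. This is a minimal, self-contained sketch: fakeAPI, instance, and describeInstance are hypothetical stand-ins for a real provider SDK, and only the method signature comes from the interface above.

package shutdownsketch

import (
	"context"
	"errors"
)

// instance is a hypothetical, minimal view of a provider's VM record.
type instance struct {
	State string // e.g. "running", "stopping", "stopped"
}

// fakeAPI stands in for a real provider SDK client.
type fakeAPI struct {
	instances map[string]instance
}

func (a *fakeAPI) describeInstance(ctx context.Context, providerID string) (instance, error) {
	inst, ok := a.instances[providerID]
	if !ok {
		return instance{}, errors.New("instance not found")
	}
	return inst, nil
}

// InstanceShutdownByProviderID reports true only for a cleanly stopped
// instance; transitional states (stopping, rebooting) count as running,
// so the controller does not taint the node prematurely.
func (a *fakeAPI) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
	inst, err := a.describeInstance(ctx, providerID)
	if err != nil {
		return false, err
	}
	return inst.State == "stopped", nil
}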
@@ -1338,6 +1338,11 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin
 	return true, nil
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the node with the specified nodeName.
 func (c *Cloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
 	// In the future it is possible to also return an endpoint as:
@@ -103,6 +103,11 @@ func (az *Cloud) isCurrentInstance(name types.NodeName) (bool, error) {
 	return (metadataName == nodeName), err
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (az *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the specified instance.
 // Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
 func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
@@ -158,3 +158,8 @@ func (cs *CSCloud) InstanceExistsByProviderID(ctx context.Context, providerID st

 	return true, nil
 }
+
+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (cs *CSCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
@@ -119,6 +119,11 @@ func (m *metadata) InstanceExistsByProviderID(ctx context.Context, providerID st
 	return false, errors.New("InstanceExistsByProviderID not implemented")
 }

+// InstanceShutdownByProviderID returns if the instance is shutdown.
+func (m *metadata) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // GetZone returns the Zone containing the region that the program is running in.
 func (m *metadata) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
 	zone := cloudprovider.Zone{}
@@ -50,8 +50,10 @@ type FakeCloud struct {
 	Exists bool
 	Err    error

-	ExistsByProviderID bool
-	ErrByProviderID    error
+	ExistsByProviderID      bool
+	ErrByProviderID         error
+	NodeShutdown            bool
+	ErrShutdownByProviderID error

 	Calls     []string
 	Addresses []v1.NodeAddress
@@ -241,6 +243,12 @@ func (f *FakeCloud) InstanceExistsByProviderID(ctx context.Context, providerID s
 	return f.ExistsByProviderID, f.ErrByProviderID
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (f *FakeCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	f.addCall("instance-shutdown-by-provider-id")
+	return f.NodeShutdown, f.ErrShutdownByProviderID
+}
+
 // List is a test-spy implementation of Instances.List.
 // It adds an entry "list" into the internal method call record.
 func (f *FakeCloud) List(filter string) ([]types.NodeName, error) {
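The fake's new fields make shutdown behaviour scriptable from tests. A minimal sketch of exercising them directly, assuming the fake's usual import path:

package fakecloud_test

import (
	"context"
	"testing"

	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
)

func TestShutdownReporting(t *testing.T) {
	fc := &fakecloud.FakeCloud{
		ExistsByProviderID: true, // instance still exists in the cloud...
		NodeShutdown:       true, // ...but is powered off
	}
	// The call is also recorded as "instance-shutdown-by-provider-id" in fc.Calls.
	shutdown, err := fc.InstanceShutdownByProviderID(context.TODO(), "node0")
	if err != nil || !shutdown {
		t.Fatalf("expected shutdown=true, got %v (err %v)", shutdown, err)
	}
}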
@@ -190,6 +190,11 @@ func (gce *GCECloud) InstanceExistsByProviderID(ctx context.Context, providerID
 	return true, nil
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (gce *GCECloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the node with the specified NodeName.
 func (gce *GCECloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
 	instanceName := mapNodeNameToInstanceName(nodeName)
@@ -141,6 +141,11 @@ func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID s
 	return true, nil
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (i *Instances) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the kubelet's cloud provider ID.
 func (os *OpenStack) InstanceID() (string, error) {
 	if len(os.localInstanceID) == 0 {
@@ -212,6 +212,11 @@ func (v *OVirtCloud) InstanceExistsByProviderID(ctx context.Context, providerID
 	return false, cloudprovider.NotImplemented
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (v *OVirtCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the node with the specified NodeName.
 func (v *OVirtCloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
 	name := mapNodeNameToInstanceName(nodeName)
@@ -477,6 +477,11 @@ func (pc *PCCloud) InstanceExistsByProviderID(ctx context.Context, providerID st
 	return false, cloudprovider.NotImplemented
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (pc *PCCloud) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the specified instance.
 func (pc *PCCloud) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
 	name := string(nodeName)
@@ -584,6 +584,11 @@ func (vs *VSphere) InstanceExistsByProviderID(ctx context.Context, providerID st
 	return false, err
 }

+// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes
+func (vs *VSphere) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, cloudprovider.NotImplemented
+}
+
 // InstanceID returns the cloud provider ID of the node with the specified Name.
 func (vs *VSphere) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
@@ -37,16 +37,23 @@ import (
 	clientretry "k8s.io/client-go/util/retry"
 	nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 )

-var UpdateNodeSpecBackoff = wait.Backoff{
-	Steps:    20,
-	Duration: 50 * time.Millisecond,
-	Jitter:   1.0,
-}
+var (
+	UpdateNodeSpecBackoff = wait.Backoff{
+		Steps:    20,
+		Duration: 50 * time.Millisecond,
+		Jitter:   1.0}
+
+	ShutDownTaint = &v1.Taint{
+		Key:    algorithm.TaintNodeShutdown,
+		Effect: v1.TaintEffectNoSchedule,
+	}
+)

 type CloudNodeController struct {
 	nodeInformer coreinformers.NodeInformer
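As an illustrative aside, a backoff like UpdateNodeSpecBackoff is typically fed to client-go's conflict-retry helper. A minimal, self-contained sketch, where the update closure is a placeholder:

package backoffsketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	clientretry "k8s.io/client-go/util/retry"
)

var updateNodeSpecBackoff = wait.Backoff{Steps: 20, Duration: 50 * time.Millisecond, Jitter: 1.0}

// retryUpdate retries fn with the backoff above whenever the API server
// reports a conflict, the usual pattern for node spec updates.
func retryUpdate(fn func() error) error {
	return clientretry.RetryOnConflict(updateNodeSpecBackoff, fn)
}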
@@ -240,9 +247,28 @@ func (cnc *CloudNodeController) MonitorNode() {
 		// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
 		if currentReadyCondition != nil {
 			if currentReadyCondition.Status != v1.ConditionTrue {
+				// Check the shutdown state first so the taint behaves the same across cloud providers.
+				// Shutdown handling currently differs: not all cloud providers delete a node from the
+				// Kubernetes cluster when its instance is shut down; see issue #46442.
+				exists, err := instances.InstanceShutdownByProviderID(context.TODO(), node.Spec.ProviderID)
+				if err != nil && err != cloudprovider.NotImplemented {
+					glog.Errorf("Error getting data for node %s from cloud: %v", node.Name, err)
+					continue
+				}
+
+				if exists {
+					// if node is shutdown, add the shutdown taint
+					err = controller.AddOrUpdateTaintOnNode(cnc.kubeClient, node.Name, ShutDownTaint)
+					if err != nil {
+						glog.Errorf("Error patching node taints: %v", err)
+					}
+					// Continue checking the remaining nodes since the current one has been handled.
+					continue
+				}
+
 				// Check with the cloud provider to see if the node still exists. If it
 				// doesn't, delete the node immediately.
-				exists, err := ensureNodeExistsByProviderIDOrExternalID(instances, node)
+				exists, err = ensureNodeExistsByProviderIDOrExternalID(instances, node)
 				if err != nil {
 					glog.Errorf("Error getting data for node %s from cloud: %v", node.Name, err)
 					continue
@@ -272,6 +298,12 @@ func (cnc *CloudNodeController) MonitorNode() {
 				}
 			}(node.Name)

+			} else {
+				// if the shutdown taint exists, remove it
+				err = controller.RemoveTaintOffNode(cnc.kubeClient, node.Name, node, ShutDownTaint)
+				if err != nil {
+					glog.Errorf("Error patching node taints: %v", err)
+				}
 			}
 		}
 	}
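Condensed, the new MonitorNode ordering is: a Ready node gets any shutdown taint removed; a NotReady node is first checked for shutdown (taint it, keep it) and only then for existence (delete it if gone). A small, self-contained model of that decision, with hypothetical names rather than the controller's actual types:

package monitorsketch

// action enumerates what MonitorNode does with a node; a condensed model
// of the diff above, not the controller's real control flow.
type action string

const (
	taintShutdown action = "add shutdown taint"
	untaint       action = "remove shutdown taint"
	deleteNode    action = "delete node"
	none          action = "leave node alone"
)

// decide mirrors the new check ordering: for a NotReady node the shutdown
// check runs before the existence check, so a stopped-but-still-present
// instance is tainted instead of deleted (issue #46442).
func decide(ready, shutdown, exists bool) action {
	if ready {
		return untaint // node is back: clear any shutdown taint
	}
	if shutdown {
		return taintShutdown
	}
	if !exists {
		return deleteNode
	}
	return none
}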
@@ -148,6 +148,115 @@ func TestEnsureNodeExistsByProviderIDOrNodeName(t *testing.T) {

 }

+func TestNodeShutdown(t *testing.T) {
+
+	testCases := []struct {
+		testName           string
+		node               *v1.Node
+		existsByProviderID bool
+		shutdown           bool
+	}{
+		{
+			testName:           "node shutdown, add taint",
+			existsByProviderID: true,
+			shutdown:           true,
+			node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Spec: v1.NodeSpec{
+					ProviderID: "node0",
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:               v1.NodeReady,
+							Status:             v1.ConditionUnknown,
+							LastHeartbeatTime:  metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+		{
+			testName:           "node started after shutdown, remove taint",
+			existsByProviderID: true,
+			shutdown:           false,
+			node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Spec: v1.NodeSpec{
+					ProviderID: "node0",
+					Taints: []v1.Taint{
+						{
+							Key:    algorithm.TaintNodeShutdown,
+							Effect: v1.TaintEffectNoSchedule,
+						},
+					},
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:               v1.NodeReady,
+							Status:             v1.ConditionTrue,
+							LastHeartbeatTime:  metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			fc := &fakecloud.FakeCloud{
+				ExistsByProviderID: tc.existsByProviderID,
+				NodeShutdown:       tc.shutdown,
+			}
+			fnh := &testutil.FakeNodeHandler{
+				Existing:      []*v1.Node{tc.node},
+				Clientset:     fake.NewSimpleClientset(),
+				PatchWaitChan: make(chan struct{}),
+			}
+
+			factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
+
+			eventBroadcaster := record.NewBroadcaster()
+			cloudNodeController := &CloudNodeController{
+				kubeClient:                fnh,
+				nodeInformer:              factory.Core().V1().Nodes(),
+				cloud:                     fc,
+				nodeMonitorPeriod:         1 * time.Second,
+				recorder:                  eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
+				nodeStatusUpdateFrequency: 1 * time.Second,
+			}
+			eventBroadcaster.StartLogging(glog.Infof)
+
+			cloudNodeController.Run()
+
+			select {
+			case <-fnh.PatchWaitChan:
+			case <-time.After(1 * time.Second):
+				t.Errorf("Timed out waiting %v for node to be updated", time.Second)
+			}
+
+			assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
+			if tc.shutdown {
+				assert.Equal(t, 1, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not added")
+				assert.Equal(t, "node.cloudprovider.kubernetes.io/shutdown", fnh.UpdatedNodes[0].Spec.Taints[0].Key, "Node Taint key is not correct")
+			} else {
+				assert.Equal(t, 0, len(fnh.UpdatedNodes[0].Spec.Taints), "Node Taint was not removed after node is back in ready state")
+			}
+		})
+	}
+}
+
 // This test checks that the node is deleted when kubelet stops reporting
 // and cloud provider says node is gone
 func TestNodeDeleted(t *testing.T) {
@@ -67,6 +67,7 @@ type FakeNodeHandler struct {
 	// Synchronization
 	lock           sync.Mutex
 	DeleteWaitChan chan struct{}
+	PatchWaitChan  chan struct{}
 }

 // FakeLegacyHandler is a fake implementation of CoreV1Interface.
@@ -270,6 +271,9 @@ func (m *FakeNodeHandler) Patch(name string, pt types.PatchType, data []byte, su
 	m.lock.Lock()
 	defer func() {
 		m.RequestCount++
+		if m.PatchWaitChan != nil {
+			m.PatchWaitChan <- struct{}{}
+		}
 		m.lock.Unlock()
 	}()
 	var nodeCopy v1.Node
@@ -61,4 +61,9 @@ const (
 	// from the cloud-controller-manager initializes this node, and then removes
 	// the taint
 	TaintExternalCloudProvider = "node.cloudprovider.kubernetes.io/uninitialized"
+
+	// TaintNodeShutdown is set when a node is shut down in the external cloud provider.
+	// The shutdown flag is needed so volumes can be detached immediately;
+	// the taint is removed once the node comes back.
+	TaintNodeShutdown = "node.cloudprovider.kubernetes.io/shutdown"
 )
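A workload that must still schedule onto (or stay bound to) a shut-down node would need a matching toleration. An illustrative snippet using the core/v1 types; it is not part of the commit:

package tolerationsketch

import v1 "k8s.io/api/core/v1"

// shutdownToleration lets a pod tolerate the NoSchedule shutdown taint
// added by the cloud node controller above.
var shutdownToleration = v1.Toleration{
	Key:      "node.cloudprovider.kubernetes.io/shutdown",
	Operator: v1.TolerationOpExists,
	Effect:   v1.TaintEffectNoSchedule,
}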
@@ -732,6 +732,10 @@ func (instances *instances) InstanceExistsByProviderID(ctx context.Context, prov
 	return false, errors.New("unimplemented")
 }

+func (instances *instances) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
+	return false, errors.New("unimplemented")
+}
+
 func (instances *instances) List(filter string) ([]types.NodeName, error) {
 	return []types.NodeName{}, errors.New("Not implemented")
 }