From aac9f1cbaad3741688a8e3a221d420c63017d3d4 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Thu, 17 May 2018 10:37:13 +0800 Subject: [PATCH] Taint node when initializing node. Signed-off-by: Da K. Ma --- pkg/kubelet/BUILD | 4 ++ pkg/kubelet/kubelet_node_status.go | 17 ++++++ pkg/kubelet/kubelet_node_status_test.go | 80 +++++++++++++++++++++---- pkg/kubelet/kubelet_test.go | 17 ++++++ 4 files changed, 108 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index c5474872841..9dabb507d79 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -103,6 +103,7 @@ go_library( "//pkg/util/node:go_default_library", "//pkg/util/oom:go_default_library", "//pkg/util/removeall:go_default_library", + "//pkg/util/taints:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/csi:go_default_library", "//pkg/volume/util:go_default_library", @@ -164,6 +165,7 @@ go_test( deps = [ "//pkg/apis/core/install:go_default_library", "//pkg/capabilities:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/cadvisor/testing:go_default_library", @@ -195,8 +197,10 @@ go_test( "//pkg/kubelet/util/queue:go_default_library", "//pkg/kubelet/util/sliceutils:go_default_library", "//pkg/kubelet/volumemanager:go_default_library", + "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/cache:go_default_library", "//pkg/util/mount:go_default_library", + "//pkg/util/taints:go_default_library", "//pkg/version:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/aws_ebs:go_default_library", diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index ce55bb53ece..5ef6e182659 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -40,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/scheduler/algorithm" nodeutil "k8s.io/kubernetes/pkg/util/node" + taintutil "k8s.io/kubernetes/pkg/util/taints" volutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -230,6 +232,21 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { } nodeTaints = append(nodeTaints, taints...) } + + unschedulableTaint := v1.Taint{ + Key: algorithm.TaintNodeUnschedulable, + Effect: v1.TaintEffectNoSchedule, + } + + // If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing + // node to avoid race condition; refer to #63897 for more detail. + if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) { + if node.Spec.Unschedulable && + !taintutil.TaintExists(nodeTaints, &unschedulableTaint) { + nodeTaints = append(nodeTaints, unschedulableTaint) + } + } + if kl.externalCloudProvider { taint := v1.Taint{ Key: algorithm.TaintExternalCloudProvider, diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 229feece4fa..4424c123fcb 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -42,16 +42,20 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" + "k8s.io/kubernetes/pkg/scheduler/algorithm" + taintutil "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume/util" ) @@ -126,6 +130,18 @@ func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) return updatedNode, nil } +func notImplemented(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) +} + +func addNotImplatedReaction(kubeClient *fake.Clientset) { + if kubeClient == nil { + return + } + + kubeClient.AddReactor("*", "*", notImplemented) +} + type localCM struct { cm.ContainerManager allocatableReservation v1.ResourceList @@ -835,9 +851,9 @@ func TestRegisterWithApiServer(t *testing.T) { }, }, nil }) - kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - return true, nil, fmt.Errorf("no reaction implemented for %s", action) - }) + + addNotImplatedReaction(kubeClient) + machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -964,10 +980,6 @@ func TestTryRegisterWithApiServer(t *testing.T) { }, } - notImplemented := func(action core.Action) (bool, runtime.Object, error) { - return true, nil, fmt.Errorf("no reaction implemented for %s", action) - } - for _, tc := range cases { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */) defer testKubelet.Cleanup() @@ -990,9 +1002,7 @@ func TestTryRegisterWithApiServer(t *testing.T) { kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) { return true, nil, tc.deleteError }) - kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - return notImplemented(action) - }) + addNotImplatedReaction(kubeClient) result := kubelet.tryRegisterWithAPIServer(tc.newNode) require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name) @@ -1501,3 +1511,53 @@ func TestValidateNodeIPParam(t *testing.T) { } } } + +func TestRegisterWithApiServerWithTaint(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + kubelet.machineInfo = machineInfo + + var gotNode runtime.Object + kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + gotNode = createAction.GetObject() + return true, gotNode, nil + }) + + addNotImplatedReaction(kubeClient) + + // Make node to be unschedulable. + kubelet.registerSchedulable = false + + forEachFeatureGate(t, []utilfeature.Feature{features.TaintNodesByCondition}, func(t *testing.T) { + // Reset kubelet status for each test. + kubelet.registrationCompleted = false + + // Register node to apiserver. + kubelet.registerWithAPIServer() + + // Check the unschedulable taint. + got := gotNode.(*v1.Node) + unschedulableTaint := &v1.Taint{ + Key: algorithm.TaintNodeUnschedulable, + Effect: v1.TaintEffectNoSchedule, + } + + require.Equal(t, + utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition), + taintutil.TaintExists(got.Spec.Taints, unschedulableTaint), + "test unschedulable taint for TaintNodesByCondition") + + return + }) +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index bdf54d6d873..b21cd1c5d18 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -35,6 +35,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" @@ -2249,6 +2250,22 @@ func runVolumeManager(kubelet *Kubelet) chan struct{} { return stopCh } +func forEachFeatureGate(t *testing.T, fs []utilfeature.Feature, tf func(t *testing.T)) { + for _, fg := range fs { + func() { + enabled := utilfeature.DefaultFeatureGate.Enabled(fg) + defer func() { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled)) + }() + + for _, f := range []bool{true, false} { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f)) + t.Run(fmt.Sprintf("%v(%t)", fg, f), tf) + } + }() + } +} + // Sort pods by UID. type podsByUID []*v1.Pod