Merge pull request #63955 from k82cn/k8s_63897

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Taint node when initializing node.

Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #63897 

**Release note**:
```release-note
If `TaintNodesByCondition` enabled, taint node with `TaintNodeUnschedulable` when
initializing node to avoid race condition.
```
This commit is contained in:
Kubernetes Submit Queue 2018-07-26 21:01:16 -07:00 committed by GitHub
commit ed58d0dfd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 10 deletions

View File

@ -103,6 +103,7 @@ go_library(
"//pkg/util/node:go_default_library",
"//pkg/util/oom:go_default_library",
"//pkg/util/removeall:go_default_library",
"//pkg/util/taints:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
@ -164,6 +165,7 @@ go_test(
deps = [
"//pkg/apis/core/install:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/cadvisor/testing:go_default_library",
@ -195,8 +197,10 @@ go_test(
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/taints:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/aws_ebs:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@ -40,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
nodeutil "k8s.io/kubernetes/pkg/util/node"
taintutil "k8s.io/kubernetes/pkg/util/taints"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -230,6 +232,21 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
}
nodeTaints = append(nodeTaints, taints...)
}
unschedulableTaint := v1.Taint{
Key: algorithm.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
}
// If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing
// node to avoid race condition; refer to #63897 for more detail.
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
if node.Spec.Unschedulable &&
!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
nodeTaints = append(nodeTaints, unschedulableTaint)
}
}
if kl.externalCloudProvider {
taint := v1.Taint{
Key: algorithm.TaintExternalCloudProvider,

View File

@ -42,16 +42,20 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
taintutil "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -126,6 +130,18 @@ func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error)
return updatedNode, nil
}
func notImplemented(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
}
func addNotImplatedReaction(kubeClient *fake.Clientset) {
if kubeClient == nil {
return
}
kubeClient.AddReactor("*", "*", notImplemented)
}
type localCM struct {
cm.ContainerManager
allocatableReservation v1.ResourceList
@ -835,9 +851,9 @@ func TestRegisterWithApiServer(t *testing.T) {
},
}, nil
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
addNotImplatedReaction(kubeClient)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -964,10 +980,6 @@ func TestTryRegisterWithApiServer(t *testing.T) {
},
}
notImplemented := func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
}
for _, tc := range cases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled is a don't-care for this test */)
defer testKubelet.Cleanup()
@ -990,9 +1002,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
kubeClient.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.deleteError
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return notImplemented(action)
})
addNotImplatedReaction(kubeClient)
result := kubelet.tryRegisterWithAPIServer(tc.newNode)
require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name)
@ -1501,3 +1511,53 @@ func TestValidateNodeIPParam(t *testing.T) {
}
}
}
func TestRegisterWithApiServerWithTaint(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
kubelet.machineInfo = machineInfo
var gotNode runtime.Object
kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
gotNode = createAction.GetObject()
return true, gotNode, nil
})
addNotImplatedReaction(kubeClient)
// Make node to be unschedulable.
kubelet.registerSchedulable = false
forEachFeatureGate(t, []utilfeature.Feature{features.TaintNodesByCondition}, func(t *testing.T) {
// Reset kubelet status for each test.
kubelet.registrationCompleted = false
// Register node to apiserver.
kubelet.registerWithAPIServer()
// Check the unschedulable taint.
got := gotNode.(*v1.Node)
unschedulableTaint := &v1.Taint{
Key: algorithm.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
}
require.Equal(t,
utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition),
taintutil.TaintExists(got.Spec.Taints, unschedulableTaint),
"test unschedulable taint for TaintNodesByCondition")
return
})
}

View File

@ -35,6 +35,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
@ -2249,6 +2250,22 @@ func runVolumeManager(kubelet *Kubelet) chan struct{} {
return stopCh
}
func forEachFeatureGate(t *testing.T, fs []utilfeature.Feature, tf func(t *testing.T)) {
for _, fg := range fs {
func() {
enabled := utilfeature.DefaultFeatureGate.Enabled(fg)
defer func() {
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, enabled))
}()
for _, f := range []bool{true, false} {
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%v=%t", fg, f))
t.Run(fmt.Sprintf("%v(%t)", fg, f), tf)
}
}()
}
}
// Sort pods by UID.
type podsByUID []*v1.Pod