diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go
index 153d10776e3..535b051baad 100644
--- a/test/integration/scheduler/taint_test.go
+++ b/test/integration/scheduler/taint_test.go
@@ -19,11 +19,13 @@ package scheduler
 // This file tests the Taint feature.
 
 import (
+	"errors"
 	"fmt"
 	"testing"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -584,6 +586,7 @@ func TestTaintBasedEvictions(t *testing.T) {
 	nodeCount := 3
 	zero := int64(0)
 	gracePeriod := int64(1)
+	heartbeatInterval := time.Second * 2
 	testPod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: "testpod1", DeletionGracePeriodSeconds: &zero},
 		Spec: v1.PodSpec{
@@ -657,7 +660,6 @@ func TestTaintBasedEvictions(t *testing.T) {
 	for i, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			context := initTestMaster(t, "taint-based-evictions", admission)
-			defer cleanupTest(t, context)
 
 			// Build clientset and informers for controllers.
 			externalClientset := kubernetes.NewForConfigOrDie(&restclient.Config{
@@ -669,6 +671,7 @@ func TestTaintBasedEvictions(t *testing.T) {
 			podTolerations.SetExternalKubeInformerFactory(externalInformers)
 
 			context = initTestScheduler(t, context, true, nil)
+			defer cleanupTest(t, context)
 			cs := context.clientSet
 			informers := context.informerFactory
 			_, err := cs.CoreV1().Namespaces().Create(context.ns)
@@ -726,8 +729,9 @@ func TestTaintBasedEvictions(t *testing.T) {
 						Allocatable: nodeRes,
 						Conditions: []v1.NodeCondition{
 							{
-								Type:   v1.NodeReady,
-								Status: v1.ConditionTrue,
+								Type:              v1.NodeReady,
+								Status:            v1.ConditionTrue,
+								LastHeartbeatTime: metav1.Now(),
 							},
 						},
 					},
@@ -737,33 +741,6 @@ func TestTaintBasedEvictions(t *testing.T) {
 				}
 			}
 
-			// Regularly send heartbeat event to APIServer so that the cluster doesn't enter fullyDisruption mode.
-			// TODO(Huang-Wei): use "NodeDisruptionExclusion" feature to simply the below logic when it's beta.
-			var heartbeatChans []chan struct{}
-			for i := 0; i < nodeCount; i++ {
-				heartbeatChans = append(heartbeatChans, make(chan struct{}))
-			}
-			for i := 0; i < nodeCount; i++ {
-				// Spin up goroutines to send heartbeat event to APIServer periodically.
-				go func(i int) {
-					for {
-						select {
-						case <-heartbeatChans[i]:
-							return
-						case <-time.Tick(2 * time.Second):
-							nodes[i].Status.Conditions = []v1.NodeCondition{
-								{
-									Type:              v1.NodeReady,
-									Status:            v1.ConditionTrue,
-									LastHeartbeatTime: metav1.Now(),
-								},
-							}
-							updateNodeStatus(cs, nodes[i])
-						}
-					}
-				}(i)
-			}
-
 			neededNode := nodes[1]
 			if test.pod != nil {
 				test.pod.Name = fmt.Sprintf("testpod-%d", i)
@@ -790,18 +767,53 @@ func TestTaintBasedEvictions(t *testing.T) {
 				}
 			}
 
+			// Regularly send heartbeat events to APIServer so that the cluster doesn't enter fullyDisruption mode.
+			// TODO(Huang-Wei): use "NodeDisruptionExclusion" feature to simplify the below logic when it's beta.
 			for i := 0; i < nodeCount; i++ {
-				// Stop the neededNode's heartbeat goroutine.
-				if neededNode.Name == fmt.Sprintf("node-%d", i) {
-					heartbeatChans[i] <- struct{}{}
-					break
+				var conditions []v1.NodeCondition
+				// If the current node is not the neededNode, keep reporting it as Ready.
+				if neededNode.Name != nodes[i].Name {
+					conditions = []v1.NodeCondition{
+						{
+							Type:   v1.NodeReady,
+							Status: v1.ConditionTrue,
+						},
+					}
+				} else {
+					c, err := nodeReadyStatus(test.nodeConditions)
+					if err != nil {
+						t.Error(err)
+					}
+					// Need to distinguish NodeReady/False and NodeReady/Unknown.
+					// If we try to update the node with condition NodeReady/False, i.e. expect a NotReady:NoExecute taint,
+					// we need to keep sending the update event to keep it alive, rather than just sending it once.
+					if c == v1.ConditionFalse {
+						conditions = test.nodeConditions
+					} else if c == v1.ConditionUnknown {
+						// If it's expected to update the node with condition NodeReady/Unknown,
+						// i.e. expect an Unreachable:NoExecute taint,
+						// we only need to send the update event once to simulate the network-unreachable scenario.
+						nodeCopy := nodeCopyWithConditions(nodes[i], test.nodeConditions)
+						if err := updateNodeStatus(cs, nodeCopy); err != nil && !apierrors.IsNotFound(err) {
+							t.Errorf("Cannot update node: %v", err)
+						}
+						continue
+					}
 				}
-			}
-			neededNode.Status.Conditions = test.nodeConditions
-			// Update node condition.
-			err = updateNodeStatus(cs, neededNode)
-			if err != nil {
-				t.Fatalf("Cannot update node: %v", err)
+				// Keep sending NodeReady/True or NodeReady/False events.
+				go func(i int) {
+					for {
+						select {
+						case <-context.ctx.Done():
+							return
+						case <-time.Tick(heartbeatInterval):
+							nodeCopy := nodeCopyWithConditions(nodes[i], conditions)
+							if err := updateNodeStatus(cs, nodeCopy); err != nil && !apierrors.IsNotFound(err) {
+								t.Errorf("Cannot update node: %v", err)
+							}
+						}
+					}
+				}(i)
 			}
 
 			if err := waitForNodeTaints(cs, neededNode, test.nodeTaints); err != nil {
@@ -826,10 +838,6 @@ func TestTaintBasedEvictions(t *testing.T) {
 				}
 				cleanupPods(cs, t, []*v1.Pod{test.pod})
 			}
-			// Close all heartbeat channels.
-			for i := 0; i < nodeCount; i++ {
-				close(heartbeatChans[i])
-			}
 			cleanupNodes(cs, t)
 			waitForSchedulerCacheCleanup(context.scheduler, t)
 		})
@@ -844,3 +852,26 @@ func getTolerationSeconds(tolerations []v1.Toleration) (int64, error) {
 	}
 	return 0, fmt.Errorf("cannot find toleration")
 }
+
+// nodeReadyStatus returns the status of the first condition with type NodeReady.
+// If none of the conditions is of type NodeReady, it returns an error.
+func nodeReadyStatus(conditions []v1.NodeCondition) (v1.ConditionStatus, error) {
+	for _, c := range conditions {
+		if c.Type != v1.NodeReady {
+			continue
+		}
+		// Just return the first condition with type NodeReady.
+		return c.Status, nil
+	}
+	return v1.ConditionFalse, errors.New("none of the conditions is of type NodeReady")
+}
+
+func nodeCopyWithConditions(node *v1.Node, conditions []v1.NodeCondition) *v1.Node {
+	copy := node.DeepCopy()
+	copy.ResourceVersion = "0"
+	copy.Status.Conditions = conditions
+	for i := range copy.Status.Conditions {
+		copy.Status.Conditions[i].LastHeartbeatTime = metav1.Now()
+	}
+	return copy
+}