Update test logic to simulate NodeReady/False and NodeReady/Unknown events correctly

- optimize code to use one loop to spin up goroutines
- add `defer cleanupTest()` to avoid goroutine leaks
- use only one heartbeat channel
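
For reference, a minimal self-contained sketch of the cancellation pattern the diff below adopts (the `startHeartbeat` helper, the node names, and the `fmt` output are illustrative stand-ins, not the commit's code): each heartbeat goroutine selects on the test context's `Done()` channel, so a single deferred cleanup stops all of them and no per-node stop channels are needed.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startHeartbeat launches a goroutine that "heartbeats" every interval and
// exits when ctx is canceled — the leak-free pattern this commit switches to.
func startHeartbeat(ctx context.Context, node string, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done(): // deferred cleanup cancels ctx; no per-node stop channel
				return
			case <-ticker.C:
				fmt.Printf("heartbeat for %s\n", node) // stands in for updateNodeStatus(...)
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // analogous to the test's defer cleanupTest(t, context)
	for i := 0; i < 3; i++ {
		startHeartbeat(ctx, fmt.Sprintf("node-%d", i), 2*time.Second)
	}
	time.Sleep(5 * time.Second) // let a few heartbeats fire before cancellation
}
```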
Wei Huang 2019-11-07 15:59:20 -08:00
parent bef8d426f2
commit b6b92b6075


@@ -19,11 +19,13 @@ package scheduler
// This file tests the Taint feature.
import (
"errors"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -584,6 +586,7 @@ func TestTaintBasedEvictions(t *testing.T) {
nodeCount := 3
zero := int64(0)
gracePeriod := int64(1)
heartbeatInternal := time.Second * 2
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "testpod1", DeletionGracePeriodSeconds: &zero},
Spec: v1.PodSpec{
@@ -657,7 +660,6 @@ func TestTaintBasedEvictions(t *testing.T) {
for i, test := range tests {
t.Run(test.name, func(t *testing.T) {
context := initTestMaster(t, "taint-based-evictions", admission)
defer cleanupTest(t, context)
// Build clientset and informers for controllers.
externalClientset := kubernetes.NewForConfigOrDie(&restclient.Config{
@@ -669,6 +671,7 @@ func TestTaintBasedEvictions(t *testing.T) {
podTolerations.SetExternalKubeInformerFactory(externalInformers)
context = initTestScheduler(t, context, true, nil)
defer cleanupTest(t, context)
cs := context.clientSet
informers := context.informerFactory
_, err := cs.CoreV1().Namespaces().Create(context.ns)
@@ -726,8 +729,9 @@ func TestTaintBasedEvictions(t *testing.T) {
Allocatable: nodeRes,
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
},
},
},
@@ -737,33 +741,6 @@ func TestTaintBasedEvictions(t *testing.T) {
}
}
// Regularly send heartbeat event to APIServer so that the cluster doesn't enter fullyDisruption mode.
// TODO(Huang-Wei): use "NodeDisruptionExclusion" feature to simplify the below logic when it's beta.
var heartbeatChans []chan struct{}
for i := 0; i < nodeCount; i++ {
heartbeatChans = append(heartbeatChans, make(chan struct{}))
}
for i := 0; i < nodeCount; i++ {
// Spin up <nodeCount> goroutines to send heartbeat event to APIServer periodically.
go func(i int) {
for {
select {
case <-heartbeatChans[i]:
return
case <-time.Tick(2 * time.Second):
nodes[i].Status.Conditions = []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
},
}
updateNodeStatus(cs, nodes[i])
}
}
}(i)
}
neededNode := nodes[1]
if test.pod != nil {
test.pod.Name = fmt.Sprintf("testpod-%d", i)
@@ -790,18 +767,53 @@ func TestTaintBasedEvictions(t *testing.T) {
}
}
// Regularly send heartbeat event to APIServer so that the cluster doesn't enter fullyDisruption mode.
// TODO(Huang-Wei): use "NodeDisruptionExclusion" feature to simplify the below logic when it's beta.
for i := 0; i < nodeCount; i++ {
// Stop the neededNode's heartbeat goroutine.
if neededNode.Name == fmt.Sprintf("node-%d", i) {
heartbeatChans[i] <- struct{}{}
break
var conditions []v1.NodeCondition
// If the current node is not <neededNode>
if neededNode.Name != nodes[i].Name {
conditions = []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
}
} else {
c, err := nodeReadyStatus(test.nodeConditions)
if err != nil {
t.Error(err)
}
// Need to distinguish NodeReady/False and NodeReady/Unknown.
// If we try to update the node with condition NodeReady/False, i.e. expect a NotReady:NoExecute taint,
// we need to keep sending the update event to keep it alive, rather than just sending once.
if c == v1.ConditionFalse {
conditions = test.nodeConditions
} else if c == v1.ConditionUnknown {
// If it's expected to update the node with condition NodeReady/Unknown,
// i.e. expect an Unreachable:NoExecute taint,
// we need to only send the update event once to simulate the network unreachable scenario.
nodeCopy := nodeCopyWithConditions(nodes[i], test.nodeConditions)
if err := updateNodeStatus(cs, nodeCopy); err != nil && !apierrors.IsNotFound(err) {
t.Errorf("Cannot update node: %v", err)
}
continue
}
}
}
neededNode.Status.Conditions = test.nodeConditions
// Update node condition.
err = updateNodeStatus(cs, neededNode)
if err != nil {
t.Fatalf("Cannot update node: %v", err)
// Keep sending NodeReady/True or NodeReady/False events.
go func(i int) {
for {
select {
case <-context.ctx.Done():
return
case <-time.Tick(heartbeatInternal):
nodeCopy := nodeCopyWithConditions(nodes[i], conditions)
if err := updateNodeStatus(cs, nodeCopy); err != nil && !apierrors.IsNotFound(err) {
t.Errorf("Cannot update node: %v", err)
}
}
}
}(i)
}
if err := waitForNodeTaints(cs, neededNode, test.nodeTaints); err != nil {
@@ -826,10 +838,6 @@ func TestTaintBasedEvictions(t *testing.T) {
}
cleanupPods(cs, t, []*v1.Pod{test.pod})
}
// Close all heartbeat channels.
for i := 0; i < nodeCount; i++ {
close(heartbeatChans[i])
}
cleanupNodes(cs, t)
waitForSchedulerCacheCleanup(context.scheduler, t)
})
@@ -844,3 +852,26 @@ func getTolerationSeconds(tolerations []v1.Toleration) (int64, error) {
}
return 0, fmt.Errorf("cannot find toleration")
}
// nodeReadyStatus returns the status of the first condition with type NodeReady.
// If none of the conditions is of type NodeReady, it returns an error.
func nodeReadyStatus(conditions []v1.NodeCondition) (v1.ConditionStatus, error) {
for _, c := range conditions {
if c.Type != v1.NodeReady {
continue
}
// Just return the first condition with type NodeReady
return c.Status, nil
}
return v1.ConditionFalse, errors.New("None of the conditions is of type NodeReady")
}
func nodeCopyWithConditions(node *v1.Node, conditions []v1.NodeCondition) *v1.Node {
copy := node.DeepCopy()
copy.ResourceVersion = "0"
copy.Status.Conditions = conditions
for i := range copy.Status.Conditions {
copy.Status.Conditions[i].LastHeartbeatTime = metav1.Now()
}
return copy
}
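
A hedged usage sketch of these two helpers, assuming the test's `cs`, `nodes`, `t`, `updateNodeStatus`, and the `v1`/`apierrors` imports are in scope (the condition literal is made up for illustration):

```go
// Simulate an unreachable node: NodeReady/Unknown is sent exactly once,
// since a truly unreachable kubelet stops heartbeating afterwards.
conditions := []v1.NodeCondition{
	{Type: v1.NodeReady, Status: v1.ConditionUnknown},
}
status, err := nodeReadyStatus(conditions)
if err != nil {
	t.Fatal(err)
}
if status == v1.ConditionUnknown {
	// Send the update once and stop; no heartbeat goroutine for this node.
	if err := updateNodeStatus(cs, nodeCopyWithConditions(nodes[0], conditions)); err != nil && !apierrors.IsNotFound(err) {
		t.Errorf("Cannot update node: %v", err)
	}
}
```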