Fix a flaky scheduler preemption e2e

- Use the preemptor pod's Status.NominatedNodeName to signal success of the preemption behavior
- Optimize the test to eliminate unnecessary Pod creation
- Increase timeout from 1 minute to 2 minutes
Wei Huang 2020-01-09 09:41:53 -08:00
parent 1c51c4410f
commit 4083c7d49c
2 changed files with 102 additions and 54 deletions
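
With the new sizing, ReplicaSet{1,2,3} request 200 + 300 + 450 = 950 of the node's 1000 units of fake CPU, leaving 50 free. The 500-unit preemptor pod can only fit once the scheduler evicts the two lowest-priority pods (freeing 200 + 300), so ReplicaSet{1,2} each recreate their pod exactly once while ReplicaSet3's pod survives; this is why the test below expects {2, 2, 1} pod-creation events.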

View File

@@ -18,6 +18,7 @@ package replicaset
 import (
 	"fmt"
+	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,8 +44,14 @@ func WaitForReadyReplicaSet(c clientset.Interface, ns, name string) error {
 // WaitForReplicaSetTargetAvailableReplicas waits for .status.availableReplicas of a RS to equal targetReplicaNum
 func WaitForReplicaSetTargetAvailableReplicas(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32) error {
+	return WaitForReplicaSetTargetAvailableReplicasWithTimeout(c, replicaSet, targetReplicaNum, framework.PollShortTimeout)
+}
+
+// WaitForReplicaSetTargetAvailableReplicasWithTimeout waits for .status.availableReplicas of a RS to equal targetReplicaNum
+// with given timeout.
+func WaitForReplicaSetTargetAvailableReplicasWithTimeout(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32, timeout time.Duration) error {
 	desiredGeneration := replicaSet.Generation
-	err := wait.PollImmediate(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
+	err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
 		rs, err := c.AppsV1().ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name, metav1.GetOptions{})
 		if err != nil {
 			return false, err
View File

@@ -19,8 +19,10 @@ package scheduling
 import (
 	"fmt"
 	"strings"
+	"sync/atomic"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -289,7 +291,7 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 			nodeCopy := node.DeepCopy()
 			// force it to update
 			nodeCopy.ResourceVersion = "0"
-			nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("800")
+			nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
 			node, err = cs.CoreV1().Nodes().UpdateStatus(nodeCopy)
 			framework.ExpectNoError(err)
@@ -307,8 +309,8 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 		}
 	})
 
-	ginkgo.It("runs ReplicaSets to verify preemption running path [Flaky]", func() {
-		podNamesSeen := make(map[string]struct{})
+	ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
+		podNamesSeen := []int32{0, 0, 0}
 		stopCh := make(chan struct{})
 
 		// create a pod controller to list/watch pod events from the test framework namespace
@@ -327,7 +329,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 			cache.ResourceEventHandlerFuncs{
 				AddFunc: func(obj interface{}) {
 					if pod, ok := obj.(*v1.Pod); ok {
-						podNamesSeen[pod.Name] = struct{}{}
+						if strings.HasPrefix(pod.Name, "rs-pod1") {
+							atomic.AddInt32(&podNamesSeen[0], 1)
+						} else if strings.HasPrefix(pod.Name, "rs-pod2") {
+							atomic.AddInt32(&podNamesSeen[1], 1)
+						} else if strings.HasPrefix(pod.Name, "rs-pod3") {
+							atomic.AddInt32(&podNamesSeen[2], 1)
+						}
 					}
 				},
 			},
@@ -335,10 +343,10 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 		go podController.Run(stopCh)
 		defer close(stopCh)
 
-		// prepare four ReplicaSet
+		// prepare three ReplicaSet
 		rsConfs := []pauseRSConfig{
 			{
-				Replicas: int32(5),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:      "pod1",
 					Namespace: ns,
@@ -346,13 +354,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 					PriorityClassName: "p1",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("40")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("40")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
 					},
 				},
 			},
 			{
-				Replicas: int32(4),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:      "pod2",
 					Namespace: ns,
@@ -360,13 +368,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 					PriorityClassName: "p2",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("50")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("50")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
 					},
 				},
 			},
 			{
-				Replicas: int32(4),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:      "pod3",
 					Namespace: ns,
@@ -374,61 +382,68 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 					PriorityClassName: "p3",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("95")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("95")},
-					},
-				},
-			},
-			{
-				Replicas: int32(1),
-				PodConfig: pausePodConfig{
-					Name:              "pod4",
-					Namespace:         ns,
-					Labels:            map[string]string{"name": "pod4"},
-					PriorityClassName: "p4",
-					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
-					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("400")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("400")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
 					},
 				},
 			},
 		}
-		// create ReplicaSet{1,2,3} so as to occupy 780/800 fake resource
-		rsNum := len(rsConfs)
-		for i := 0; i < rsNum-1; i++ {
+		// create ReplicaSet{1,2,3} so as to occupy 950/1000 fake resource
+		for i := range rsConfs {
 			runPauseRS(f, rsConfs[i])
 		}
-		framework.Logf("pods created so far: %v", podNamesSeen)
-		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
-
-		// create ReplicaSet4
-		// if runPauseRS failed, it means ReplicaSet4 cannot be scheduled even after 1 minute
-		// which is unacceptable
-		runPauseRS(f, rsConfs[rsNum-1])
+		// create a Preemptor Pod
+		preemptorPodConf := pausePodConfig{
+			Name:              "pod4",
+			Namespace:         ns,
+			Labels:            map[string]string{"name": "pod4"},
+			PriorityClassName: "p4",
+			NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
+			Resources: &v1.ResourceRequirements{
+				Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
+				Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
+			},
+		}
+		preemptorPod := createPod(f, preemptorPodConf)
+		waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)
 
 		framework.Logf("pods created so far: %v", podNamesSeen)
-		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
-		// count pods number of ReplicaSet{1,2,3}, if it's more than expected replicas
-		// then it denotes its pods have been over-preempted
-		// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once
-		maxRSPodsSeen := []int{5 * 2, 4 * 2, 4}
-		rsPodsSeen := []int{0, 0, 0}
-		for podName := range podNamesSeen {
-			if strings.HasPrefix(podName, "rs-pod1") {
-				rsPodsSeen[0]++
-			} else if strings.HasPrefix(podName, "rs-pod2") {
-				rsPodsSeen[1]++
-			} else if strings.HasPrefix(podName, "rs-pod3") {
-				rsPodsSeen[2]++
-			}
-		}
+		// count pods number of ReplicaSet{1,2,3}:
+		// - if it's more than expected replicas, it denotes its pods have been over-preempted
+		// - if it's less than expected replicas, it denotes its pods are under-preempted
+		// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once.
+		expectedRSPods := []int32{1 * 2, 1 * 2, 1}
+		err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
+			for i := 0; i < len(podNamesSeen); i++ {
+				got := atomic.LoadInt32(&podNamesSeen[i])
+				if got < expectedRSPods[i] {
+					framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
+					return false, nil
+				} else if got > expectedRSPods[i] {
+					return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
+				}
+			}
+			return true, nil
+		})
+		if err != nil {
+			framework.Logf("pods created so far: %v", podNamesSeen)
+			framework.Failf("failed pod observation expectations: %v", err)
+		}
-		for i, got := range rsPodsSeen {
-			expected := maxRSPodsSeen[i]
-			if got > expected {
-				framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expected, got)
-			}
-		}
+
+		// If logic continues to here, we should do a final check to ensure within a time period,
+		// the state is stable; otherwise, pods may be over-preempted.
+		time.Sleep(5 * time.Second)
+		for i := 0; i < len(podNamesSeen); i++ {
+			got := atomic.LoadInt32(&podNamesSeen[i])
+			if got < expectedRSPods[i] {
+				framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
+			} else if got > expectedRSPods[i] {
+				framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
+			}
+		}
 	})
@@ -472,6 +487,32 @@ func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
 func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
 	rs := createPauseRS(f, conf)
-	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas))
+	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
 	return rs
 }
+
+func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
+	namespace := conf.Namespace
+	if len(namespace) == 0 {
+		namespace = f.Namespace.Name
+	}
+	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(initPausePod(f, conf))
+	framework.ExpectNoError(err)
+	return pod
+}
+
+// waitForPreemptingWithTimeout verifies if 'pod' is preempting within 'timeout', specifically it checks
+// if the 'spec.NodeName' field of preemptor 'pod' has been set.
+func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
+	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
+		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if len(pod.Spec.NodeName) > 0 {
+			return true, nil
+		}
+		return false, err
+	})
+	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
+}
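
Note that waitForPreemptingWithTimeout gates on spec.nodeName, i.e. it returns once the preemptor has actually been bound to a node. The Status.NominatedNodeName field named in the commit message is set earlier, as soon as the scheduler picks preemption victims for the pod. A sketch of an equivalent check built on that field instead, illustrative only and not part of this commit:

	// Succeed once the scheduler has nominated a node for the preemptor pod,
	// which happens when preemption is triggered, before victims finish terminating.
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		p, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return len(p.Status.NominatedNodeName) > 0, nil
	})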