From 4083c7d49c813846c5071f150ddb1a2a6052a197 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Thu, 9 Jan 2020 09:41:53 -0800 Subject: [PATCH] Fix a flaky scheduler preemption e2e - Use preemptor pod's Status.NominatedNodeName to signal success of the Preemption behavior - Optimize the test to eliminate unnecessary Pods creation - Increase timeout from 1 minute to 2 minutes --- test/e2e/framework/replicaset/wait.go | 9 +- test/e2e/scheduling/preemption.go | 147 ++++++++++++++++---------- 2 files changed, 102 insertions(+), 54 deletions(-) diff --git a/test/e2e/framework/replicaset/wait.go b/test/e2e/framework/replicaset/wait.go index d214d0cba00..2ae50709ffa 100644 --- a/test/e2e/framework/replicaset/wait.go +++ b/test/e2e/framework/replicaset/wait.go @@ -18,6 +18,7 @@ package replicaset import ( "fmt" + "time" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,8 +44,14 @@ func WaitForReadyReplicaSet(c clientset.Interface, ns, name string) error { // WaitForReplicaSetTargetAvailableReplicas waits for .status.availableReplicas of a RS to equal targetReplicaNum func WaitForReplicaSetTargetAvailableReplicas(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32) error { + return WaitForReplicaSetTargetAvailableReplicasWithTimeout(c, replicaSet, targetReplicaNum, framework.PollShortTimeout) +} + +// WaitForReplicaSetTargetAvailableReplicasWithTimeout waits for .status.availableReplicas of a RS to equal targetReplicaNum +// with given timeout. +func WaitForReplicaSetTargetAvailableReplicasWithTimeout(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32, timeout time.Duration) error { desiredGeneration := replicaSet.Generation - err := wait.PollImmediate(framework.Poll, framework.PollShortTimeout, func() (bool, error) { + err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { rs, err := c.AppsV1().ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/test/e2e/scheduling/preemption.go b/test/e2e/scheduling/preemption.go index c0b8cdc6a76..6e4c645d248 100644 --- a/test/e2e/scheduling/preemption.go +++ b/test/e2e/scheduling/preemption.go @@ -19,8 +19,10 @@ package scheduling import ( "fmt" "strings" + "sync/atomic" "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" appsv1 "k8s.io/api/apps/v1" @@ -289,7 +291,7 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { nodeCopy := node.DeepCopy() // force it to update nodeCopy.ResourceVersion = "0" - nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("800") + nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000") node, err = cs.CoreV1().Nodes().UpdateStatus(nodeCopy) framework.ExpectNoError(err) @@ -307,8 +309,8 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { } }) - ginkgo.It("runs ReplicaSets to verify preemption running path [Flaky]", func() { - podNamesSeen := make(map[string]struct{}) + ginkgo.It("runs ReplicaSets to verify preemption running path", func() { + podNamesSeen := []int32{0, 0, 0} stopCh := make(chan struct{}) // create a pod controller to list/watch pod events from the test framework namespace @@ -327,7 +329,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if pod, ok := obj.(*v1.Pod); ok { - podNamesSeen[pod.Name] = struct{}{} + if strings.HasPrefix(pod.Name, "rs-pod1") { + atomic.AddInt32(&podNamesSeen[0], 1) + } else if strings.HasPrefix(pod.Name, 
"rs-pod2") { + atomic.AddInt32(&podNamesSeen[1], 1) + } else if strings.HasPrefix(pod.Name, "rs-pod3") { + atomic.AddInt32(&podNamesSeen[2], 1) + } } }, }, @@ -335,10 +343,10 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { go podController.Run(stopCh) defer close(stopCh) - // prepare four ReplicaSet + // prepare three ReplicaSet rsConfs := []pauseRSConfig{ { - Replicas: int32(5), + Replicas: int32(1), PodConfig: pausePodConfig{ Name: "pod1", Namespace: ns, @@ -346,13 +354,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { PriorityClassName: "p1", NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel}, Resources: &v1.ResourceRequirements{ - Requests: v1.ResourceList{fakecpu: resource.MustParse("40")}, - Limits: v1.ResourceList{fakecpu: resource.MustParse("40")}, + Requests: v1.ResourceList{fakecpu: resource.MustParse("200")}, + Limits: v1.ResourceList{fakecpu: resource.MustParse("200")}, }, }, }, { - Replicas: int32(4), + Replicas: int32(1), PodConfig: pausePodConfig{ Name: "pod2", Namespace: ns, @@ -360,13 +368,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { PriorityClassName: "p2", NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel}, Resources: &v1.ResourceRequirements{ - Requests: v1.ResourceList{fakecpu: resource.MustParse("50")}, - Limits: v1.ResourceList{fakecpu: resource.MustParse("50")}, + Requests: v1.ResourceList{fakecpu: resource.MustParse("300")}, + Limits: v1.ResourceList{fakecpu: resource.MustParse("300")}, }, }, }, { - Replicas: int32(4), + Replicas: int32(1), PodConfig: pausePodConfig{ Name: "pod3", Namespace: ns, @@ -374,61 +382,68 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() { PriorityClassName: "p3", NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel}, Resources: &v1.ResourceRequirements{ - Requests: v1.ResourceList{fakecpu: resource.MustParse("95")}, - Limits: v1.ResourceList{fakecpu: resource.MustParse("95")}, - }, - }, - }, - { - Replicas: int32(1), - PodConfig: pausePodConfig{ - Name: "pod4", - Namespace: ns, - Labels: map[string]string{"name": "pod4"}, - PriorityClassName: "p4", - NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel}, - Resources: &v1.ResourceRequirements{ - Requests: v1.ResourceList{fakecpu: resource.MustParse("400")}, - Limits: v1.ResourceList{fakecpu: resource.MustParse("400")}, + Requests: v1.ResourceList{fakecpu: resource.MustParse("450")}, + Limits: v1.ResourceList{fakecpu: resource.MustParse("450")}, }, }, }, } - // create ReplicaSet{1,2,3} so as to occupy 780/800 fake resource - rsNum := len(rsConfs) - for i := 0; i < rsNum-1; i++ { + // create ReplicaSet{1,2,3} so as to occupy 950/1000 fake resource + for i := range rsConfs { runPauseRS(f, rsConfs[i]) } framework.Logf("pods created so far: %v", podNamesSeen) framework.Logf("length of pods created so far: %v", len(podNamesSeen)) - // create ReplicaSet4 - // if runPauseRS failed, it means ReplicaSet4 cannot be scheduled even after 1 minute - // which is unacceptable - runPauseRS(f, rsConfs[rsNum-1]) + // create a Preemptor Pod + preemptorPodConf := pausePodConfig{ + Name: "pod4", + Namespace: ns, + Labels: map[string]string{"name": "pod4"}, + PriorityClassName: "p4", + NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel}, + Resources: &v1.ResourceRequirements{ + Requests: v1.ResourceList{fakecpu: resource.MustParse("500")}, + Limits: v1.ResourceList{fakecpu: resource.MustParse("500")}, + }, + } + preemptorPod := createPod(f, 
preemptorPodConf) + waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout) framework.Logf("pods created so far: %v", podNamesSeen) - framework.Logf("length of pods created so far: %v", len(podNamesSeen)) - // count pods number of ReplicaSet{1,2,3}, if it's more than expected replicas - // then it denotes its pods have been over-preempted - // "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once - maxRSPodsSeen := []int{5 * 2, 4 * 2, 4} - rsPodsSeen := []int{0, 0, 0} - for podName := range podNamesSeen { - if strings.HasPrefix(podName, "rs-pod1") { - rsPodsSeen[0]++ - } else if strings.HasPrefix(podName, "rs-pod2") { - rsPodsSeen[1]++ - } else if strings.HasPrefix(podName, "rs-pod3") { - rsPodsSeen[2]++ + // count pods number of ReplicaSet{1,2,3}: + // - if it's more than expected replicas, it denotes its pods have been over-preempted + // - if it's less than expected replicas, it denotes its pods are under-preempted + // "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once. + expectedRSPods := []int32{1 * 2, 1 * 2, 1} + err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) { + for i := 0; i < len(podNamesSeen); i++ { + got := atomic.LoadInt32(&podNamesSeen[i]) + if got < expectedRSPods[i] { + framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got) + return false, nil + } else if got > expectedRSPods[i] { + return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got) + } } + return true, nil + }) + if err != nil { + framework.Logf("pods created so far: %v", podNamesSeen) + framework.Failf("failed pod observation expectations: %v", err) } - for i, got := range rsPodsSeen { - expected := maxRSPodsSeen[i] - if got > expected { - framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expected, got) + + // If logic continues to here, we should do a final check to ensure within a time period, + // the state is stable; otherwise, pods may be over-preempted. + time.Sleep(5 * time.Second) + for i := 0; i < len(podNamesSeen); i++ { + got := atomic.LoadInt32(&podNamesSeen[i]) + if got < expectedRSPods[i] { + framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got) + } else if got > expectedRSPods[i] { + framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got) } } }) @@ -472,6 +487,32 @@ func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSe func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet { rs := createPauseRS(f, conf) - framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas)) + framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout)) return rs } + +func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod { + namespace := conf.Namespace + if len(namespace) == 0 { + namespace = f.Namespace.Name + } + pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(initPausePod(f, conf)) + framework.ExpectNoError(err) + return pod +} + +// waitForPreemptingWithTimeout verifies if 'pod' is preempting within 'timeout', specifically it checks +// if the 'spec.NodeName' field of preemptor 'pod' has been set. 
+func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) { + err := wait.Poll(2*time.Second, timeout, func() (bool, error) { + pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if len(pod.Spec.NodeName) > 0 { + return true, nil + } + return false, err + }) + framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name) +}
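
For comparison: the commit message mentions using the preemptor pod's Status.NominatedNodeName to signal that preemption happened, while waitForPreemptingWithTimeout above polls Spec.NodeName, which is only populated once the victims are gone and the preemptor is actually bound. A minimal sketch of a NominatedNodeName-based variant, reusing the same client-go and framework calls as the patch (the helper name is illustrative and not part of this patch), could look like:

func waitForNominationWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
		// Re-fetch the preemptor and check whether the scheduler has recorded
		// a nominated node for it, which happens as soon as victims are chosen.
		p, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return len(p.Status.NominatedNodeName) > 0, nil
	})
	framework.ExpectNoError(err, "pod %v/%v was never nominated to a node", pod.Namespace, pod.Name)
}

Checking NominatedNodeName reports success as soon as the scheduler makes its preemption decision; checking Spec.NodeName additionally waits for the victims to terminate and the preemptor to be bound, which is the stricter condition the patch relies on.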
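
For reviewers tracing the new expected counts ([]int32{1 * 2, 1 * 2, 1}): the node advertises 1000 fakecpu, the three ReplicaSets request 200 + 300 + 450 = 950, and the preemptor asks for 500. Only 50 is free, evicting rs-pod1 yields 250, and evicting rs-pod2 as well yields 550 >= 500, so the two lowest-priority pods are each preempted exactly once and rs-pod3 survives. A standalone sketch of that arithmetic, using a simplified lowest-priority-first victim model rather than the scheduler's actual algorithm:

package main

import "fmt"

func main() {
	capacity := int64(1000)
	// fakecpu requests from the patch, ordered by priority p1 < p2 < p3.
	requests := []int64{200, 300, 450}
	preemptor := int64(500)

	free := capacity
	for _, r := range requests {
		free -= r // 1000 - 950 = 50 left once rs-pod{1,2,3} are placed
	}

	// Evict the lowest-priority pods until the preemptor fits.
	victims := 0
	for _, r := range requests {
		if free >= preemptor {
			break
		}
		free += r
		victims++
	}
	fmt.Printf("victims: %d, free after preemption: %d\n", victims, free) // victims: 2, free: 550
}

With two victims, ReplicaSets 1 and 2 each recreate their single replica once (two pod-creation events apiece), which is exactly what the informer-based counters assert.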