From 2775a52e7a4f91777b7f45632cd46a4e8417847a Mon Sep 17 00:00:00 2001 From: Bobby Salamat Date: Wed, 15 Mar 2017 14:42:19 -0700 Subject: [PATCH] Fix waitForScheduler in scheduer predicates e2e tests --- test/e2e/common/events.go | 6 +- test/e2e/scheduling/predicates.go | 98 ++++++++++++++++++------------- 2 files changed, 60 insertions(+), 44 deletions(-) diff --git a/test/e2e/common/events.go b/test/e2e/common/events.go index aa1192eb36f..5d599260b6c 100644 --- a/test/e2e/common/events.go +++ b/test/e2e/common/events.go @@ -34,9 +34,11 @@ import ( . "github.com/onsi/gomega" ) +type Action func() error + // Returns true if a node update matching the predicate was emitted from the // system after performing the supplied action. -func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action func() error) (bool, error) { +func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) { observedMatchingNode := false nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName) informerStartedChan := make(chan struct{}) @@ -94,7 +96,7 @@ func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodeP // Returns true if an event matching the predicate was emitted from the system // after performing the supplied action. -func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action func() error) (bool, error) { +func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action Action) (bool, error) { observedMatchingEvent := false informerStartedChan := make(chan struct{}) var informerStartedGuard sync.Once diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index 7096e7efb2f..7014074f907 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/test/e2e/common" "k8s.io/kubernetes/test/e2e/framework" testutils "k8s.io/kubernetes/test/utils" @@ -133,11 +134,10 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { }), true, framework.Logf)) } podName := "additional-pod" - createPausePod(f, pausePodConfig{ + WaitForSchedulerAfterAction(f, createPausePodAction(f, pausePodConfig{ Name: podName, Labels: map[string]string{"name": "additional"}, - }) - waitForScheduler() + }), podName, false) verifyResult(cs, podsNeededForSaturation, 1, ns) }) @@ -202,7 +202,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { }), true, framework.Logf)) } podName := "additional-pod" - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: podName, Labels: map[string]string{"name": "additional"}, Resources: &v1.ResourceRequirements{ @@ -210,8 +210,8 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { "cpu": *resource.NewMilliQuantity(milliCpuPerPod, "DecimalSI"), }, }, - }) - waitForScheduler() + } + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false) verifyResult(cs, podsNeededForSaturation, 1, ns) }) @@ -223,22 +223,22 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { framework.WaitForStableCluster(cs, masterNodes) - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: podName, Labels: map[string]string{"name": "restricted"}, NodeSelector: map[string]string{ "label": "nonempty", }, - }) + } - waitForScheduler() + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false) verifyResult(cs, 0, 1, ns) }) It("validates that a pod with an invalid NodeAffinity is rejected", func() { By("Trying to launch a pod with an invalid Affinity data.") podName := "without-label" - _, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{ + _, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{ Name: podName, Affinity: &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ @@ -256,9 +256,6 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { if err == nil || !errors.IsInvalid(err) { framework.Failf("Expect error of invalid, got : %v", err) } - - // Wait a bit to allow scheduler to do its thing if the pod is not rejected. - waitForScheduler() }) It("validates that NodeSelector is respected if matching [Conformance]", func() { @@ -300,7 +297,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { framework.WaitForStableCluster(cs, masterNodes) - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: podName, Affinity: &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ @@ -328,8 +325,8 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { }, }, Labels: map[string]string{"name": "restricted"}, - }) - waitForScheduler() + } + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false) verifyResult(cs, 0, 1, ns) }) @@ -378,7 +375,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { // already when the kubelet does not know about its new label yet. The // kubelet will then refuse to launch the pod. framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName)) - labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{}) + labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{}) framework.ExpectNoError(err) Expect(labelPod.Spec.NodeName).To(Equal(nodeName)) }) @@ -388,7 +385,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { It("validates that a pod with an invalid podAffinity is rejected because of the LabelSelectorRequirement is invalid", func() { By("Trying to launch a pod with an invalid pod Affinity data.") podName := "without-label-" + string(uuid.NewUUID()) - _, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{ + _, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{ Name: podName, Labels: map[string]string{"name": "without-label"}, Affinity: &v1.Affinity{ @@ -414,9 +411,6 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { if err == nil || !errors.IsInvalid(err) { framework.Failf("Expect error of invalid, got : %v", err) } - - // Wait a bit to allow scheduler to do its thing if the pod is not rejected. - waitForScheduler() }) // Test Nodes does not have any pod, hence it should be impossible to schedule a Pod with pod affinity. @@ -424,7 +418,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to schedule Pod with nonempty Pod Affinity.") framework.WaitForStableCluster(cs, masterNodes) podName := "without-label-" + string(uuid.NewUUID()) - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: podName, Affinity: &v1.Affinity{ PodAffinity: &v1.PodAffinity{ @@ -444,9 +438,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { }, }, }, - }) + } - waitForScheduler() + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false) verifyResult(cs, 0, 1, ns) }) @@ -492,7 +486,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { // already when the kubelet does not know about its new label yet. The // kubelet will then refuse to launch the pod. framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName)) - labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{}) + labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{}) framework.ExpectNoError(err) Expect(labelPod.Spec.NodeName).To(Equal(nodeName)) }) @@ -506,7 +500,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Launching two pods on two distinct nodes to get two node names") CreateHostPortPods(f, "host-port", 2, true) defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port") - podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{}) + podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{}) framework.ExpectNoError(err) Expect(len(podList.Items)).To(Equal(2)) nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName} @@ -532,7 +526,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to launch another pod, now with podAntiAffinity with same Labels.") labelPodName := "with-podantiaffinity-" + string(uuid.NewUUID()) - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: labelPodName, Labels: map[string]string{"service": "Diff"}, NodeSelector: map[string]string{k: v}, // only launch on our two nodes, contradicting the podAntiAffinity @@ -555,9 +549,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { }, }, }, - }) + } - waitForScheduler() + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), labelPodName, false) verifyResult(cs, 3, 1, ns) }) @@ -609,7 +603,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { // already when the kubelet does not know about its new label yet. The // kubelet will then refuse to launch the pod. framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName)) - labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{}) + labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{}) framework.ExpectNoError(err) Expect(labelPod.Spec.NodeName).To(Equal(nodeName)) }) @@ -732,18 +726,16 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() { By("Trying to relaunch the pod, still no tolerations.") podNameNoTolerations := "still-no-tolerations" - createPausePod(f, pausePodConfig{ + conf := pausePodConfig{ Name: podNameNoTolerations, NodeSelector: map[string]string{labelKey: labelValue}, - }) + } - waitForScheduler() + WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podNameNoTolerations, false) verifyResult(cs, 0, 1, ns) By("Removing taint off the node") - framework.RemoveTaintOffNode(cs, nodeName, testTaint) - - waitForScheduler() + WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true) verifyResult(cs, 1, 0, ns) }) }) @@ -900,16 +892,38 @@ func getRequestedCPU(pod v1.Pod) int64 { return result } -func waitForScheduler() { - // Wait a bit to allow scheduler to do its thing - // TODO: this is brittle; there's no guarantee the scheduler will have run in 10 seconds. - framework.Logf("Sleeping 10 seconds and crossing our fingers that scheduler will run in that time.") - time.Sleep(10 * time.Second) +// removeTaintFromNodeAction returns a closure that removes the given taint +// from the given node upon invocation. +func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) common.Action { + return func() error { + framework.RemoveTaintOffNode(cs, nodeName, testTaint) + return nil + } +} + +// createPausePodAction returns a closure that creates a pause pod upon invocation. +func createPausePodAction(f *framework.Framework, conf pausePodConfig) common.Action { + return func() error { + _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(initPausePod(f, conf)) + return err + } +} + +// WaitForSchedulerAfterAction performs the provided action and then waits for +// scheduler to act on the given pod. +func WaitForSchedulerAfterAction(f *framework.Framework, action common.Action, podName string, expectSuccess bool) { + predicate := scheduleFailureEvent(podName) + if expectSuccess { + predicate = scheduleSuccessEvent(podName, "" /* any node */) + } + success, err := common.ObserveEventAfterAction(f, predicate, action) + Expect(err).NotTo(HaveOccurred()) + Expect(success).To(Equal(true)) } // TODO: upgrade calls in PodAffinity tests when we're able to run them func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled int, ns string) { - allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{}) + allPods, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{}) framework.ExpectNoError(err) scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)