From abe0c5d5b49c94068f10815d244bd32e51f59bab Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Tue, 1 Nov 2022 11:49:40 -0700
Subject: [PATCH] E2E test for KEP Scheduling Readiness Gates

---
 test/e2e/framework/pod/wait.go    | 77 ++++++++++++++++++++++++++++++-
 test/e2e/scheduling/predicates.go | 72 +++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go
index 44d77b197fb..e4e8de51829 100644
--- a/test/e2e/framework/pod/wait.go
+++ b/test/e2e/framework/pod/wait.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"reflect"
 	"text/tabwriter"
 	"time"
 
@@ -253,7 +254,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
 					framework.Logf("Pod %s is Failed, but it's not controlled by a controller", pod.ObjectMeta.Name)
 					badPods = append(badPods, pod)
 				}
-				//ignore failed pods that are controlled by some controller
+				// ignore failed pods that are controlled by some controller
 			}
 		}
 
@@ -326,7 +327,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc strin
 	return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
 }
 
-// WaitForPodsCondition waits for the listed pods to match the given condition.
+// WaitForAllPodsCondition waits for the listed pods to match the given condition.
 // To succeed, at least minPods must be listed, and all listed pods must match the condition.
 func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
 	framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
@@ -362,6 +363,78 @@ func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListO
 	return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
 }
 
+// WaitForPodsRunning waits up to the given `timeout` for exactly `num` pods in the given `ns` to be running and ready.
+func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error {
+	matched := 0
+	err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
+		pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
+		if err != nil {
+			return handleWaitingAPIError(err, true, "listing pods")
+		}
+		matched = 0
+		for _, pod := range pods.Items {
+			if ready, _ := testutils.PodRunningReady(&pod); ready {
+				matched++
+			}
+		}
+		if matched == num {
+			return true, nil
+		}
+		framework.Logf("expected %d pods to be running and ready, but got %d", num, matched)
+		return false, nil
+	})
+	return maybeTimeoutError(err, "waiting for pods to be running (want %d, matched %d)", num, matched)
+}
+
+// WaitForPodsSchedulingGated waits up to the given `timeout` for exactly `num` pods in the given `ns` to stay in the SchedulingGated state.
+func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error {
+	matched := 0
+	err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
+		pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
+		if err != nil {
+			return handleWaitingAPIError(err, true, "listing pods")
+		}
+		matched = 0
+		for _, pod := range pods.Items {
+			for _, condition := range pod.Status.Conditions {
+				if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated {
+					matched++
+				}
+			}
+		}
+		if matched == num {
+			return true, nil
+		}
+		framework.Logf("expected %d pods to be in the SchedulingGated state, but got %d", num, matched)
+		return false, nil
+	})
+	return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched)
+}
+
+// WaitForPodsWithSchedulingGates waits up to the given `timeout` for exactly `num` pods in the given `ns`
+// to carry the given `schedulingGates`.
+func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error {
+	matched := 0
+	err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
+		pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
+		if err != nil {
+			return handleWaitingAPIError(err, true, "listing pods")
+		}
+		matched = 0
+		for _, pod := range pods.Items {
+			if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) {
+				matched++
+			}
+		}
+		if matched == num {
+			return true, nil
+		}
+		framework.Logf("expected %d pods to carry the given scheduling gates, but got %d", num, matched)
+		return false, nil
+	})
+	return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched)
+}
+
 // WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
 // if the pod Get api returns an error (IsNotFound or other), or if the pod failed (and thus did not
 // terminate) with an unexpected reason. Typically called to test that the passed-in pod is fully
diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go
index 93957429d3b..10204c9a552 100644
--- a/test/e2e/scheduling/predicates.go
+++ b/test/e2e/scheduling/predicates.go
@@ -18,6 +18,7 @@ package scheduling
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
@@ -25,8 +26,10 @@ import (
 	nodev1 "k8s.io/api/node/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	utilversion "k8s.io/apimachinery/pkg/util/version"
 	clientset "k8s.io/client-go/kubernetes"
@@ -70,6 +73,7 @@ type pausePodConfig struct {
 	PriorityClassName          string
 	DeletionGracePeriodSeconds *int64
 	TopologySpreadConstraints  []v1.TopologySpreadConstraint
+	SchedulingGates            []v1.PodSchedulingGate
 }
 
 var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@@ -799,8 +803,75 @@
 			framework.ExpectEqual(numInNode2, expected, fmt.Sprintf("Pods are not distributed as expected on node %q", nodeNames[1]))
 		})
 	})
+
+	ginkgo.It("validates Pods with non-empty schedulingGates are blocked on scheduling [Feature:PodSchedulingReadiness] [alpha]", func() {
+		podLabel := "e2e-scheduling-gates"
+		replicas := 3
+		ginkgo.By(fmt.Sprintf("Creating a ReplicaSet with replicas=%v, carrying scheduling gates [foo bar]", replicas))
+		rsConfig := pauseRSConfig{
+			Replicas: int32(replicas),
+			PodConfig: pausePodConfig{
+				Name:      podLabel,
+				Namespace: ns,
+				Labels:    map[string]string{podLabel: ""},
+				SchedulingGates: []v1.PodSchedulingGate{
+					{Name: "foo"},
+					{Name: "bar"},
+				},
+			},
+		}
+		createPauseRS(f, rsConfig)
+
+		ginkgo.By("Expect all pods to stay in pending state")
+		podList, err := e2epod.WaitForNumberOfPods(cs, ns, replicas, time.Minute)
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
+
+		ginkgo.By("Remove one scheduling gate")
+		want := []v1.PodSchedulingGate{{Name: "bar"}}
+		var pods []*v1.Pod
+		for _, pod := range podList.Items {
+			clone := pod.DeepCopy()
+			clone.Spec.SchedulingGates = want
+			live, err := patchPod(cs, &pod, clone)
+			framework.ExpectNoError(err)
+			pods = append(pods, live)
+		}
+
+		ginkgo.By("Expect all pods to carry one scheduling gate and still be in pending state")
+		framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, want))
+		framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
+
+		ginkgo.By("Remove the remaining scheduling gates")
+		for _, pod := range pods {
+			clone := pod.DeepCopy()
+			clone.Spec.SchedulingGates = nil
+			_, err := patchPod(cs, pod, clone)
+			framework.ExpectNoError(err)
+		}
+
+		ginkgo.By("Expect all pods to be scheduled and running")
+		framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
+	})
 })
 
+func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
+	oldData, err := json.Marshal(old)
+	if err != nil {
+		return nil, err
+	}
+
+	newData, err := json.Marshal(new)
+	if err != nil {
+		return nil, err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create merge patch for Pod %q: %v", old.Name, err)
+	}
+	return cs.CoreV1().Pods(new.Namespace).Patch(context.TODO(), new.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+}
+
 // printAllPodsOnNode outputs status of all kubelet pods into log.
 func printAllPodsOnNode(c clientset.Interface, nodeName string) {
 	podList, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName})
@@ -844,6 +915,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
 			Tolerations:                   conf.Tolerations,
 			PriorityClassName:             conf.PriorityClassName,
 			TerminationGracePeriodSeconds: &gracePeriod,
+			SchedulingGates:               conf.SchedulingGates,
 		},
 	}
 	for key, value := range conf.Labels {
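
A rough usage sketch of the helpers added above (not prescribed by the change itself): a single-pod variant placed inside the same SIGDescribe block in test/e2e/scheduling could look like the snippet below. It assumes the alpha PodSchedulingReadiness feature gate is enabled on the cluster, reuses cs, ns, f, pausePodConfig, createPausePod, and patchPod from predicates.go together with the new e2epod helpers, and the gate name "example.com/hold" is purely illustrative.

ginkgo.It("clears a single scheduling gate and lets the pod schedule", func() {
	// Create one pause pod that carries a single, made-up scheduling gate.
	pod := createPausePod(f, pausePodConfig{
		Name:            "gated-pause-pod",
		Namespace:       ns,
		SchedulingGates: []v1.PodSchedulingGate{{Name: "example.com/hold"}},
	})

	// While the gate is present, the pod should be reported as SchedulingGated.
	framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, 1, time.Minute))

	// Drop the gate with the same strategic-merge-patch helper used by the test above.
	clone := pod.DeepCopy()
	clone.Spec.SchedulingGates = nil
	_, err := patchPod(cs, pod, clone)
	framework.ExpectNoError(err)

	// With no gates left, the scheduler should bind the pod and it should run.
	framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, 1, time.Minute))
})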