E2E test for KEP Scheduling Readiness Gates

Wei Huang 2022-11-01 11:49:40 -07:00
parent ae5d430c76
commit abe0c5d5b4
2 changed files with 147 additions and 2 deletions
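
For context: with the alpha PodSchedulingReadiness feature, a Pod created with a non-empty spec.schedulingGates is held back by kube-scheduler until every gate is removed. Below is a minimal sketch of such a gated pause Pod, mirroring the configuration the new test uses; the pod name and pause image tag are illustrative and not taken from this commit.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// gatedPausePod builds a Pod that carries two scheduling gates, "foo" and "bar".
// The scheduler leaves it Pending until both entries are cleared from
// spec.schedulingGates.
func gatedPausePod(ns string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "e2e-scheduling-gates", Namespace: ns},
		Spec: v1.PodSpec{
			SchedulingGates: []v1.PodSchedulingGate{
				{Name: "foo"},
				{Name: "bar"},
			},
			Containers: []v1.Container{
				// Illustrative image; the e2e framework supplies its own pause image.
				{Name: "pause", Image: "registry.k8s.io/pause:3.8"},
			},
		},
	}
}

func main() {
	fmt.Printf("gates: %+v\n", gatedPausePod("default").Spec.SchedulingGates)
}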

View File

@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"text/tabwriter"
"time"
@ -253,7 +254,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
framework.Logf("Pod %s is Failed, but it's not controlled by a controller", pod.ObjectMeta.Name)
badPods = append(badPods, pod)
}
//ignore failed pods that are controlled by some controller
// ignore failed pods that are controlled by some controller
}
}
@ -326,7 +327,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc strin
return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
}
// WaitForPodsCondition waits for the listed pods to match the given condition.
// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
@ -362,6 +363,78 @@ func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListO
return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
}
// WaitForPodsRunning waits up to `timeout` for exactly `num` pods in the given `ns` to be running and ready.
func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if ready, _ := testutils.PodRunningReady(&pod); ready {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expected %d pods to be running, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be running (want %v, matched %d)", num, matched)
}
// WaitForPodsSchedulingGated waits up to `timeout` for exactly `num` pods in the given `ns` to be in the scheduling gated state.
func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated {
matched++
}
}
}
if matched == num {
return true, nil
}
framework.Logf("expected %d pods to be in scheduling gated state, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched)
}
// WaitForPodsWithSchedulingGates waits up to `timeout` for exactly `num` pods in the given `ns` to
// carry the given `schedulingGates` in their spec.
func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expected %d pods to carry the given scheduling gates, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched)
}
// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
// if the pod Get api returns an error (IsNotFound or other), or if the pod failed (and thus did not
// terminate) with an unexpected reason. Typically called to test that the passed-in pod is fully

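The WaitForPodsSchedulingGated helper above keys on the PodScheduled condition that the scheduler reports while gates are present. Here is a small standalone sketch of that check; the hand-built condition values are assumptions based on the feature's documented behavior, not output captured from this test.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// isSchedulingGated mirrors the condition check used by the new helper:
// a gated pod reports PodScheduled with reason SchedulingGated.
func isSchedulingGated(pod *v1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodScheduled && cond.Reason == v1.PodReasonSchedulingGated {
			return true
		}
	}
	return false
}

func main() {
	// Assumed shape of the status a gated pod carries.
	gated := &v1.Pod{
		Status: v1.PodStatus{
			Conditions: []v1.PodCondition{{
				Type:   v1.PodScheduled,
				Status: v1.ConditionFalse,
				Reason: v1.PodReasonSchedulingGated,
			}},
		},
	}
	fmt.Println(isSchedulingGated(gated)) // true
}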
View File

@ -18,6 +18,7 @@ package scheduling
import (
"context"
"encoding/json"
"fmt"
"time"
@ -25,8 +26,10 @@ import (
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/uuid"
utilversion "k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
@ -70,6 +73,7 @@ type pausePodConfig struct {
PriorityClassName string
DeletionGracePeriodSeconds *int64
TopologySpreadConstraints []v1.TopologySpreadConstraint
SchedulingGates []v1.PodSchedulingGate
}
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@ -799,8 +803,75 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
framework.ExpectEqual(numInNode2, expected, fmt.Sprintf("Pods are not distributed as expected on node %q", nodeNames[1]))
})
})
ginkgo.It("validates Pods with non-empty schedulingGates are blocked on scheduling [Feature:PodSchedulingReadiness] [alpha]", func() {
podLabel := "e2e-scheduling-gates"
replicas := 3
ginkgo.By(fmt.Sprintf("Creating a ReplicaSet with replicas=%v, carrying scheduling gates [foo bar]", replicas))
rsConfig := pauseRSConfig{
Replicas: int32(replicas),
PodConfig: pausePodConfig{
Name: podLabel,
Namespace: ns,
Labels: map[string]string{podLabel: ""},
SchedulingGates: []v1.PodSchedulingGate{
{Name: "foo"},
{Name: "bar"},
},
},
}
createPauseRS(f, rsConfig)
ginkgo.By("Expect all pods to stay in pending state")
podList, err := e2epod.WaitForNumberOfPods(cs, ns, replicas, time.Minute)
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
ginkgo.By("Remove one scheduling gate")
want := []v1.PodSchedulingGate{{Name: "bar"}}
var pods []*v1.Pod
for _, pod := range podList.Items {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = want
live, err := patchPod(cs, &pod, clone)
framework.ExpectNoError(err)
pods = append(pods, live)
}
ginkgo.By("Expect all pods to carry one scheduling gate and remain in pending state")
framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, want))
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
ginkgo.By("Remove the remaining scheduling gates")
for _, pod := range pods {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = nil
_, err := patchPod(cs, pod, clone)
framework.ExpectNoError(err)
}
ginkgo.By("Expect all pods to be scheduled and running")
framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
})
})
func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}
newData, err := json.Marshal(new)
if err != nil {
return nil, err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return nil, fmt.Errorf("failed to create merge patch for Pod %q: %v", old.Name, err)
}
return cs.CoreV1().Pods(new.Namespace).Patch(context.TODO(), new.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
}
// printAllPodsOnNode outputs status of all kubelet pods into log.
func printAllPodsOnNode(c clientset.Interface, nodeName string) {
podList, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName})
@ -844,6 +915,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
Tolerations: conf.Tolerations,
PriorityClassName: conf.PriorityClassName,
TerminationGracePeriodSeconds: &gracePeriod,
SchedulingGates: conf.SchedulingGates,
},
}
for key, value := range conf.Labels {