Merge pull request #113442 from Huang-Wei/kep-3521-C

[KEP-3521] Part 3: Bug fixes, integration & E2E Test
Kubernetes Prow Robot 2022-11-08 15:08:15 -08:00 committed by GitHub
commit d619f60e0f
7 changed files with 403 additions and 7 deletions


@@ -441,6 +441,9 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
if podInfo.Gated {
return false
}
boTime := p.getBackoffTime(podInfo)
return boTime.After(p.clock.Now())
}
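
For intuition, the new check can be read on its own: a gated pod has never had a real scheduling attempt, so it should not be held back by a backoff timer once its gates are removed. A standalone sketch of that logic (the struct and values below are illustrative only, not the scheduler's actual package):

package main

import (
	"fmt"
	"time"
)

// queuedPodInfo mirrors the two fields that matter for this check.
type queuedPodInfo struct {
	gated         bool      // pod still carries scheduling gates
	backoffExpiry time.Time // when the pod's backoff window ends
}

// isPodBackingoff mirrors the patched logic: gated pods are never reported
// as backing off, regardless of their backoff expiry.
func isPodBackingoff(p queuedPodInfo, now time.Time) bool {
	if p.gated {
		return false
	}
	return p.backoffExpiry.After(now)
}

func main() {
	now := time.Now()
	fmt.Println(isPodBackingoff(queuedPodInfo{gated: true, backoffExpiry: now.Add(time.Minute)}, now))  // false
	fmt.Println(isPodBackingoff(queuedPodInfo{gated: false, backoffExpiry: now.Add(time.Minute)}, now)) // true
}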


@@ -512,14 +512,24 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m))
got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod))
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got, _ := q.addToActiveQ(q.newQueuedPodInfo(tt.pod))
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
// Simulate an update event.
clone := tt.pod.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, "foo", "")
q.Update(tt.pod, clone)
// Ensure the pod is still located in unschedulablePods.
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
})
}
}


@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"text/tabwriter"
"time"
@@ -253,7 +254,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
framework.Logf("Pod %s is Failed, but it's not controlled by a controller", pod.ObjectMeta.Name)
badPods = append(badPods, pod)
}
//ignore failed pods that are controlled by some controller
// ignore failed pods that are controlled by some controller
}
}
@@ -326,7 +327,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc strin
return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
}
// WaitForPodsCondition waits for the listed pods to match the given condition.
// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
@@ -362,6 +363,78 @@ func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListO
return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
}
// WaitForPodsRunning waits for a given `timeout` to evaluate whether a certain number of pods in given `ns` are running.
func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if ready, _ := testutils.PodRunningReady(&pod); ready {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods are running, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be running (want %v, matched %d)", num, matched)
}
// WaitForPodsSchedulingGated waits for a given `timeout` to evaluate whether a certain number of pods in given `ns` stay in scheduling gated state.
func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated {
matched++
}
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods in scheduling gated state, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched)
}
// WaitForPodsWithSchedulingGates waits for a given `timeout` to evaluate whether a certain number of pods in given `ns`
// match the given `schedulingGates`.
func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error {
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods carry the expected scheduling gates, but got %v", num, matched)
return false, nil
})
return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched)
}
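
Taken together, the three helpers above let an e2e spec assert the whole gating lifecycle. A condensed usage sketch, assuming three gated pods created beforehand in `ns` with a clientset `cs`; if placed outside the helpers' package, qualify the calls with the e2epod alias as the spec further down does:

// requireGatingLifecycle is a hypothetical helper showing how the waiters compose.
func requireGatingLifecycle(cs clientset.Interface, ns string) error {
	// All three pods must first be reported as SchedulingGated.
	if err := WaitForPodsSchedulingGated(cs, ns, 3, time.Minute); err != nil {
		return err
	}
	// After one gate is removed elsewhere, the pods should carry only the "bar" gate.
	want := []v1.PodSchedulingGate{{Name: "bar"}}
	if err := WaitForPodsWithSchedulingGates(cs, ns, 3, time.Minute, want); err != nil {
		return err
	}
	// Once all gates are gone, the pods should end up Running.
	return WaitForPodsRunning(cs, ns, 3, time.Minute)
}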
// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
// if the pod Get api returns an error (IsNotFound or other), or if the pod failed (and thus did not
// terminate) with an unexpected reason. Typically called to test that the passed-in pod is fully


@@ -18,6 +18,7 @@ package scheduling
import (
"context"
"encoding/json"
"fmt"
"time"
@@ -25,8 +26,10 @@ import (
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/uuid"
utilversion "k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
@@ -70,6 +73,7 @@ type pausePodConfig struct {
PriorityClassName string
DeletionGracePeriodSeconds *int64
TopologySpreadConstraints []v1.TopologySpreadConstraint
SchedulingGates []v1.PodSchedulingGate
}
var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
@@ -799,8 +803,75 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
framework.ExpectEqual(numInNode2, expected, fmt.Sprintf("Pods are not distributed as expected on node %q", nodeNames[1]))
})
})
ginkgo.It("validates Pods with non-empty schedulingGates are blocked on scheduling [Feature:PodSchedulingReadiness] [alpha]", func() {
podLabel := "e2e-scheduling-gates"
replicas := 3
ginkgo.By(fmt.Sprintf("Creating a ReplicaSet with replicas=%v, carrying scheduling gates [foo bar]", replicas))
rsConfig := pauseRSConfig{
Replicas: int32(replicas),
PodConfig: pausePodConfig{
Name: podLabel,
Namespace: ns,
Labels: map[string]string{podLabel: ""},
SchedulingGates: []v1.PodSchedulingGate{
{Name: "foo"},
{Name: "bar"},
},
},
}
createPauseRS(f, rsConfig)
ginkgo.By("Expect all pods stay in pending state")
podList, err := e2epod.WaitForNumberOfPods(cs, ns, replicas, time.Minute)
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
ginkgo.By("Remove one scheduling gate")
want := []v1.PodSchedulingGate{{Name: "bar"}}
var pods []*v1.Pod
for _, pod := range podList.Items {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = want
live, err := patchPod(cs, &pod, clone)
framework.ExpectNoError(err)
pods = append(pods, live)
}
ginkgo.By("Expect all pods carry one scheduling gate and are still in pending state")
framework.ExpectNoError(e2epod.WaitForPodsWithSchedulingGates(cs, ns, replicas, time.Minute, want))
framework.ExpectNoError(e2epod.WaitForPodsSchedulingGated(cs, ns, replicas, time.Minute))
ginkgo.By("Remove the remaining scheduling gates")
for _, pod := range pods {
clone := pod.DeepCopy()
clone.Spec.SchedulingGates = nil
_, err := patchPod(cs, pod, clone)
framework.ExpectNoError(err)
}
ginkgo.By("Expect all pods are scheduled and running")
framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
})
})
func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
oldData, err := json.Marshal(old)
if err != nil {
return nil, err
}
newData, err := json.Marshal(new)
if err != nil {
return nil, err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return nil, fmt.Errorf("failed to create merge patch for Pod %q: %v", old.Name, err)
}
return cs.CoreV1().Pods(new.Namespace).Patch(context.TODO(), new.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
}
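
patchPod is also what the spec above relies on to ungate pods. A condensed sketch of clearing every gate on a pod previously fetched as `pod` (variable names here are illustrative):

clone := pod.DeepCopy()
clone.Spec.SchedulingGates = nil // an empty gate list lets the scheduler pick the pod up
if _, err := patchPod(cs, pod, clone); err != nil {
	framework.Failf("failed to remove scheduling gates from %s/%s: %v", pod.Namespace, pod.Name, err)
}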
// printAllPodsOnNode outputs status of all kubelet pods into log.
func printAllPodsOnNode(c clientset.Interface, nodeName string) {
podList, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName})
@@ -844,6 +915,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
Tolerations: conf.Tolerations,
PriorityClassName: conf.PriorityClassName,
TerminationGracePeriodSeconds: &gracePeriod,
SchedulingGates: conf.SchedulingGates,
},
}
for key, value := range conf.Labels {


@@ -31,9 +31,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
@@ -57,9 +60,15 @@ var (
podSchedulingError = testutils.PodSchedulingError
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
waitForPodSchedulingGated = testutils.WaitForPodSchedulingGated
waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
)
type PreEnqueuePlugin struct {
called int32
admit bool
}
type PreFilterPlugin struct {
numPreFilterCalled int
failPreFilter bool
@@ -146,6 +155,7 @@ type PermitPlugin struct {
}
const (
enqueuePluginName = "enqueue-plugin"
prefilterPluginName = "prefilter-plugin"
postfilterPluginName = "postfilter-plugin"
scorePluginName = "score-plugin"
@@ -158,6 +168,7 @@ const (
permitPluginName = "permit-plugin"
)
var _ framework.PreEnqueuePlugin = &PreEnqueuePlugin{}
var _ framework.PreFilterPlugin = &PreFilterPlugin{}
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
@@ -184,6 +195,18 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
}
}
func (ep *PreEnqueuePlugin) Name() string {
return enqueuePluginName
}
func (ep *PreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
ep.called++
if ep.admit {
return nil
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "not ready for scheduling")
}
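
PreEnqueue is the extension point that the in-tree SchedulingGates plugin implements, and out-of-tree plugins can hook it the same way. A minimal illustrative plugin (the plugin name and label key are made up, not part of this PR) that refuses to enqueue pods carrying a hold label:

type labelGatePlugin struct{}

func (labelGatePlugin) Name() string { return "label-gate" }

// PreEnqueue keeps pods with the hold label out of the active queue until the
// label is removed; returning nil admits the pod.
func (labelGatePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
	if _, held := p.Labels["example.com/scheduling-hold"]; held {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "held by example.com/scheduling-hold")
	}
	return nil
}

var _ framework.PreEnqueuePlugin = labelGatePlugin{}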
// Name returns name of the score plugin.
func (sp *ScorePlugin) Name() string {
return scorePluginName
@@ -2089,6 +2112,72 @@ func TestPreScorePlugin(t *testing.T) {
}
}
// TestPreEnqueuePlugin tests invocation of enqueue plugins.
func TestPreEnqueuePlugin(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)()
// Create a plugin registry for testing. Register only a PreEnqueue plugin.
enqueuePlugin := &PreEnqueuePlugin{}
// Plumb a preFilterPlugin to verify if it's called or not.
preFilterPlugin := &PreFilterPlugin{}
registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)
// Create the API server and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "enqueue-plugin", nil), 1,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
tests := []struct {
name string
pod *v1.Pod
admitEnqueue bool
}{
{
name: "pod is admitted to enqueue",
pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).Container("pause").Obj(),
admitEnqueue: true,
},
{
name: "pod is not admitted to enqueue",
pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).SchedulingGates([]string{"foo"}).Container("pause").Obj(),
admitEnqueue: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
enqueuePlugin.admit = tt.admitEnqueue
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet, tt.pod)
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if tt.admitEnqueue {
if err := waitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
}
// Also verify enqueuePlugin is called.
if enqueuePlugin.called == 0 {
t.Errorf("Expected the enqueuePlugin plugin to be called at least once, but got 0")
}
} else {
if err := waitForPodSchedulingGated(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be scheduling waiting, but got: %v", err)
}
// Also verify preFilterPlugin is not called.
if preFilterPlugin.numPreFilterCalled != 0 {
t.Errorf("Expected the preFilter plugin not to be called, but got %v", preFilterPlugin.numPreFilterCalled)
}
}
preFilterPlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
})
}
}
// TestPreemptWithPermitPlugin tests preempt with permit plugins.
// It verifies how waitingPods behave in different scenarios:
// - when waitingPods get preempted
@@ -2450,6 +2539,8 @@ func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (framework
plugin := configv1.Plugin{Name: p.Name()}
switch p.(type) {
case *PreEnqueuePlugin:
pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
case *PreFilterPlugin:
pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
case *FilterPlugin:


@@ -30,12 +30,16 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
configv1 "k8s.io/kube-scheduler/config/v1"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -47,6 +51,128 @@ import (
"k8s.io/utils/pointer"
)
func TestSchedulingGates(t *testing.T) {
tests := []struct {
name string
pods []*v1.Pod
featureEnabled bool
want []string
rmPodsSchedulingGates []int
wantPostGatesRemoval []string
}{
{
name: "feature disabled, regular pods",
pods: []*v1.Pod{
st.MakePod().Name("p1").Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
featureEnabled: false,
want: []string{"p1", "p2"},
},
{
name: "feature enabled, regular pods",
pods: []*v1.Pod{
st.MakePod().Name("p1").Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
featureEnabled: true,
want: []string{"p1", "p2"},
},
{
name: "feature disabled, one pod carrying scheduling gates",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
featureEnabled: false,
want: []string{"p1", "p2"},
},
{
name: "feature enabled, one pod carrying scheduling gates",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
featureEnabled: true,
want: []string{"p2"},
},
{
name: "feature enabled, two pod carrying scheduling gates, and remove gates of one pod",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
st.MakePod().Name("p3").Container("pause").Obj(),
},
featureEnabled: true,
want: []string{"p3"},
rmPodsSchedulingGates: []int{1}, // remove gates of 'p2'
wantPostGatesRemoval: []string{"p2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)()
// Use zero backoff seconds to bypass backoffQ.
// It's intended to not start the scheduler's queue, and hence to
// not start any flushing logic. We will pop and schedule the Pods manually later.
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil),
0,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
)
testutils.SyncInformerFactory(testCtx)
defer testutils.CleanupTest(t, testCtx)
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
for _, p := range tt.pods {
p.Namespace = ns
if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", p.Name, err)
}
}
// Wait for the pods to be present in the scheduling queue.
if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) {
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.pods), nil
}); err != nil {
t.Fatal(err)
}
// Pop the expected pods out. They should be de-queueable.
for _, wantPod := range tt.want {
podInfo := nextPodOrDie(t, testCtx)
if got := podInfo.Pod.Name; got != wantPod {
t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
}
}
if len(tt.rmPodsSchedulingGates) == 0 {
return
}
// Remove scheduling gates from the pod spec.
for _, idx := range tt.rmPodsSchedulingGates {
patch := `{"spec": {"schedulingGates": null}}`
podName := tt.pods[idx].Name
if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
t.Fatalf("Failed to patch pod %v: %v", podName, err)
}
}
// Pop the expected pods out. They should be de-queueable.
for _, wantPod := range tt.wantPostGatesRemoval {
podInfo := nextPodOrDie(t, testCtx)
if got := podInfo.Pod.Name; got != wantPod {
t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
}
}
})
}
}
// TestCoreResourceEnqueue verifies that Pods failed by in-tree default plugins can be
// moved properly upon their registered events.
func TestCoreResourceEnqueue(t *testing.T) {


@@ -776,7 +776,7 @@ func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNam
}
// PodUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
// gets unschedulable status of reason 'Unschedulable'.
func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
@@ -806,18 +806,39 @@ func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wai
}
}
// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
// PodSchedulingGated returns a condition function that returns true if the given pod
// gets unschedulable status of reason 'SchedulingGated'.
func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil
}
}
// WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name))
}
// waitForPodUnschedule waits for a pod to fail scheduling and returns
// WaitForPodUnschedulable waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the timeout duration (30 seconds).
func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
// WaitForPodSchedulingGated waits for a pod to be in scheduling gated state
// and returns an error if it does not fall into this state within the given timeout.
func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name))
}
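
Usage mirrors the existing unschedulable waiter. An illustrative assertion in an integration test (the `cs`, `pod`, and `t` variables are assumed from the surrounding test):

// Expect the gated pod to be reported as SchedulingGated rather than plain Unschedulable.
if err := WaitForPodSchedulingGated(cs, pod, 10*time.Second); err != nil {
	t.Errorf("expected pod %s/%s to be scheduling gated: %v", pod.Namespace, pod.Name, err)
}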
// WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
// the expected values.
func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {