sched: retry unschedulable pods immediately after a waiting pod's deletion

Wei Huang 2021-07-19 15:46:55 -07:00
parent ebc87c39d3
commit dc079acc2b
8 changed files with 312 additions and 101 deletions


@ -174,7 +174,12 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
klog.ErrorS(err, "Unable to get profile", "pod", klog.KObj(pod))
return
}
fwk.RejectWaitingPod(pod.UID)
// If a waiting pod is rejected, it indicates it was previously assumed and we're
// removing it from the scheduler cache. In this case, signal an AssignedPodDelete
// event to immediately retry some unschedulable Pods.
if fwk.RejectWaitingPod(pod.UID) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}
}
func (sched *Scheduler) addPodToCache(obj interface{}) {

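For context, the sketch below illustrates what a call like MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil) conceptually does: every pod currently parked as unschedulable is re-queued for another scheduling attempt unless an optional preCheck filter keeps it where it is. This is a deliberately simplified, standalone sketch: the queue type, method name, and flat slices are stand-ins, and the real implementation in pkg/scheduler/internal/queue uses heaps, backoff timers, and locking.

// Simplified, illustrative sketch only (not the real pkg/scheduler/internal/queue
// code): a flat queue whose moveAllToActive mirrors the shape of
// MoveAllToActiveOrBackoffQueue(event, preCheck).
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type fakeQueue struct {
	unschedulable []*v1.Pod // pods parked until a relevant cluster event arrives
	active        []*v1.Pod // pods that will be retried right away
}

// moveAllToActive re-queues every unschedulable pod unless preCheck (which may be
// nil, meaning "move everything") filters it out. The event is only logged here.
func (q *fakeQueue) moveAllToActive(event string, preCheck func(*v1.Pod) bool) {
	var remaining []*v1.Pod
	moved := 0
	for _, p := range q.unschedulable {
		if preCheck != nil && !preCheck(p) {
			remaining = append(remaining, p) // filtered out: stays unschedulable
			continue
		}
		q.active = append(q.active, p)
		moved++
	}
	q.unschedulable = remaining
	fmt.Printf("event %q: moved %d pod(s), %d still unschedulable\n", event, moved, len(remaining))
}

func main() {
	pending := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending", UID: "uid-1"}}
	assumed := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "assumed", UID: "uid-2"}}
	q := &fakeQueue{unschedulable: []*v1.Pod{pending, assumed}}

	// Same filter shape as the scheduleOne hunks below: skip the assumed pod itself.
	q.moveAllToActive("AssignedPodDelete", func(p *v1.Pod) bool {
		return p.UID != assumed.UID
	})
}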

@ -584,7 +584,8 @@ type Handle interface {
GetWaitingPod(uid types.UID) WaitingPod
// RejectWaitingPod rejects a waiting pod given its UID.
RejectWaitingPod(uid types.UID)
// The return value indicates if the pod is waiting or not.
RejectWaitingPod(uid types.UID) bool
// ClientSet returns a kubernetes clientSet.
ClientSet() clientset.Interface

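The new boolean makes the rejection observable to callers. As a hypothetical illustration (the helper name and the priority-threshold logic are invented for this sketch and are not part of the change), an out-of-tree plugin could combine it with the existing IterateOverWaitingPods and GetPod accessors like this:

// Hypothetical out-of-tree helper, shown only to illustrate the new return value:
// reject every waiting pod below a priority threshold and report how many of the
// rejections actually hit a pod that was still waiting.
package exampleplugin

import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func rejectLowPriorityWaitingPods(h framework.Handle, threshold int32) int {
	rejected := 0
	h.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		p := wp.GetPod()
		if p.Spec.Priority != nil && *p.Spec.Priority < threshold {
			// RejectWaitingPod now returns true only if a waiting pod with this
			// UID was actually found (and hence rejected).
			if h.RejectWaitingPod(p.UID) {
				rejected++
			}
		}
	})
	return rejected
}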

@ -1092,11 +1092,13 @@ func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
}
// RejectWaitingPod rejects a WaitingPod given its UID.
func (f *frameworkImpl) RejectWaitingPod(uid types.UID) {
waitingPod := f.waitingPods.get(uid)
if waitingPod != nil {
// The returned value indicates if the given pod is waiting or not.
func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
waitingPod.Reject("", "removed")
return true
}
return false
}
// HasFilterPlugins returns true if at least one filter plugin is defined.


@ -639,6 +639,16 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
// Avoid moving the assumed Pod itself as it's always Unschedulable.
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
return
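The "defer" reasoning above hinges on the queue's cycle bookkeeping: once a move request runs, pods that subsequently fail in an already-covered scheduling cycle are sent to the backoff queue instead of being parked as unschedulable. The sketch below is a loose, illustrative model of that interaction; the type, field, and method names are simplified stand-ins, and the real logic (including the preCheck that skips the assumed pod) lives in pkg/scheduler/internal/queue.

// Illustrative only: a stripped-down model of why the move is deferred. If the move
// ran first, the bumped moveRequestCycle would send the assumed pod to backoff (and
// thus retry it) even though recordSchedulingFailure is about to park it.
package main

import "fmt"

type miniQueue struct {
	schedulingCycle  int64    // current scheduling cycle number
	moveRequestCycle int64    // last cycle covered by a move-all request
	unschedulable    []string // pods parked until a relevant event
	backoff          []string // pods retried after a backoff period
	active           []string // pods retried right away
}

// moveAllToActiveOrBackoff records that a move request covers the current cycle and
// re-queues everything that is currently parked.
func (q *miniQueue) moveAllToActiveOrBackoff() {
	q.moveRequestCycle = q.schedulingCycle
	q.active = append(q.active, q.unschedulable...)
	q.unschedulable = nil
}

// addUnschedulableIfNotPresent parks a failed pod, unless a move request has already
// covered the pod's scheduling cycle, in which case the pod goes to backoff instead.
func (q *miniQueue) addUnschedulableIfNotPresent(pod string, podCycle int64) {
	if q.moveRequestCycle >= podCycle {
		q.backoff = append(q.backoff, pod)
		return
	}
	q.unschedulable = append(q.unschedulable, pod)
}

func main() {
	// The ordering the comment warns about: move first, record the failure second.
	q := &miniQueue{schedulingCycle: 7}
	q.moveAllToActiveOrBackoff()
	q.addUnschedulableIfNotPresent("assumed-pod", 7)
	fmt.Println("backoff:", q.backoff, "unschedulable:", q.unschedulable)
	// Prints: backoff: [assumed-pod] unschedulable: []
	// With the deferred move (plus the UID preCheck in the hunk above), the failure is
	// recorded first, so the assumed pod stays parked instead of being retried.
}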
@ -652,6 +662,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
return
@ -664,6 +679,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
klog.ErrorS(err, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
} else {


@ -456,6 +456,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
Profiles: profile.Map{
testSchedulerName: fwk,
},
SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -946,7 +947,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Profiles: profile.Map{
testSchedulerName: fwk,
},
client: client,
client: client,
SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
}
return sched, bindingChan, errChan


@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/utils/pointer"
)
@ -415,6 +416,8 @@ func (p *PodWrapper) Req(resMap map[v1.ResourceName]string) *PodWrapper {
res[k] = resource.MustParse(v)
}
p.Spec.Containers = append(p.Spec.Containers, v1.Container{
Name: fmt.Sprintf("con%d", len(p.Spec.Containers)),
Image: imageutils.GetPauseImageName(),
Resources: v1.ResourceRequirements{
Requests: res,
},

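For reference, the updated Req() wrapper is exercised by the tests further down; a minimal usage sketch is below (package and function names, and the resource values, are illustrative only):

// Minimal usage sketch of the updated Req() wrapper: each call appends a named pause
// container carrying the given resource requests to the wrapped pod.
package example

import (
	v1 "k8s.io/api/core/v1"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

func examplePod(ns string) *v1.Pod {
	return st.MakePod().Name("waiting-pod").Namespace(ns).
		Req(map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "200"}).
		Obj()
}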

@ -19,16 +19,17 @@ package scheduler
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
@ -38,6 +39,7 @@ import (
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
@ -66,6 +68,9 @@ type FilterPlugin struct {
numFilterCalled int32
failFilter bool
rejectFilter bool
numCalledPerPod map[string]int
sync.RWMutex
}
type PostFilterPlugin struct {
@ -92,6 +97,10 @@ type PreBindPlugin struct {
numPreBindCalled int
failPreBind bool
rejectPreBind bool
// If set to true, always succeed on non-first scheduling attempt.
succeedOnRetry bool
// Record the pod UIDs that have been tried scheduling.
podUIDs map[types.UID]struct{}
}
type BindPlugin struct {
@ -232,6 +241,9 @@ func (fp *FilterPlugin) Name() string {
func (fp *FilterPlugin) reset() {
fp.numFilterCalled = 0
fp.failFilter = false
if fp.numCalledPerPod != nil {
fp.numCalledPerPod = make(map[string]int)
}
}
// Filter is a test function that returns an error or nil, depending on the
@ -239,6 +251,12 @@ func (fp *FilterPlugin) reset() {
func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
atomic.AddInt32(&fp.numFilterCalled, 1)
if fp.numCalledPerPod != nil {
fp.Lock()
fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++
fp.Unlock()
}
if fp.failFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
@ -310,6 +328,10 @@ func (pp *PreBindPlugin) Name() string {
// PreBind is a test function that returns (true, nil) or errors for testing.
func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
pp.numPreBindCalled++
if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry {
return nil
}
pp.podUIDs[pod.UID] = struct{}{}
if pp.failPreBind {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
@ -324,6 +346,8 @@ func (pp *PreBindPlugin) reset() {
pp.numPreBindCalled = 0
pp.failPreBind = false
pp.rejectPreBind = false
pp.succeedOnRetry = false
pp.podUIDs = make(map[types.UID]struct{})
}
const bindPluginAnnotation = "bindPluginName"
@ -940,34 +964,55 @@ func TestReservePluginReserve(t *testing.T) {
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a prebind plugin.
preBindPlugin := &PreBindPlugin{}
registry := frameworkruntime.Registry{preBindPluginName: newPlugin(preBindPlugin)}
// Create a plugin registry for testing. Register a prebind and a filter plugin.
preBindPlugin := &PreBindPlugin{podUIDs: make(map[types.UID]struct{})}
filterPlugin := &FilterPlugin{}
registry := frameworkruntime.Registry{
preBindPluginName: newPlugin(preBindPlugin),
filterPluginName: newPlugin(filterPlugin),
}
// Setup initial prebind plugin for testing.
// Setup initial prebind and filter plugin in different profiles.
// The second profile ensures the embedded filter plugin is exclusively called, and hence
// we can use its internal `numFilterCalled` to perform precise checks.
cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
Profiles: []v1beta2.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &v1beta2.Plugins{
PreBind: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: preBindPluginName},
Profiles: []v1beta2.KubeSchedulerProfile{
{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &v1beta2.Plugins{
PreBind: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: preBindPluginName},
},
},
},
},
}},
{
SchedulerName: pointer.StringPtr("2nd-scheduler"),
Plugins: &v1beta2.Plugins{
Filter: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: filterPluginName},
},
},
},
},
},
})
// Create the API server and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), 2,
nodesNum := 2
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), nodesNum,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
tests := []struct {
name string
fail bool
reject bool
name string
fail bool
reject bool
succeedOnRetry bool
unschedulablePod *v1.Pod
}{
{
name: "disable fail and reject flags",
@ -989,12 +1034,39 @@ func TestPrebindPlugin(t *testing.T) {
fail: true,
reject: true,
},
{
name: "fail on 1st try but succeed on retry",
fail: true,
reject: false,
succeedOnRetry: true,
},
{
name: "reject on 1st try but succeed on retry",
fail: false,
reject: true,
succeedOnRetry: true,
},
{
name: "failure on preBind moves unschedulable pods",
fail: true,
unschedulablePod: st.MakePod().Name("unschedulable-pod").Namespace(testCtx.NS.Name).Container(imageutils.GetPauseImageName()).Obj(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if p := test.unschedulablePod; p != nil {
p.Spec.SchedulerName = "2nd-scheduler"
filterPlugin.rejectFilter = true
if _, err := createPausePod(testCtx.ClientSet, p); err != nil {
t.Fatalf("Error while creating an unschedulable pod: %v", err)
}
defer filterPlugin.reset()
}
preBindPlugin.failPreBind = test.fail
preBindPlugin.rejectPreBind = test.reject
preBindPlugin.succeedOnRetry = test.succeedOnRetry
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
@ -1003,7 +1075,11 @@ func TestPrebindPlugin(t *testing.T) {
}
if test.fail || test.reject {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
if test.succeedOnRetry {
if err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable on retry, but got an error: %v", err)
}
} else if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
}
} else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
@ -1014,6 +1090,16 @@ func TestPrebindPlugin(t *testing.T) {
t.Errorf("Expected the prebind plugin to be called.")
}
if test.unschedulablePod != nil {
if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) {
// The factor of 2 means the unschedulable pod is expected to be tried at least twice
// (Filter runs once per node per attempt): one initial attempt plus the retry
// triggered by the preBind failure.
return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil
}); err != nil {
t.Errorf("Timed out waiting for the unschedulable Pod to be retried at least twice.")
}
}
preBindPlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
})
@ -1346,6 +1432,7 @@ func TestPostBindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register a prebind and a postbind plugin.
preBindPlugin := &PreBindPlugin{
failPreBind: test.preBindFail,
podUIDs: make(map[types.UID]struct{}),
}
postBindPlugin := &PostBindPlugin{
name: postBindPluginName,
@ -1841,14 +1928,52 @@ func TestPreScorePlugin(t *testing.T) {
}
// TestPreemptWithPermitPlugin tests preempt with permit plugins.
// It verifies how waitingPods behave in different scenarios:
// - when waitingPods get preempted
// - they should be removed from internal waitingPods map, but not physically deleted
// - it'd trigger moving unschedulable Pods, but not the waitingPods themselves
// - when waitingPods get deleted externally, it'd trigger moving unschedulable Pods
func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
// Create a plugin registry for testing. Register a permit and a filter plugin.
permitPlugin := &PermitPlugin{}
registry, prof := initRegistryAndConfig(t, permitPlugin)
// Inject a fake filter plugin to use its internal `numFilterCalled` to verify
// how many times a Pod gets tried scheduling.
filterPlugin := &FilterPlugin{numCalledPerPod: make(map[string]int)}
registry := frameworkruntime.Registry{
permitPluginName: newPermitPlugin(permitPlugin),
filterPluginName: newPlugin(filterPlugin),
}
// Setup initial permit and filter plugins in the profile.
cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
Profiles: []v1beta2.KubeSchedulerProfile{
{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &v1beta2.Plugins{
Permit: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: permitPluginName},
},
},
Filter: v1beta2.PluginSet{
// Ensure the fake filter plugin is always called; otherwise noderesources
// would fail first and exit the Filter phase.
Enabled: []v1beta2.Plugin{
{Name: filterPluginName},
{Name: noderesources.FitName},
},
Disabled: []v1beta2.Plugin{
{Name: noderesources.FitName},
},
},
},
},
},
})
// Create the API server and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "preempt-with-permit-plugin", nil), 0,
scheduler.WithProfiles(prof),
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
@ -1863,85 +1988,141 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
t.Fatal(err)
}
permitPlugin.failPermit = false
permitPlugin.rejectPermit = false
permitPlugin.timeoutPermit = false
permitPlugin.waitAndRejectPermit = false
permitPlugin.waitAndAllowPermit = true
permitPlugin.waitingPod = "waiting-pod"
ns := testCtx.NS.Name
lowPriority, highPriority := int32(100), int32(300)
resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
resReq := map[v1.ResourceName]string{
v1.ResourceCPU: "200m",
v1.ResourceMemory: "200",
}
preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
preemptorReq := map[v1.ResourceName]string{
v1.ResourceCPU: "400m",
v1.ResourceMemory: "400",
}
// First pod will go running.
runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
runningPod, err = createPausePod(testCtx.ClientSet, runningPod)
if err != nil {
t.Errorf("Error while creating the waiting pod: %v", err)
}
// Wait until the pod scheduled, then create a preemptor pod to preempt it.
wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace))
// Second pod will go waiting.
waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
if err != nil {
t.Errorf("Error while creating the waiting pod: %v", err)
}
// Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
w := false
permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return w, nil
})
// Create third pod which should preempt other pods.
preemptorPod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
if err != nil {
t.Errorf("Error while creating the preemptor pod: %v", err)
tests := []struct {
name string
deleteWaitingPod bool
maxNumWaitingPodCalled int
runningPod *v1.Pod
waitingPod *v1.Pod
preemptor *v1.Pod
}{
{
name: "waiting pod is not physically deleted upon preemption",
maxNumWaitingPodCalled: 2,
runningPod: st.MakePod().Name("running-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
},
{
name: "rejecting a waiting pod to trigger retrying unschedulable pods immediately, but the waiting pod itself won't be retried",
maxNumWaitingPodCalled: 1,
waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
},
{
name: "deleting a waiting pod to trigger retrying unschedulable pods immediately",
deleteWaitingPod: true,
maxNumWaitingPodCalled: 1,
waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(lowPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
},
}
// TODO(#96478): uncomment below once we find a way to trigger MoveAllToActiveOrBackoffQueue()
// upon deletion event of unassigned waiting pods.
// if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, preemptorPod); err != nil {
// t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err)
// }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
permitPlugin.reset()
filterPlugin.reset()
var pods []*v1.Pod
for _, p := range []*v1.Pod{tt.runningPod, tt.waitingPod, tt.preemptor} {
if p != nil {
pods = append(pods, p)
}
}
testutils.CleanupPods(testCtx.ClientSet, t, pods)
}()
if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
w := false
permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return !w, nil
}); err != nil {
t.Error("Expected the waiting pod to get preempted")
}
// Expect the waitingPod to be still present.
if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
t.Error("Get waiting pod in waiting pod failed.")
}
// Expect the runningPod to be deleted physically.
_, err = getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)
if err != nil && !errors.IsNotFound(err) {
t.Error("Get running pod failed.")
}
if err == nil {
t.Error("Running pod still exist.")
}
if permitPlugin.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
permitPlugin.waitAndAllowPermit = true
permitPlugin.waitingPod = "waiting-pod"
permitPlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
if r := tt.runningPod; r != nil {
if _, err := createPausePod(testCtx.ClientSet, r); err != nil {
t.Fatalf("Error while creating the running pod: %v", err)
}
// Wait until the pod is scheduled.
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, r); err != nil {
t.Fatalf("The running pod is expected to be scheduled: %v", err)
}
}
if w := tt.waitingPod; w != nil {
if _, err := createPausePod(testCtx.ClientSet, w); err != nil {
t.Fatalf("Error while creating the waiting pod: %v", err)
}
// Wait until the waiting-pod is actually waiting.
if err := wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
w := false
permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return w, nil
}); err != nil {
t.Fatalf("The waiting pod is expected to be waiting: %v", err)
}
}
if p := tt.preemptor; p != nil {
if _, err := createPausePod(testCtx.ClientSet, p); err != nil {
t.Fatalf("Error while creating the preemptor pod: %v", err)
}
// Delete the waiting pod if specified.
if w := tt.waitingPod; w != nil && tt.deleteWaitingPod {
if err := deletePod(testCtx.ClientSet, w.Name, w.Namespace); err != nil {
t.Fatalf("Error while deleting the waiting pod: %v", err)
}
}
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, p); err != nil {
t.Fatalf("Expected the preemptor pod to be scheduled. error: %v", err)
}
}
if w := tt.waitingPod; w != nil {
if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
w := false
permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return !w, nil
}); err != nil {
t.Fatalf("Expected the waiting pod to get preempted.")
}
filterPlugin.RLock()
waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
filterPlugin.RUnlock()
if waitingPodCalled > tt.maxNumWaitingPodCalled {
t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled)
}
if !tt.deleteWaitingPod {
// Expect the waitingPod to be still present.
if _, err := getPod(testCtx.ClientSet, w.Name, w.Namespace); err != nil {
t.Error("Get waiting pod in waiting pod failed.")
}
}
if permitPlugin.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
}
if r := tt.runningPod; r != nil {
// Expect the runningPod to be deleted physically.
if _, err = getPod(testCtx.ClientSet, r.Name, r.Namespace); err == nil {
t.Error("The running pod still exists.")
} else if !errors.IsNotFound(err) {
t.Errorf("Get running pod failed: %v", err)
}
}
})
}
}
const (


@ -498,10 +498,7 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
// This could be a connection error so we want to retry.
return false, nil
}
if pod.Spec.NodeName == "" {
return false, nil
}
return true, nil
return pod.Spec.NodeName != "", nil
}
}