Move queue.Done() before Prebind, add tests

This commit is contained in:
Ania Borowiec 2025-03-20 20:17:53 +00:00
parent b0d6079ddc
commit 17acc4a5ee
No known key found for this signature in database
3 changed files with 516 additions and 101 deletions

View File

@ -134,6 +134,7 @@ type SchedulingQueue interface {
PodsInActiveQ() []*v1.Pod
// PodsInBackoffQ returns all the Pods in the backoffQ.
PodsInBackoffQ() []*v1.Pod
UnschedulablePods() []*v1.Pod
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
@ -1205,6 +1206,15 @@ func (p *PriorityQueue) PodsInBackoffQ() []*v1.Pod {
return p.backoffQ.list()
}
// UnschedulablePods returns all the pods in unschedulable state.
//
// The result is a snapshot collected while holding the queue's read lock, the
// same lock PendingPods holds when it walks unschedulablePods.podInfoMap, so
// it is safe to call concurrently with queue updates. The returned slice is
// nil when there are no unschedulable pods.
//
// NOTE(review): the read lock is acquired here, so this must not be called
// from a code path that already holds p.lock.
func (p *PriorityQueue) UnschedulablePods() []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()

	var result []*v1.Pod
	for _, pInfo := range p.unschedulablePods.podInfoMap {
		result = append(result, pInfo.Pod)
	}
	return result
}
var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
// GetPod searches for a pod in the activeQ, backoffQ, and unschedulablePods.
@ -1241,9 +1251,9 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP
func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
p.lock.RLock()
defer p.lock.RUnlock()
result := p.activeQ.list()
result := p.PodsInActiveQ()
activeQLen := len(result)
backoffQPods := p.backoffQ.list()
backoffQPods := p.PodsInBackoffQ()
backoffQLen := len(backoffQPods)
result = append(result, backoffQPods...)
for _, pInfo := range p.unschedulablePods.podInfoMap {

View File

@ -292,30 +292,19 @@ func (sched *Scheduler) bindingCycle(
return status
}
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
if status.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status)
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}
// Any failures after this point cannot lead to the Pod being considered unschedulable.
// We define the Pod as "unschedulable" only when Pods are rejected at specific extension points, and PreBind is the last one in the scheduling/binding cycle.
// We define the Pod as "unschedulable" only when Pods are rejected at specific extension points, and Permit is the last one in the scheduling/binding cycle.
// If a Pod fails on PreBind or Bind, it should be moved to BackoffQ for retry.
//
// We can call Done() here because
// we can free the cluster events stored in the scheduling queue sonner, which is worth for busy clusters memory consumption wise.
// we can free the cluster events stored in the scheduling queue sooner, which reduces memory consumption in busy clusters.
sched.SchedulingQueue.Done(assumedPod.UID)
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}
// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status

View File

@ -89,11 +89,6 @@ var (
}
)
// mockScheduleResult bundles the two values a test case wants the stubbed
// SchedulePod function to return: the ScheduleResult and the error.
type mockScheduleResult struct {
// result is returned as the ScheduleResult of the stubbed SchedulePod call.
result ScheduleResult
// err is returned as the error of the stubbed SchedulePod call; nil means success.
err error
}
type fakeExtender struct {
isBinder bool
interestedPodName string
@ -670,95 +665,156 @@ func TestSchedulerScheduleOne(t *testing.T) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
client := clientsetfake.NewClientset(&testNode)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
errS := errors.New("scheduler")
errB := errors.New("binder")
scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}
emptyScheduleResult := ScheduleResult{}
fakeBinding := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}
reserveErr := errors.New("reserve error")
schedulingErr := errors.New("scheduler")
permitErr := errors.New("permit error")
preBindErr := errors.New("on PreBind")
bindingErr := errors.New("binder")
testPod := podWithID("foo", "")
assignedTestPod := podWithID("foo", testNode.Name)
table := []struct {
name string
injectBindError error
sendPod *v1.Pod
registerPluginFuncs []tf.RegisterPluginFunc
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
expectAssumedPod *v1.Pod
expectError error
expectBind *v1.Binding
eventReason string
mockResult mockScheduleResult
name string
sendPod *v1.Pod
registerPluginFuncs []tf.RegisterPluginFunc
injectBindError error
injectSchedulingError error
mockScheduleResult ScheduleResult
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
expectAssumedPod *v1.Pod
expectPodInBackoffQ *v1.Pod
expectPodInUnschedulable *v1.Pod
expectError error
expectBind *v1.Binding
eventReason string
}{
{
name: "error reserve pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, errors.New("reserve error")),
eventReason: "FailedScheduling",
name: "schedule pod failed",
sendPod: testPod,
injectSchedulingError: schedulingErr,
mockScheduleResult: scheduleResultOk,
expectError: schedulingErr,
expectErrorPod: testPod,
expectPodInBackoffQ: testPod,
eventReason: "FailedScheduling",
},
{
name: "error permit pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
name: "reserve failed with status code error",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.AsStatus(reserveErr))),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, errors.New("permit error")),
eventReason: "FailedScheduling",
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, reserveErr),
eventReason: "FailedScheduling",
},
{
name: "error prebind pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
name: "reserve failed with status code rejected",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "rejected on reserve"))),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInUnschedulable: testPod,
expectError: makePredicateError("1 rejected on reserve"),
eventReason: "FailedScheduling",
},
{
name: "permit failed with status code error",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.AsStatus(permitErr), time.Minute)),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr),
eventReason: "FailedScheduling",
},
{
name: "permit failed with status code rejected",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Unschedulable, "rejected on permit"), time.Minute)),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInUnschedulable: testPod,
expectError: makePredicateError("1 rejected on permit"),
eventReason: "FailedScheduling",
},
{
name: "prebind failed with status code rejected",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.NewStatus(framework.Unschedulable, "rejected on prebind"))),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf("rejected on prebind"),
eventReason: "FailedScheduling",
},
{
name: "prebind failed with status code error",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.AsStatus(preBindErr))),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
eventReason: "FailedScheduling",
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
eventReason: "FailedScheduling",
},
{
name: "bind assumed pod scheduled",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
eventReason: "Scheduled",
name: "binding failed",
sendPod: testPod,
injectBindError: bindingErr,
mockScheduleResult: scheduleResultOk,
expectBind: fakeBinding,
expectAssumedPod: assignedTestPod,
expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr),
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectPodInBackoffQ: testPod,
eventReason: "FailedScheduling",
},
{
name: "error pod failed scheduling",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
expectError: errS,
expectErrorPod: podWithID("foo", ""),
eventReason: "FailedScheduling",
name: "bind assumed pod scheduled",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
expectBind: fakeBinding,
expectAssumedPod: assignedTestPod,
eventReason: "Scheduled",
},
{
name: "error bind forget pod failed scheduling",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
injectBindError: errB,
expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder")),
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
eventReason: "FailedScheduling",
},
{
name: "deleting pod",
sendPod: deletingPod("foo"),
mockResult: mockScheduleResult{ScheduleResult{}, nil},
eventReason: "FailedScheduling",
name: "deleting pod",
sendPod: deletingPod("foo"),
mockScheduleResult: emptyScheduleResult,
eventReason: "FailedScheduling",
},
}
@ -794,6 +850,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
informerFactory := informers.NewSharedInformerFactory(client, 0)
fwk, err := tf.NewFramework(ctx,
append(item.registerPluginFuncs,
@ -804,12 +861,12 @@ func TestSchedulerScheduleOne(t *testing.T) {
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithInformerFactory(informerFactory),
)
if err != nil {
t.Fatal(err)
}
informerFactory := informers.NewSharedInformerFactory(client, 0)
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar))
sched := &Scheduler{
@ -822,15 +879,13 @@ func TestSchedulerScheduleOne(t *testing.T) {
queue.Add(logger, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, ni *framework.NominatingInfo, start time.Time) {
gotPod = p.Pod
gotError = status.AsError()
msg := truncateMessage(gotError.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
queue.Done(p.Pod.UID)
sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start)
}
called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -843,6 +898,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
sched.ScheduleOne(ctx)
<-called
if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
@ -864,6 +921,330 @@ func TestSchedulerScheduleOne(t *testing.T) {
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
}
// We have to use wait here because the Pod goes to the binding cycle in some test cases
// and the inflight pods might not be empty immediately at this point in such case.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return len(queue.InFlightPods()) == 0, nil
}); err != nil {
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
}
podsInBackoffQ := queue.PodsInBackoffQ()
if item.expectPodInBackoffQ != nil {
if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) {
t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ)
}
} else {
if len(podsInBackoffQ) > 0 {
t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ)
}
}
unschedulablePods := queue.UnschedulablePods()
if item.expectPodInUnschedulable != nil {
if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) {
t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods)
}
} else {
if len(unschedulablePods) > 0 {
t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods)
}
}
stopFunc()
})
}
}
}
// Tests the logic removing pods from inFlightPods after Permit (needed to fix issue https://github.com/kubernetes/kubernetes/issues/129967).
// This needs to be a separate test case, because it mocks the waitOnPermit and runPrebindPlugins functions.
func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
client := clientsetfake.NewClientset(&testNode)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}
emptyScheduleResult := ScheduleResult{}
bindingOk := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}
schedulingErr := errors.New("scheduler")
bindingErr := errors.New("binder")
preBindErr := errors.New("on PreBind")
permitErr := errors.New("permit")
waitOnPermitErr := errors.New("wait on permit")
testPod := podWithID("foo", "")
assignedTestPod := podWithID("foo", testNode.Name)
table := []struct {
name string
sendPod *v1.Pod
registerPluginFuncs []tf.RegisterPluginFunc
injectSchedulingError error
injectBindError error
mockScheduleResult ScheduleResult
mockWaitOnPermitResult *framework.Status
mockRunPreBindPluginsResult *framework.Status
expectErrorPod *v1.Pod
expectAssumedPod *v1.Pod
expectError error
expectBind *v1.Binding
eventReason string
expectPodIsInFlightAtFailureHandler bool
expectPodIsInFlightAtWaitOnPermit bool
}{
{
name: "error on permit",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.AsStatus(permitErr), time.Minute)),
},
expectErrorPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr),
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: true,
},
{
name: "pod rejected on permit",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Unschedulable, "on permit"), time.Minute)),
},
expectErrorPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectError: makePredicateError("1 on permit"),
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: true,
},
{
name: "error on wait on permit",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Wait), time.Minute)),
},
mockWaitOnPermitResult: framework.AsStatus(waitOnPermitErr),
expectErrorPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectError: waitOnPermitErr,
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: true,
expectPodIsInFlightAtWaitOnPermit: true,
},
{
name: "pod rejected while wait on permit",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Wait), time.Minute)),
},
mockWaitOnPermitResult: framework.NewStatus(framework.Unschedulable, "wait on permit"),
expectErrorPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectError: makePredicateError("1 wait on permit"),
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: true,
expectPodIsInFlightAtWaitOnPermit: true,
},
{
name: "error prebind pod",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.NewStatus(framework.Unschedulable))),
},
mockWaitOnPermitResult: framework.NewStatus(framework.Success),
mockRunPreBindPluginsResult: framework.NewStatus(framework.Unschedulable, preBindErr.Error()),
expectErrorPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectError: preBindErr,
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: false,
expectPodIsInFlightAtWaitOnPermit: true,
},
{
name: "bind assumed pod scheduled",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
expectBind: bindingOk,
expectAssumedPod: assignedTestPod,
mockWaitOnPermitResult: framework.NewStatus(framework.Success),
mockRunPreBindPluginsResult: framework.NewStatus(framework.Success),
eventReason: "Scheduled",
expectPodIsInFlightAtFailureHandler: false,
expectPodIsInFlightAtWaitOnPermit: true,
},
{
name: "error pod failed scheduling",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
injectSchedulingError: schedulingErr,
expectError: schedulingErr,
expectErrorPod: testPod,
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: true,
},
{
name: "error bind forget pod failed scheduling",
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
mockWaitOnPermitResult: framework.NewStatus(framework.Success),
mockRunPreBindPluginsResult: framework.NewStatus(framework.Success),
expectBind: bindingOk,
expectAssumedPod: assignedTestPod,
injectBindError: bindingErr,
expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr),
expectErrorPod: assignedTestPod,
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: false,
expectPodIsInFlightAtWaitOnPermit: true,
},
{
name: "deleting pod",
sendPod: deletingPod("foo"),
mockScheduleResult: emptyScheduleResult,
eventReason: "FailedScheduling",
expectPodIsInFlightAtFailureHandler: false,
expectPodIsInFlightAtWaitOnPermit: false,
},
}
for _, qHintEnabled := range []bool{true, false} {
for _, item := range table {
t.Run(fmt.Sprintf("[QueueingHint: %v] %s", qHintEnabled, item.name), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
logger, ctx := ktesting.NewTestContext(t)
var gotError error
var gotPod *v1.Pod
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
var gotCallsToFailureHandler int
var gotPodIsInFlightAtFailureHandler bool
var gotPodIsInFlightAtWaitOnPermit bool
var gotPodIsInFlightAtRunPreBindPlugins bool
cache := &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
IsAssumedPodFunc: func(pod *v1.Pod) bool {
if pod == nil || gotAssumedPod == nil {
return false
}
return pod.UID == gotAssumedPod.UID
},
}
client := clientsetfake.NewClientset(item.sendPod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "binding" {
return false, nil, nil
}
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
informerFactory := informers.NewSharedInformerFactory(client, 0)
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar))
fwk, err := NewFakeFramework(
ctx,
queue,
append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
),
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
)
if err != nil {
t.Fatal(err)
}
fwk.waitOnPermitFn = func(_ context.Context, pod *v1.Pod) *framework.Status {
gotPodIsInFlightAtWaitOnPermit = podListContainsPod(fwk.queue.InFlightPods(), pod)
return item.mockWaitOnPermitResult
}
fwk.runPreBindPluginsFn = func(_ context.Context, _ *framework.CycleState, pod *v1.Pod, _ string) *framework.Status {
gotPodIsInFlightAtRunPreBindPlugins = podListContainsPod(fwk.queue.InFlightPods(), pod)
return item.mockRunPreBindPluginsResult
}
sched := &Scheduler{
Cache: cache,
client: client,
NextPod: queue.Pop,
SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: fwk},
}
queue.Add(logger, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
gotCallsToFailureHandler++
gotPodIsInFlightAtFailureHandler = podListContainsPod(queue.InFlightPods(), p.Pod)
gotPod = p.Pod
gotError = status.AsError()
msg := truncateMessage(gotError.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
queue.Done(p.Pod.UID)
}
called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
if e.Reason != item.eventReason {
t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
}
close(called)
})
if err != nil {
t.Fatal(err)
}
sched.ScheduleOne(ctx)
<-called
if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" {
t.Errorf("Unexpected error pod (-want,+got):\n%s", diff)
}
if item.expectError == nil || gotError == nil {
if !errors.Is(gotError, item.expectError) {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError)
}
} else if item.expectError.Error() != gotError.Error() {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error())
}
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
}
if item.expectError != nil && gotCallsToFailureHandler != 1 {
t.Errorf("expected 1 call to FailureHandlerFn, got %v", gotCallsToFailureHandler)
}
if item.expectError == nil && gotCallsToFailureHandler != 0 {
t.Errorf("expected 0 calls to FailureHandlerFn, got %v", gotCallsToFailureHandler)
}
if (item.expectPodIsInFlightAtFailureHandler && qHintEnabled) != gotPodIsInFlightAtFailureHandler {
t.Errorf("unexpected pod being in flight in FailureHandlerFn, expected %v but got %v.",
item.expectPodIsInFlightAtFailureHandler, gotPodIsInFlightAtFailureHandler)
}
if (item.expectPodIsInFlightAtWaitOnPermit && qHintEnabled) != gotPodIsInFlightAtWaitOnPermit {
t.Errorf("unexpected pod being in flight at start of WaitOnPermit, expected %v but got %v",
item.expectPodIsInFlightAtWaitOnPermit, gotPodIsInFlightAtWaitOnPermit)
}
if gotPodIsInFlightAtRunPreBindPlugins {
t.Errorf("unexpected pod being in flight at start of RunPreBindPlugins")
}
// We have to use wait here
// because the Pod goes to the binding cycle in some test cases and the inflight pods might not be empty immediately at this point in such case.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
@ -877,6 +1258,41 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
}
// FakeFramework wraps a real framework.Framework and allows mocking calls to
// WaitOnPermit and RunPreBindPlugins, to allow for simpler and more efficient
// testing of Scheduler's logic within the bindingCycle.
//
// Those two methods are routed to the waitOnPermitFn and runPreBindPluginsFn
// hooks, which tests assign after construction; every other Framework method
// is served by the embedded Framework.
type FakeFramework struct {
// Framework is the embedded real framework that serves all non-mocked methods.
framework.Framework
// queue is the scheduling queue under test; the test hooks read it to check
// which pods are still in flight at the time they are invoked.
queue internalqueue.SchedulingQueue
// waitOnPermitFn is the hook invoked by WaitOnPermit.
waitOnPermitFn func(context.Context, *v1.Pod) *framework.Status
// runPreBindPluginsFn is the hook invoked by RunPreBindPlugins.
runPreBindPluginsFn func(context.Context, *framework.CycleState, *v1.Pod, string) *framework.Status
}
// NewFakeFramework builds a FakeFramework around a framework created by
// tf.NewFramework with the given plugin registrations, profile name and
// options, and remembers schedQueue so that test hooks can inspect it.
//
// The waitOnPermitFn and runPreBindPluginsFn hooks are left unset; callers
// assign them after construction. If the underlying framework cannot be
// built, nil and the error are returned rather than a half-initialized fake.
func NewFakeFramework(ctx context.Context, schedQueue internalqueue.SchedulingQueue, fns []tf.RegisterPluginFunc,
	profileName string, opts ...frameworkruntime.Option) (*FakeFramework, error) {
	fwk, err := tf.NewFramework(ctx, fns, profileName, opts...)
	if err != nil {
		return nil, err
	}
	return &FakeFramework{
		Framework: fwk,
		queue:     schedQueue,
	}, nil
}
// WaitOnPermit forwards to the waitOnPermitFn hook so tests can observe and
// control the outcome of the wait-on-permit step.
//
// When no hook has been installed the call is delegated to the embedded
// Framework instead of panicking on a nil function value.
func (ff *FakeFramework) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
	if ff.waitOnPermitFn == nil {
		return ff.Framework.WaitOnPermit(ctx, pod)
	}
	return ff.waitOnPermitFn(ctx, pod)
}
// RunPreBindPlugins forwards to the runPreBindPluginsFn hook so tests can
// observe and control the outcome of the PreBind step.
//
// When no hook has been installed the call is delegated to the embedded
// Framework instead of panicking on a nil function value.
func (ff *FakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if ff.runPreBindPluginsFn == nil {
		return ff.Framework.RunPreBindPlugins(ctx, state, pod, nodeName)
	}
	return ff.runPreBindPluginsFn(ctx, state, pod, nodeName)
}
// podListContainsPod reports whether list holds an entry whose UID equals the
// UID of pod. Pods are matched by UID only; no other field is compared.
func podListContainsPod(list []*v1.Pod, pod *v1.Pod) bool {
	for i := 0; i < len(list); i++ {
		if list[i].UID != pod.UID {
			continue
		}
		return true
	}
	return false
}
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)