From 0105a002bc6f99c39a1ee2d08a895d11a1b5bf49 Mon Sep 17 00:00:00 2001 From: carlory Date: Thu, 13 Jul 2023 21:45:26 +0800 Subject: [PATCH] when the hint fn returns error, the scheduling queue logs the error and treats it as QueueAfterBackoff. Co-authored-by: Kensei Nakada Co-authored-by: Kante Yin Co-authored-by: XsWack --- .../dynamicresources/dynamicresources.go | 48 +++++------ .../dynamicresources/dynamicresources_test.go | 52 +++++++----- .../nodeunschedulable/node_unschedulable.go | 8 +- .../node_unschedulable_test.go | 9 +- pkg/scheduler/framework/types.go | 4 +- .../internal/queue/scheduling_queue.go | 15 +++- .../internal/queue/scheduling_queue_test.go | 85 +++++++++++++------ pkg/scheduler/scheduler.go | 4 +- pkg/scheduler/scheduler_test.go | 18 ++-- pkg/scheduler/util/utils.go | 3 +- pkg/scheduler/util/utils_test.go | 55 ++++++++++++ test/integration/scheduler/queue_test.go | 4 +- .../scheduler/rescheduling_test.go | 8 +- 13 files changed, 216 insertions(+), 97 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 2c0541b1edd..b63b479895e 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -306,17 +306,16 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status // an informer. It checks whether that change made a previously unschedulable // pod schedulable. It errs on the side of letting a pod scheduling attempt // happen. -func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { +func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { if newObj == nil { // Deletes don't make a pod schedulable. - return framework.QueueSkip + return framework.QueueSkip, nil } - _, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj) + originalClaim, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](oldObj, newObj) if err != nil { // Shouldn't happen. - logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange") - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err) } usesClaim := false @@ -329,30 +328,24 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po // foreachPodResourceClaim only returns errors for "not // schedulable". logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error()) - return framework.QueueSkip + return framework.QueueSkip, nil } if !usesClaim { // This was not the claim the pod was waiting for. logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) - return framework.QueueSkip + return framework.QueueSkip, nil } - if oldObj == nil { + if originalClaim == nil { logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) - return framework.QueueImmediately + return framework.QueueImmediately, nil } // Modifications may or may not be relevant. If the entire // status is as before, then something else must have changed // and we don't care. 
What happens in practice is that the // resource driver adds the finalizer. - originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim) - if !ok { - // Shouldn't happen. - logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj) - return framework.QueueAfterBackoff - } if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) { if loggerV := logger.V(7); loggerV.Enabled() { // Log more information. @@ -360,11 +353,11 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po } else { logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) } - return framework.QueueSkip + return framework.QueueSkip, nil } logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) - return framework.QueueImmediately + return framework.QueueImmediately, nil } // isSchedulableAfterPodSchedulingContextChange is invoked for all @@ -372,25 +365,24 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po // change made a previously unschedulable pod schedulable (updated) or a new // attempt is needed to re-create the object (deleted). It errs on the side of // letting a pod scheduling attempt happen. -func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { +func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { // Deleted? That can happen because we ourselves delete the PodSchedulingContext while // working on the pod. This can be ignored. if oldObj != nil && newObj == nil { logger.V(4).Info("PodSchedulingContext got deleted") - return framework.QueueSkip + return framework.QueueSkip, nil } oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj) if err != nil { // Shouldn't happen. - logger.Error(nil, "isSchedulableAfterPodSchedulingChange") - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, fmt.Errorf("unexpected object in isSchedulableAfterPodSchedulingContextChange: %w", err) } podScheduling := newPodScheduling // Never nil because deletes are handled above. if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace { logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling)) - return framework.QueueSkip + return framework.QueueSkip, nil } // If the drivers have provided information about all @@ -410,7 +402,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger // foreachPodResourceClaim only returns errors for "not // schedulable". logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error()) - return framework.QueueSkip + return framework.QueueSkip, nil } // Some driver responses missing? 
@@ -424,14 +416,14 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger } else { logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod)) } - return framework.QueueSkip + return framework.QueueSkip, nil } if oldPodScheduling == nil /* create */ || len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ { // This definitely is new information for the scheduler. Try again immediately. logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod)) - return framework.QueueImmediately + return framework.QueueImmediately, nil } // The other situation where the scheduler needs to do @@ -456,7 +448,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger for _, claimStatus := range podScheduling.Status.ResourceClaims { if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) { logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name) - return framework.QueueImmediately + return framework.QueueImmediately, nil } } } @@ -466,7 +458,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger !apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) && apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) { logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod)) - return framework.QueueSkip + return framework.QueueSkip, nil } // Once we get here, all changes which are known to require special responses @@ -479,7 +471,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger } else { logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod)) } - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, nil } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index b284a7bfa8f..0ee4fbee2d3 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -889,6 +889,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { claims []*resourcev1alpha2.ResourceClaim oldObj, newObj interface{} expectedHint framework.QueueingHint + expectedErr bool }{ "skip-deletes": { pod: podWithClaimTemplate, @@ -897,9 +898,9 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { expectedHint: framework.QueueSkip, }, "backoff-wrong-new-object": { - pod: podWithClaimTemplate, - newObj: "not-a-claim", - expectedHint: framework.QueueAfterBackoff, + pod: podWithClaimTemplate, + newObj: "not-a-claim", + expectedErr: true, }, "skip-wrong-claim": { pod: podWithClaimTemplate, @@ -927,10 +928,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { expectedHint: framework.QueueImmediately, }, "backoff-wrong-old-object": { - pod: podWithClaimName, - oldObj: "not-a-claim", - newObj: pendingImmediateClaim, - expectedHint: framework.QueueAfterBackoff, + pod: podWithClaimName, + oldObj: "not-a-claim", + newObj: pendingImmediateClaim, + expectedErr: true, }, 
"skip-adding-finalizer": { pod: podWithClaimName, @@ -969,7 +970,13 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { require.NoError(t, store.Update(claim)) } } - actualHint := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + + require.NoError(t, err) require.Equal(t, tc.expectedHint, actualHint) }) } @@ -982,6 +989,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) { claims []*resourcev1alpha2.ResourceClaim oldObj, newObj interface{} expectedHint framework.QueueingHint + expectedErr bool }{ "skip-deleted": { pod: podWithClaimTemplate, @@ -996,18 +1004,18 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) { expectedHint: framework.QueueSkip, }, "backoff-wrong-old-object": { - pod: podWithClaimTemplate, - oldObj: "not-a-scheduling-context", - newObj: scheduling, - expectedHint: framework.QueueAfterBackoff, + pod: podWithClaimTemplate, + oldObj: "not-a-scheduling-context", + newObj: scheduling, + expectedErr: true, }, "backoff-missed-wrong-old-object": { pod: podWithClaimTemplate, oldObj: cache.DeletedFinalStateUnknown{ Obj: "not-a-scheduling-context", }, - newObj: scheduling, - expectedHint: framework.QueueAfterBackoff, + newObj: scheduling, + expectedErr: true, }, "skip-unrelated-object": { pod: podWithClaimTemplate, @@ -1020,10 +1028,10 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) { expectedHint: framework.QueueSkip, }, "backoff-wrong-new-object": { - pod: podWithClaimTemplate, - oldObj: scheduling, - newObj: "not-a-scheduling-context", - expectedHint: framework.QueueAfterBackoff, + pod: podWithClaimTemplate, + oldObj: scheduling, + newObj: "not-a-scheduling-context", + expectedErr: true, }, "skip-missing-claim": { pod: podWithClaimTemplate, @@ -1091,7 +1099,13 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) { t.Parallel() logger, _ := ktesting.NewTestContext(t) testCtx := setup(t, nil, tc.claims, nil, tc.schedulings) - actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + + require.NoError(t, err) require.Equal(t, tc.expectedHint, actualHint) }) } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go index 5565013da83..f6c8db356ef 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go @@ -57,11 +57,11 @@ func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint // isSchedulableAfterNodeChange is invoked for all node events reported by // an informer. It checks whether that change made a previously unschedulable // pod schedulable. 
-func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { +func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj) if err != nil { logger.Error(err, "unexpected objects in isSchedulableAfterNodeChange", "oldObj", oldObj, "newObj", newObj) - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, err } originalNodeSchedulable, modifiedNodeSchedulable := false, !modifiedNode.Spec.Unschedulable @@ -71,11 +71,11 @@ func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, po if !originalNodeSchedulable && modifiedNodeSchedulable { logger.V(4).Info("node was created or updated, pod may be schedulable now", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, nil } logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) - return framework.QueueSkip + return framework.QueueSkip, nil } // Name returns name of the plugin. It is used in logs, etc. diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go index 3be140a8cf4..af7602492f7 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go @@ -90,12 +90,14 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { pod *v1.Pod oldObj, newObj interface{} expectedHint framework.QueueingHint + expectedErr bool }{ { name: "backoff-wrong-new-object", pod: &v1.Pod{}, newObj: "not-a-node", expectedHint: framework.QueueAfterBackoff, + expectedErr: true, }, { name: "backoff-wrong-old-object", @@ -107,6 +109,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { }, oldObj: "not-a-node", expectedHint: framework.QueueAfterBackoff, + expectedErr: true, }, { name: "skip-queue-on-unschedulable-node-added", @@ -170,7 +173,11 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { logger, _ := ktesting.NewTestContext(t) pl := &NodeUnschedulable{} - if got := pl.isSchedulableAfterNodeChange(logger, testCase.pod, testCase.oldObj, testCase.newObj); got != testCase.expectedHint { + got, err := pl.isSchedulableAfterNodeChange(logger, testCase.pod, testCase.oldObj, testCase.newObj) + if err != nil && !testCase.expectedErr { + t.Errorf("unexpected error: %v", err) + } + if got != testCase.expectedHint { t.Errorf("isSchedulableAfterNodeChange() = %v, want %v", got, testCase.expectedHint) } }) diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index edce3a5acfb..c7269890055 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -92,13 +92,15 @@ type ClusterEventWithHint struct { // QueueingHintFn returns a hint that signals whether the event can make a Pod, // which was rejected by this plugin in the past scheduling cycle, schedulable or not. // It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ. 
+// If it returns an error, the caller treats the event as `QueueAfterBackoff` regardless of the QueueingHint returned here, so that +// we can prevent the Pod from getting stuck in the unschedulable pod pool. // // - `pod`: the Pod to be enqueued, which is rejected by this plugin in the past. // - `oldObj` `newObj`: the object involved in that event. // - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node. // - `oldObj` is nil if the event is add event. // - `newObj` is nil if the event is delete event. -type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) QueueingHint +type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (QueueingHint, error) type QueueingHint int diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 4772fecb3ae..0c265bcf693 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -433,7 +433,20 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework continue } - switch h := hintfn.QueueingHintFn(logger, pod, oldObj, newObj); h { + h, err := hintfn.QueueingHintFn(logger, pod, oldObj, newObj) + if err != nil { + // If the QueueingHintFn returned an error, we should treat the event as QueueAfterBackoff so that we can prevent + // the Pod from getting stuck in the unschedulable pod pool. + oldObjMeta, newObjMeta, asErr := util.As[klog.KMetadata](oldObj, newObj) + if asErr != nil { + logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod)) + } else { + logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod), "oldObj", klog.KObj(oldObjMeta), "newObj", klog.KObj(newObjMeta)) + } + h = framework.QueueAfterBackoff + } + + switch h { case framework.QueueSkip: continue case framework.QueueImmediately: diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index b17e5a3c3cc..a84fb742cc7 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -87,14 +87,14 @@ var ( cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), } - queueHintReturnQueueAfterBackoff = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { - return framework.QueueAfterBackoff + queueHintReturnQueueAfterBackoff = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.QueueAfterBackoff, nil } - queueHintReturnQueueImmediately = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { - return framework.QueueImmediately + queueHintReturnQueueImmediately = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.QueueImmediately, nil } - queueHintReturnQueueSkip = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { - return framework.QueueSkip + queueHintReturnQueueSkip = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.QueueSkip, nil } ) @@ -3180,17 +3180,21 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo { // Test_isPodWorthRequeuing tests isPodWorthRequeuing function.
func Test_isPodWorthRequeuing(t *testing.T) { count := 0 - queueHintReturnQueueImmediately := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + queueHintReturnQueueImmediately := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { count++ - return framework.QueueImmediately + return framework.QueueImmediately, nil } - queueHintReturnQueueSkip := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + queueHintReturnQueueSkip := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { count++ - return framework.QueueSkip + return framework.QueueSkip, nil } - queueHintReturnQueueAfterBackoff := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + queueHintReturnQueueAfterBackoff := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { count++ - return framework.QueueAfterBackoff + return framework.QueueAfterBackoff, nil + } + queueHintReturnErr := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + count++ + return framework.QueueSkip, fmt.Errorf("unexpected error") } tests := []struct { @@ -3211,7 +3215,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, event: NodeAdd, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueSkip, expectedExecutionCount: 0, queueingHintMap: QueueingHintMapPerProfile{ @@ -3227,6 +3231,28 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, }, }, + { + name: "Treat the event as QueueAfterBackoff when QueueHintFn returns error", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: NodeAdd, + oldObj: nil, + newObj: st.MakeNode().Obj(), + expected: framework.QueueAfterBackoff, + expectedExecutionCount: 1, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + NodeAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnErr, + }, + }, + }, + }, + }, { name: "return QueueAfterBackoff when the event is wildcard", podInfo: &framework.QueuedPodInfo{ @@ -3235,7 +3261,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, event: WildCardEvent, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueAfterBackoff, expectedExecutionCount: 0, queueingHintMap: QueueingHintMapPerProfile{}, @@ -3243,14 +3269,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { { name: "QueueImmediately is the highest priority", podInfo: &framework.QueuedPodInfo{ - UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"), + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4", "fooPlugin5"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, event: NodeAdd, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueImmediately, - expectedExecutionCount: 2, + expectedExecutionCount: 3, queueingHintMap: QueueingHintMapPerProfile{ "": { NodeAdd: { @@ -3259,19 +3285,24 @@ func Test_isPodWorthRequeuing(t *testing.T) { PluginName: "fooPlugin1", QueueingHintFn: queueHintReturnQueueAfterBackoff, }, + { + // executed + PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnErr, + }, { // executed // But, no more queueing hint 
function is executed // because the highest priority is QueueImmediately. - PluginName: "fooPlugin2", + PluginName: "fooPlugin3", QueueingHintFn: queueHintReturnQueueImmediately, }, { - PluginName: "fooPlugin3", + PluginName: "fooPlugin4", QueueingHintFn: queueHintReturnQueueAfterBackoff, }, { - PluginName: "fooPlugin4", + PluginName: "fooPlugin5", QueueingHintFn: queueHintReturnQueueSkip, }, }, @@ -3281,14 +3312,14 @@ func Test_isPodWorthRequeuing(t *testing.T) { { name: "QueueSkip is the lowest priority", podInfo: &framework.QueuedPodInfo{ - UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3"), + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), }, event: NodeAdd, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueAfterBackoff, - expectedExecutionCount: 3, + expectedExecutionCount: 4, queueingHintMap: QueueingHintMapPerProfile{ "": { NodeAdd: { @@ -3304,6 +3335,10 @@ func Test_isPodWorthRequeuing(t *testing.T) { PluginName: "fooPlugin3", QueueingHintFn: queueHintReturnQueueAfterBackoff, }, + { + PluginName: "fooPlugin4", + QueueingHintFn: queueHintReturnErr, + }, }, }, }, @@ -3316,7 +3351,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, event: NodeAdd, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueAfterBackoff, expectedExecutionCount: 2, queueingHintMap: QueueingHintMapPerProfile{ @@ -3346,7 +3381,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, oldObj: nil, - newObj: st.MakeNode().Node, + newObj: st.MakeNode().Obj(), expected: framework.QueueAfterBackoff, expectedExecutionCount: 3, queueingHintMap: QueueingHintMapPerProfile{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e315ec0fc29..635335855d5 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -364,8 +364,8 @@ func New(ctx context.Context, // defaultQueueingHintFn is the default queueing hint function. // It always returns QueueAfterBackoff as the queueing hint. 
-var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { - return framework.QueueAfterBackoff +var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) { + return framework.QueueAfterBackoff, nil } func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index bc85e70eaeb..7900f42db7d 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -864,8 +864,10 @@ func Test_buildQueueingHintMap(t *testing.T) { t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName) continue } - if fn.QueueingHintFn(logger, nil, nil, nil) != wantfns[i].QueueingHintFn(logger, nil, nil, nil) { - t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(logger, nil, nil, nil), wantfns[i].QueueingHintFn(logger, nil, nil, nil)) + got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil) + want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil) + if got != want || gotErr != wantErr { + t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr) continue } } @@ -1089,8 +1091,8 @@ var hintFromFakeNode = framework.QueueingHint(100) type fakeNodePlugin struct{} -var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { - return hintFromFakeNode +var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) { + return hintFromFakeNode, nil } func (*fakeNodePlugin) Name() string { return fakeNode } @@ -1099,7 +1101,7 @@ func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1. 
return nil } -func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn}, } } @@ -1109,8 +1111,8 @@ var hintFromFakePod = framework.QueueingHint(101) type fakePodPlugin struct{} -var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { - return hintFromFakePod +var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) { + return hintFromFakePod, nil } func (*fakePodPlugin) Name() string { return fakePod } @@ -1119,7 +1121,7 @@ func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.P return nil } -func (*fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn}, } } diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 967c248355d..e14b4e77453 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/net" @@ -167,7 +166,7 @@ func IsScalarResourceName(name v1.ResourceName) bool { // nil objects are allowed and will be converted to nil. // For oldObj, cache.DeletedFinalStateUnknown is handled and the // object stored in it will be converted instead. -func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) { +func As[T any](oldObj, newobj interface{}) (T, T, error) { var oldTyped T var newTyped T var ok bool diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go index bb8115cb8e1..e187a77068b 100644 --- a/pkg/scheduler/util/utils_test.go +++ b/pkg/scheduler/util/utils_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/net" clientsetfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" + "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" ) @@ -487,3 +488,57 @@ func Test_As_Node(t *testing.T) { }) } } + +// Test_As_KMetadata tests the As function with klog.KMetadata.
+func Test_As_KMetadata(t *testing.T) { + tests := []struct { + name string + oldObj interface{} + newObj interface{} + wantErr bool + }{ + { + name: "nil old Pod", + oldObj: nil, + newObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantErr: false, + }, + { + name: "nil new Pod", + oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + newObj: nil, + wantErr: false, + }, + { + name: "two different kinds of objects", + oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantErr: false, + }, + { + name: "unknown old type", + oldObj: "unknown type", + wantErr: true, + }, + { + name: "unknown new type", + newObj: "unknown type", + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + _, _, err := As[klog.KMetadata](tc.oldObj, tc.newObj) + if err != nil && !tc.wantErr { + t.Fatalf("unexpected error: %v", err) + } + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, but got nil") + } + return + } + }) + } +} diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 10c9eb32a9a..072fabae34a 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -542,9 +542,9 @@ func TestRequeueByPermitRejection(t *testing.T) { fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) { fakePermit = &fakePermitPlugin{ frameworkHandler: fh, - schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { queueingHintCalledCounter++ - return framework.QueueImmediately + return framework.QueueImmediately, nil }, } return fakePermit, nil diff --git a/test/integration/scheduler/rescheduling_test.go b/test/integration/scheduler/rescheduling_test.go index 67781fff9c5..049cde2ea14 100644 --- a/test/integration/scheduler/rescheduling_test.go +++ b/test/integration/scheduler/rescheduling_test.go @@ -70,8 +70,8 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ { Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, - QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { - return framework.QueueImmediately + QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.QueueImmediately, nil }, }, } @@ -107,8 +107,8 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ { Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, - QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { - return framework.QueueImmediately + QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.QueueImmediately, nil }, }, }
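Illustration, not part of the patch: a minimal sketch of what a plugin-side hint function can look like under the new (QueueingHint, error) signature, assuming the in-tree framework and util packages touched above. The plugin name and the Node/Add event are hypothetical, chosen only to mirror the nodeunschedulable example; the point is that the plugin returns the error instead of logging it, and the scheduling queue logs it and falls back to QueueAfterBackoff.

package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

type examplePlugin struct{}

// isSchedulableAfterNodeAdd follows the new contract: on an unexpected object it
// returns the error, and the queue treats the event as QueueAfterBackoff and logs it.
func (pl *examplePlugin) isSchedulableAfterNodeAdd(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	_, node, err := util.As[*v1.Node](oldObj, newObj)
	if err != nil {
		// Shouldn't happen; the scheduling queue turns this into QueueAfterBackoff.
		return framework.QueueAfterBackoff, fmt.Errorf("unexpected object in isSchedulableAfterNodeAdd: %w", err)
	}
	if node.Spec.Unschedulable {
		// The added node cannot run this pod, so the event is not worth a requeue.
		return framework.QueueSkip, nil
	}
	logger.V(5).Info("node was added and is schedulable", "pod", klog.KObj(pod), "node", klog.KObj(node))
	return framework.QueueAfterBackoff, nil
}

// EventsToRegister wires the hint function to Node/Add events, mirroring the
// registrations shown in the tests above.
func (pl *examplePlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterNodeAdd},
	}
}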