when the hint fn returns error, the scheduling queue logs the error and treats it as QueueAfterBackoff.

Co-authored-by: Kensei Nakada <handbomusic@gmail.com>

Co-authored-by: Kante Yin <kerthcet@gmail.com>

Co-authored-by: XsWack <xushiwei5@huawei.com>
This commit is contained in:
carlory 2023-07-13 21:45:26 +08:00
parent 09200e9c92
commit 0105a002bc
13 changed files with 216 additions and 97 deletions

View File

@ -306,17 +306,16 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status
// an informer. It checks whether that change made a previously unschedulable // an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt // pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. // happen.
func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
if newObj == nil { if newObj == nil {
// Deletes don't make a pod schedulable. // Deletes don't make a pod schedulable.
return framework.QueueSkip return framework.QueueSkip, nil
} }
_, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj) originalClaim, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](oldObj, newObj)
if err != nil { if err != nil {
// Shouldn't happen. // Shouldn't happen.
logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange") return framework.QueueAfterBackoff, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
return framework.QueueAfterBackoff
} }
usesClaim := false usesClaim := false
@ -329,30 +328,24 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
// foreachPodResourceClaim only returns errors for "not // foreachPodResourceClaim only returns errors for "not
// schedulable". // schedulable".
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error()) logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
return framework.QueueSkip return framework.QueueSkip, nil
} }
if !usesClaim { if !usesClaim {
// This was not the claim the pod was waiting for. // This was not the claim the pod was waiting for.
logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueSkip return framework.QueueSkip, nil
} }
if oldObj == nil { if originalClaim == nil {
logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
// Modifications may or may not be relevant. If the entire // Modifications may or may not be relevant. If the entire
// status is as before, then something else must have changed // status is as before, then something else must have changed
// and we don't care. What happens in practice is that the // and we don't care. What happens in practice is that the
// resource driver adds the finalizer. // resource driver adds the finalizer.
originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim)
if !ok {
// Shouldn't happen.
logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj)
return framework.QueueAfterBackoff
}
if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) { if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
if loggerV := logger.V(7); loggerV.Enabled() { if loggerV := logger.V(7); loggerV.Enabled() {
// Log more information. // Log more information.
@ -360,11 +353,11 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
} else { } else {
logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
} }
return framework.QueueSkip return framework.QueueSkip, nil
} }
logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
// isSchedulableAfterPodSchedulingContextChange is invoked for all // isSchedulableAfterPodSchedulingContextChange is invoked for all
@ -372,25 +365,24 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
// change made a previously unschedulable pod schedulable (updated) or a new // change made a previously unschedulable pod schedulable (updated) or a new
// attempt is needed to re-create the object (deleted). It errs on the side of // attempt is needed to re-create the object (deleted). It errs on the side of
// letting a pod scheduling attempt happen. // letting a pod scheduling attempt happen.
func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
// Deleted? That can happen because we ourselves delete the PodSchedulingContext while // Deleted? That can happen because we ourselves delete the PodSchedulingContext while
// working on the pod. This can be ignored. // working on the pod. This can be ignored.
if oldObj != nil && newObj == nil { if oldObj != nil && newObj == nil {
logger.V(4).Info("PodSchedulingContext got deleted") logger.V(4).Info("PodSchedulingContext got deleted")
return framework.QueueSkip return framework.QueueSkip, nil
} }
oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj) oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj)
if err != nil { if err != nil {
// Shouldn't happen. // Shouldn't happen.
logger.Error(nil, "isSchedulableAfterPodSchedulingChange") return framework.QueueAfterBackoff, fmt.Errorf("unexpected object in isSchedulableAfterPodSchedulingContextChange: %w", err)
return framework.QueueAfterBackoff
} }
podScheduling := newPodScheduling // Never nil because deletes are handled above. podScheduling := newPodScheduling // Never nil because deletes are handled above.
if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace { if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace {
logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling)) logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
return framework.QueueSkip return framework.QueueSkip, nil
} }
// If the drivers have provided information about all // If the drivers have provided information about all
@ -410,7 +402,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
// foreachPodResourceClaim only returns errors for "not // foreachPodResourceClaim only returns errors for "not
// schedulable". // schedulable".
logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error()) logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
return framework.QueueSkip return framework.QueueSkip, nil
} }
// Some driver responses missing? // Some driver responses missing?
@ -424,14 +416,14 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
} else { } else {
logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod)) logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
} }
return framework.QueueSkip return framework.QueueSkip, nil
} }
if oldPodScheduling == nil /* create */ || if oldPodScheduling == nil /* create */ ||
len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ { len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
// This definitely is new information for the scheduler. Try again immediately. // This definitely is new information for the scheduler. Try again immediately.
logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod)) logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
// The other situation where the scheduler needs to do // The other situation where the scheduler needs to do
@ -456,7 +448,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
for _, claimStatus := range podScheduling.Status.ResourceClaims { for _, claimStatus := range podScheduling.Status.ResourceClaims {
if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) { if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) {
logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name) logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
} }
} }
@ -466,7 +458,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
!apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) && !apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) &&
apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) { apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) {
logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod)) logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
return framework.QueueSkip return framework.QueueSkip, nil
} }
// Once we get here, all changes which are known to require special responses // Once we get here, all changes which are known to require special responses
@ -479,7 +471,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
} else { } else {
logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod)) logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
} }
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, nil
} }

View File

@ -889,6 +889,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
claims []*resourcev1alpha2.ResourceClaim claims []*resourcev1alpha2.ResourceClaim
oldObj, newObj interface{} oldObj, newObj interface{}
expectedHint framework.QueueingHint expectedHint framework.QueueingHint
expectedErr bool
}{ }{
"skip-deletes": { "skip-deletes": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -897,9 +898,9 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
expectedHint: framework.QueueSkip, expectedHint: framework.QueueSkip,
}, },
"backoff-wrong-new-object": { "backoff-wrong-new-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
newObj: "not-a-claim", newObj: "not-a-claim",
expectedHint: framework.QueueAfterBackoff, expectedErr: true,
}, },
"skip-wrong-claim": { "skip-wrong-claim": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -927,10 +928,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
expectedHint: framework.QueueImmediately, expectedHint: framework.QueueImmediately,
}, },
"backoff-wrong-old-object": { "backoff-wrong-old-object": {
pod: podWithClaimName, pod: podWithClaimName,
oldObj: "not-a-claim", oldObj: "not-a-claim",
newObj: pendingImmediateClaim, newObj: pendingImmediateClaim,
expectedHint: framework.QueueAfterBackoff, expectedErr: true,
}, },
"skip-adding-finalizer": { "skip-adding-finalizer": {
pod: podWithClaimName, pod: podWithClaimName,
@ -969,7 +970,13 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
require.NoError(t, store.Update(claim)) require.NoError(t, store.Update(claim))
} }
} }
actualHint := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedHint, actualHint) require.Equal(t, tc.expectedHint, actualHint)
}) })
} }
@ -982,6 +989,7 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
claims []*resourcev1alpha2.ResourceClaim claims []*resourcev1alpha2.ResourceClaim
oldObj, newObj interface{} oldObj, newObj interface{}
expectedHint framework.QueueingHint expectedHint framework.QueueingHint
expectedErr bool
}{ }{
"skip-deleted": { "skip-deleted": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -996,18 +1004,18 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
expectedHint: framework.QueueSkip, expectedHint: framework.QueueSkip,
}, },
"backoff-wrong-old-object": { "backoff-wrong-old-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: "not-a-scheduling-context", oldObj: "not-a-scheduling-context",
newObj: scheduling, newObj: scheduling,
expectedHint: framework.QueueAfterBackoff, expectedErr: true,
}, },
"backoff-missed-wrong-old-object": { "backoff-missed-wrong-old-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: cache.DeletedFinalStateUnknown{ oldObj: cache.DeletedFinalStateUnknown{
Obj: "not-a-scheduling-context", Obj: "not-a-scheduling-context",
}, },
newObj: scheduling, newObj: scheduling,
expectedHint: framework.QueueAfterBackoff, expectedErr: true,
}, },
"skip-unrelated-object": { "skip-unrelated-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -1020,10 +1028,10 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
expectedHint: framework.QueueSkip, expectedHint: framework.QueueSkip,
}, },
"backoff-wrong-new-object": { "backoff-wrong-new-object": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
oldObj: scheduling, oldObj: scheduling,
newObj: "not-a-scheduling-context", newObj: "not-a-scheduling-context",
expectedHint: framework.QueueAfterBackoff, expectedErr: true,
}, },
"skip-missing-claim": { "skip-missing-claim": {
pod: podWithClaimTemplate, pod: podWithClaimTemplate,
@ -1091,7 +1099,13 @@ func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
t.Parallel() t.Parallel()
logger, _ := ktesting.NewTestContext(t) logger, _ := ktesting.NewTestContext(t)
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings) testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj) actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedHint, actualHint) require.Equal(t, tc.expectedHint, actualHint)
}) })
} }

View File

@ -57,11 +57,11 @@ func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint
// isSchedulableAfterNodeChange is invoked for all node events reported by // isSchedulableAfterNodeChange is invoked for all node events reported by
// an informer. It checks whether that change made a previously unschedulable // an informer. It checks whether that change made a previously unschedulable
// pod schedulable. // pod schedulable.
func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj) originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
if err != nil { if err != nil {
logger.Error(err, "unexpected objects in isSchedulableAfterNodeChange", "oldObj", oldObj, "newObj", newObj) logger.Error(err, "unexpected objects in isSchedulableAfterNodeChange", "oldObj", oldObj, "newObj", newObj)
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, err
} }
originalNodeSchedulable, modifiedNodeSchedulable := false, !modifiedNode.Spec.Unschedulable originalNodeSchedulable, modifiedNodeSchedulable := false, !modifiedNode.Spec.Unschedulable
@ -71,11 +71,11 @@ func (pl *NodeUnschedulable) isSchedulableAfterNodeChange(logger klog.Logger, po
if !originalNodeSchedulable && modifiedNodeSchedulable { if !originalNodeSchedulable && modifiedNodeSchedulable {
logger.V(4).Info("node was created or updated, pod may be schedulable now", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) logger.V(4).Info("node was created or updated, pod may be schedulable now", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, nil
} }
logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip return framework.QueueSkip, nil
} }
// Name returns name of the plugin. It is used in logs, etc. // Name returns name of the plugin. It is used in logs, etc.

View File

@ -90,12 +90,14 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
pod *v1.Pod pod *v1.Pod
oldObj, newObj interface{} oldObj, newObj interface{}
expectedHint framework.QueueingHint expectedHint framework.QueueingHint
expectedErr bool
}{ }{
{ {
name: "backoff-wrong-new-object", name: "backoff-wrong-new-object",
pod: &v1.Pod{}, pod: &v1.Pod{},
newObj: "not-a-node", newObj: "not-a-node",
expectedHint: framework.QueueAfterBackoff, expectedHint: framework.QueueAfterBackoff,
expectedErr: true,
}, },
{ {
name: "backoff-wrong-old-object", name: "backoff-wrong-old-object",
@ -107,6 +109,7 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
}, },
oldObj: "not-a-node", oldObj: "not-a-node",
expectedHint: framework.QueueAfterBackoff, expectedHint: framework.QueueAfterBackoff,
expectedErr: true,
}, },
{ {
name: "skip-queue-on-unschedulable-node-added", name: "skip-queue-on-unschedulable-node-added",
@ -170,7 +173,11 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t) logger, _ := ktesting.NewTestContext(t)
pl := &NodeUnschedulable{} pl := &NodeUnschedulable{}
if got := pl.isSchedulableAfterNodeChange(logger, testCase.pod, testCase.oldObj, testCase.newObj); got != testCase.expectedHint { got, err := pl.isSchedulableAfterNodeChange(logger, testCase.pod, testCase.oldObj, testCase.newObj)
if err != nil && !testCase.expectedErr {
t.Errorf("unexpected error: %v", err)
}
if got != testCase.expectedHint {
t.Errorf("isSchedulableAfterNodeChange() = %v, want %v", got, testCase.expectedHint) t.Errorf("isSchedulableAfterNodeChange() = %v, want %v", got, testCase.expectedHint)
} }
}) })

View File

@ -92,13 +92,15 @@ type ClusterEventWithHint struct {
// QueueingHintFn returns a hint that signals whether the event can make a Pod, // QueueingHintFn returns a hint that signals whether the event can make a Pod,
// which was rejected by this plugin in the past scheduling cycle, schedulable or not. // which was rejected by this plugin in the past scheduling cycle, schedulable or not.
// It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ. // It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ.
// If it returns an error, we'll take the returned QueueingHint as `QueueAfterBackoff` at the caller whatever we returned here so that
// we can prevent the Pod from stucking in the unschedulable pod pool.
// //
// - `pod`: the Pod to be enqueued, which is rejected by this plugin in the past. // - `pod`: the Pod to be enqueued, which is rejected by this plugin in the past.
// - `oldObj` `newObj`: the object involved in that event. // - `oldObj` `newObj`: the object involved in that event.
// - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node. // - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node.
// - `oldObj` is nil if the event is add event. // - `oldObj` is nil if the event is add event.
// - `newObj` is nil if the event is delete event. // - `newObj` is nil if the event is delete event.
type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) QueueingHint type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (QueueingHint, error)
type QueueingHint int type QueueingHint int

View File

@ -433,7 +433,20 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
continue continue
} }
switch h := hintfn.QueueingHintFn(logger, pod, oldObj, newObj); h { h, err := hintfn.QueueingHintFn(logger, pod, oldObj, newObj)
if err != nil {
// If the QueueingHintFn returned an error, we should treat the event as QueueAfterBackoff so that we can prevent
// the Pod from stucking in the unschedulable pod pool.
oldObjMeta, newObjMeta, asErr := util.As[klog.KMetadata](oldObj, newObj)
if asErr != nil {
logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod))
} else {
logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod), "oldObj", klog.KObj(oldObjMeta), "newObj", klog.KObj(newObjMeta))
}
h = framework.QueueAfterBackoff
}
switch h {
case framework.QueueSkip: case framework.QueueSkip:
continue continue
case framework.QueueImmediately: case framework.QueueImmediately:

View File

@ -87,14 +87,14 @@ var (
cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), cmpopts.IgnoreFields(nominator{}, "podLister", "lock"),
} }
queueHintReturnQueueAfterBackoff = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueAfterBackoff = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, nil
} }
queueHintReturnQueueImmediately = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueImmediately = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
queueHintReturnQueueSkip = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueSkip = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.QueueSkip return framework.QueueSkip, nil
} }
) )
@ -3180,17 +3180,21 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo {
// Test_isPodWorthRequeuing tests isPodWorthRequeuing function. // Test_isPodWorthRequeuing tests isPodWorthRequeuing function.
func Test_isPodWorthRequeuing(t *testing.T) { func Test_isPodWorthRequeuing(t *testing.T) {
count := 0 count := 0
queueHintReturnQueueImmediately := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueImmediately := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
count++ count++
return framework.QueueImmediately return framework.QueueImmediately, nil
} }
queueHintReturnQueueSkip := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueSkip := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
count++ count++
return framework.QueueSkip return framework.QueueSkip, nil
} }
queueHintReturnQueueAfterBackoff := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { queueHintReturnQueueAfterBackoff := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
count++ count++
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, nil
}
queueHintReturnErr := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
count++
return framework.QueueSkip, fmt.Errorf("unexpected error")
} }
tests := []struct { tests := []struct {
@ -3211,7 +3215,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
}, },
event: NodeAdd, event: NodeAdd,
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueSkip, expected: framework.QueueSkip,
expectedExecutionCount: 0, expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{ queueingHintMap: QueueingHintMapPerProfile{
@ -3227,6 +3231,28 @@ func Test_isPodWorthRequeuing(t *testing.T) {
}, },
}, },
}, },
{
name: "Treat the event as QueueAfterBackoff when QueueHintFn returns error",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Obj(),
expected: framework.QueueAfterBackoff,
expectedExecutionCount: 1,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnErr,
},
},
},
},
},
{ {
name: "return QueueAfterBackoff when the event is wildcard", name: "return QueueAfterBackoff when the event is wildcard",
podInfo: &framework.QueuedPodInfo{ podInfo: &framework.QueuedPodInfo{
@ -3235,7 +3261,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
}, },
event: WildCardEvent, event: WildCardEvent,
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueAfterBackoff, expected: framework.QueueAfterBackoff,
expectedExecutionCount: 0, expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{}, queueingHintMap: QueueingHintMapPerProfile{},
@ -3243,14 +3269,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
{ {
name: "QueueImmediately is the highest priority", name: "QueueImmediately is the highest priority",
podInfo: &framework.QueuedPodInfo{ podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"), UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4", "fooPlugin5"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
}, },
event: NodeAdd, event: NodeAdd,
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueImmediately, expected: framework.QueueImmediately,
expectedExecutionCount: 2, expectedExecutionCount: 3,
queueingHintMap: QueueingHintMapPerProfile{ queueingHintMap: QueueingHintMapPerProfile{
"": { "": {
NodeAdd: { NodeAdd: {
@ -3259,19 +3285,24 @@ func Test_isPodWorthRequeuing(t *testing.T) {
PluginName: "fooPlugin1", PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff, QueueingHintFn: queueHintReturnQueueAfterBackoff,
}, },
{
// executed
PluginName: "fooPlugin2",
QueueingHintFn: queueHintReturnErr,
},
{ {
// executed // executed
// But, no more queueing hint function is executed // But, no more queueing hint function is executed
// because the highest priority is QueueImmediately. // because the highest priority is QueueImmediately.
PluginName: "fooPlugin2", PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnQueueImmediately, QueueingHintFn: queueHintReturnQueueImmediately,
}, },
{ {
PluginName: "fooPlugin3", PluginName: "fooPlugin4",
QueueingHintFn: queueHintReturnQueueAfterBackoff, QueueingHintFn: queueHintReturnQueueAfterBackoff,
}, },
{ {
PluginName: "fooPlugin4", PluginName: "fooPlugin5",
QueueingHintFn: queueHintReturnQueueSkip, QueueingHintFn: queueHintReturnQueueSkip,
}, },
}, },
@ -3281,14 +3312,14 @@ func Test_isPodWorthRequeuing(t *testing.T) {
{ {
name: "QueueSkip is the lowest priority", name: "QueueSkip is the lowest priority",
podInfo: &framework.QueuedPodInfo{ podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3"), UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
}, },
event: NodeAdd, event: NodeAdd,
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueAfterBackoff, expected: framework.QueueAfterBackoff,
expectedExecutionCount: 3, expectedExecutionCount: 4,
queueingHintMap: QueueingHintMapPerProfile{ queueingHintMap: QueueingHintMapPerProfile{
"": { "": {
NodeAdd: { NodeAdd: {
@ -3304,6 +3335,10 @@ func Test_isPodWorthRequeuing(t *testing.T) {
PluginName: "fooPlugin3", PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnQueueAfterBackoff, QueueingHintFn: queueHintReturnQueueAfterBackoff,
}, },
{
PluginName: "fooPlugin4",
QueueingHintFn: queueHintReturnErr,
},
}, },
}, },
}, },
@ -3316,7 +3351,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
}, },
event: NodeAdd, event: NodeAdd,
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueAfterBackoff, expected: framework.QueueAfterBackoff,
expectedExecutionCount: 2, expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{ queueingHintMap: QueueingHintMapPerProfile{
@ -3346,7 +3381,7 @@ func Test_isPodWorthRequeuing(t *testing.T) {
}, },
event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel},
oldObj: nil, oldObj: nil,
newObj: st.MakeNode().Node, newObj: st.MakeNode().Obj(),
expected: framework.QueueAfterBackoff, expected: framework.QueueAfterBackoff,
expectedExecutionCount: 3, expectedExecutionCount: 3,
queueingHintMap: QueueingHintMapPerProfile{ queueingHintMap: QueueingHintMapPerProfile{

View File

@ -364,8 +364,8 @@ func New(ctx context.Context,
// defaultQueueingHintFn is the default queueing hint function. // defaultQueueingHintFn is the default queueing hint function.
// It always returns QueueAfterBackoff as the queueing hint. // It always returns QueueAfterBackoff as the queueing hint.
var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
return framework.QueueAfterBackoff return framework.QueueAfterBackoff, nil
} }
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {

View File

@ -864,8 +864,10 @@ func Test_buildQueueingHintMap(t *testing.T) {
t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName) t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
continue continue
} }
if fn.QueueingHintFn(logger, nil, nil, nil) != wantfns[i].QueueingHintFn(logger, nil, nil, nil) { got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil)
t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(logger, nil, nil, nil), wantfns[i].QueueingHintFn(logger, nil, nil, nil)) want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil)
if got != want || gotErr != wantErr {
t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr)
continue continue
} }
} }
@ -1089,8 +1091,8 @@ var hintFromFakeNode = framework.QueueingHint(100)
type fakeNodePlugin struct{} type fakeNodePlugin struct{}
var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
return hintFromFakeNode return hintFromFakeNode, nil
} }
func (*fakeNodePlugin) Name() string { return fakeNode } func (*fakeNodePlugin) Name() string { return fakeNode }
@ -1099,7 +1101,7 @@ func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
return nil return nil
} }
func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
} }
@ -1109,8 +1111,8 @@ var hintFromFakePod = framework.QueueingHint(101)
type fakePodPlugin struct{} type fakePodPlugin struct{}
var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint { var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
return hintFromFakePod return hintFromFakePod, nil
} }
func (*fakePodPlugin) Name() string { return fakePod } func (*fakePodPlugin) Name() string { return fakePod }
@ -1119,7 +1121,7 @@ func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.P
return nil return nil
} }
func (*fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
} }

View File

@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/net"
@ -167,7 +166,7 @@ func IsScalarResourceName(name v1.ResourceName) bool {
// nil objects are allowed and will be converted to nil. // nil objects are allowed and will be converted to nil.
// For oldObj, cache.DeletedFinalStateUnknown is handled and the // For oldObj, cache.DeletedFinalStateUnknown is handled and the
// object stored in it will be converted instead. // object stored in it will be converted instead.
func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) { func As[T any](oldObj, newobj interface{}) (T, T, error) {
var oldTyped T var oldTyped T
var newTyped T var newTyped T
var ok bool var ok bool

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/net"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
) )
@ -487,3 +488,57 @@ func Test_As_Node(t *testing.T) {
}) })
} }
} }
// Test_As_KMetadata tests the As function with Pod.
func Test_As_KMetadata(t *testing.T) {
tests := []struct {
name string
oldObj interface{}
newObj interface{}
wantErr bool
}{
{
name: "nil old Pod",
oldObj: nil,
newObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantErr: false,
},
{
name: "nil new Pod",
oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
newObj: nil,
wantErr: false,
},
{
name: "two different kinds of objects",
oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantErr: false,
},
{
name: "unknown old type",
oldObj: "unknown type",
wantErr: true,
},
{
name: "unknown new type",
newObj: "unknown type",
wantErr: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, _, err := As[klog.KMetadata](tc.oldObj, tc.newObj)
if err != nil && !tc.wantErr {
t.Fatalf("unexpected error: %v", err)
}
if tc.wantErr {
if err == nil {
t.Fatalf("expected error, but got nil")
}
return
}
})
}
}

View File

@ -542,9 +542,9 @@ func TestRequeueByPermitRejection(t *testing.T) {
fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) { fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
fakePermit = &fakePermitPlugin{ fakePermit = &fakePermitPlugin{
frameworkHandler: fh, frameworkHandler: fh,
schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
queueingHintCalledCounter++ queueingHintCalledCounter++
return framework.QueueImmediately return framework.QueueImmediately, nil
}, },
} }
return fakePermit, nil return fakePermit, nil

View File

@ -70,8 +70,8 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{ {
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.QueueImmediately return framework.QueueImmediately, nil
}, },
}, },
} }
@ -107,8 +107,8 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{ {
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.QueueImmediately return framework.QueueImmediately, nil
}, },
}, },
} }