From 6f1a29520febfdb717a9f7ece46ca1ba91a9e79d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 13 Feb 2023 09:34:11 +0100 Subject: [PATCH] scheduler/dra: reduce pod scheduling latency This is a combination of two related enhancements: - By implementing a PreEnqueue check, the initial pod scheduling attempt for a pod with a claim template gets avoided when the claim does not exist yet. - By implementing cluster event checks, only those pods get scheduled for which something changed, and they get scheduled immediately without delay. --- .../dynamicresources/dynamicresources.go | 266 ++++++++++++++++- .../dynamicresources/dynamicresources_test.go | 281 ++++++++++++++++-- 2 files changed, 516 insertions(+), 31 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 9ac711ae18a..e16cc4055a7 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -23,8 +23,11 @@ import ( "sort" "sync" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) const ( @@ -208,14 +212,25 @@ func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podCla // dynamicResources is a plugin that ensures that ResourceClaims are allocated. type dynamicResources struct { enabled bool + fh framework.Handle clientset kubernetes.Interface claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister + + // logger is only meant to be used by background activities which don't + // have some other logger in their parent callstack. + logger klog.Logger } // New initializes a new plugin and returns it. func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { + // TODO: the runtime should set up logging for each plugin, including + // adding a name for each one (same as in kube-controller-manager). + return NewWithLogger(klog.TODO(), plArgs, fh, fts) +} + +func NewWithLogger(logger klog.Logger, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { if !fts.EnableDynamicResourceAllocation { // Disabled, won't do anything. 
return &dynamicResources{}, nil @@ -223,13 +238,16 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram return &dynamicResources{ enabled: true, + fh: fh, clientset: fh.ClientSet(), claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), + logger: logger, }, nil } +var _ framework.PreEnqueuePlugin = &dynamicResources{} var _ framework.PreFilterPlugin = &dynamicResources{} var _ framework.FilterPlugin = &dynamicResources{} var _ framework.PostFilterPlugin = &dynamicResources{} @@ -251,12 +269,10 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint } events := []framework.ClusterEventWithHint{ // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. - {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, // When a driver has provided additional information, a pod waiting for that information // may be schedulable. - // TODO (#113702): can we change this so that such an event does not trigger *all* pods? - // Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70 - {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange}, // A resource might depend on node labels for topology filtering. // A new or updated node may make pods schedulable. {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, @@ -264,13 +280,237 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint return events } +// PreEnqueue checks if there are known reasons why a pod currently cannot be +// scheduled. When this fails, one of the registered events can trigger another +// attempt. +func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) { + if err := pl.foreachPodResourceClaim(pod, nil); err != nil { + return statusUnschedulable(klog.FromContext(ctx), err.Error()) + } + return nil +} + +// isSchedulableAfterClaimChange is invoked for all claim events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. It errs on the side of letting a pod scheduling attempt +// happen. +func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + if newObj == nil { + // Deletes don't make a pod schedulable. + return framework.QueueSkip + } + + _, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj) + if err != nil { + // Shouldn't happen. 
+ pl.logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange") + return framework.QueueAfterBackoff + } + + usesClaim := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + if claim.UID == modifiedClaim.UID { + usesClaim = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + pl.logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error()) + return framework.QueueSkip + } + + if !usesClaim { + // This was not the claim the pod was waiting for. + pl.logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueSkip + } + + if oldObj == nil { + pl.logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueImmediately + } + + // Modifications may or may not be relevant. If the entire + // status is as before, then something else must have changed + // and we don't care. What happens in practice is that the + // resource driver adds the finalizer. + originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim) + if !ok { + // Shouldn't happen. + pl.logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj) + return framework.QueueAfterBackoff + } + if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) { + if loggerV := pl.logger.V(7); loggerV.Enabled() { + // Log more information. + loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim)) + } else { + pl.logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + } + return framework.QueueSkip + } + + pl.logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueImmediately +} + +// isSchedulableAfterPodSchedulingContextChange is invoked for all +// PodSchedulingContext events reported by an informer. It checks whether that +// change made a previously unschedulable pod schedulable (updated) or a new +// attempt is needed to re-create the object (deleted). It errs on the side of +// letting a pod scheduling attempt happen. +func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + // Deleted? That can happen because we ourselves delete the PodSchedulingContext while + // working on the pod. This can be ignored. + if oldObj != nil && newObj == nil { + pl.logger.V(4).Info("PodSchedulingContext got deleted") + return framework.QueueSkip + } + + oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj) + if err != nil { + // Shouldn't happen. + pl.logger.Error(nil, "isSchedulableAfterPodSchedulingChange") + return framework.QueueAfterBackoff + } + podScheduling := newPodScheduling // Never nil because deletes are handled above. 
+ + if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace { + pl.logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling)) + return framework.QueueSkip + } + + // If the drivers have provided information about all + // unallocated claims with delayed allocation, then the next + // scheduling attempt is able to pick a node, so we let it run + // immediately if this occurred for the first time, otherwise + // we allow backoff. + pendingDelayedClaims := 0 + if err := pl.foreachPodResourceClaim(pod, func(podResourceName string, claim *resourcev1alpha2.ResourceClaim) { + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && + claim.Status.Allocation == nil && + !podSchedulingHasClaimInfo(podScheduling, podResourceName) { + pendingDelayedClaims++ + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + pl.logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error()) + return framework.QueueSkip + } + + // Some driver responses missing? + if pendingDelayedClaims > 0 { + // We could start a pod scheduling attempt to refresh the + // potential nodes list. But pod scheduling attempts are + // expensive and doing them too often causes the pod to enter + // backoff. Let's wait instead for all drivers to reply. + if loggerV := pl.logger.V(6); loggerV.Enabled() { + loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling)) + } else { + pl.logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod)) + } + return framework.QueueSkip + } + + if oldPodScheduling == nil /* create */ || + len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ { + // This definitely is new information for the scheduler. Try again immediately. + pl.logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod)) + return framework.QueueImmediately + } + + // The other situation where the scheduler needs to do + // something immediately is when the selected node doesn't + // work: waiting in the backoff queue only helps eventually + // resources on the selected node become available again. It's + // much more likely, in particular when trying to fill up the + // cluster, that the choice simply didn't work out. The risk + // here is that in a situation where the cluster really is + // full, backoff won't be used because the scheduler keeps + // trying different nodes. This should not happen when it has + // full knowledge about resource availability (= + // PodSchedulingContext.*.UnsuitableNodes is complete) but may happen + // when it doesn't (= PodSchedulingContext.*.UnsuitableNodes had to be + // truncated). + // + // Truncation only happens for very large clusters and then may slow + // down scheduling, but should not break it completely. This is + // acceptable while DRA is alpha and will be investigated further + // before moving DRA to beta. 
+ if podScheduling.Spec.SelectedNode != "" { + for _, claimStatus := range podScheduling.Status.ResourceClaims { + if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) { + pl.logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name) + return framework.QueueImmediately + } + } + } + + // Update with only the spec modified? + if oldPodScheduling != nil && + !apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) && + apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) { + pl.logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod)) + return framework.QueueSkip + } + + // Once we get here, all changes which are known to require special responses + // have been checked for. Whatever the change was, we don't know exactly how + // to handle it and thus return QueueAfterBackoff. This will cause the + // scheduler to treat the event as if no event hint callback had been provided. + // Developers who want to investigate this can enable a diff at log level 6. + if loggerV := pl.logger.V(6); loggerV.Enabled() { + loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling)) + } else { + pl.logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod)) + } + return framework.QueueAfterBackoff + +} + +func podSchedulingHasClaimInfo(podScheduling *resourcev1alpha2.PodSchedulingContext, podResourceName string) bool { + for _, claimStatus := range podScheduling.Status.ResourceClaims { + if claimStatus.Name == podResourceName { + return true + } + } + return false +} + +func sliceContains(hay []string, needle string) bool { + for _, item := range hay { + if item == needle { + return true + } + } + return false +} + // podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims. func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) { claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims)) + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + // We store the pointer as returned by the lister. The + // assumption is that if a claim gets modified while our code + // runs, the cache will store a new pointer, not mutate the + // existing object that we point to here. + claims = append(claims, claim) + }); err != nil { + return nil, err + } + return claims, nil +} + +// foreachPodResourceClaim checks that each ResourceClaim for the pod exists. +// It calls an optional handler for those claims that it finds. +func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error { for _, resource := range pod.Spec.ResourceClaims { claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource) if err != nil { - return nil, err + return err } // The claim name might be nil if no underlying resource claim // was generated for the referenced claim. There are valid use @@ -280,25 +520,23 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2. 
} claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) if err != nil { - return nil, err + return err } if claim.DeletionTimestamp != nil { - return nil, fmt.Errorf("resourceclaim %q is being deleted", claim.Name) + return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } if mustCheckOwner { if err := resourceclaim.IsForPod(pod, claim); err != nil { - return nil, err + return err } } - // We store the pointer as returned by the lister. The - // assumption is that if a claim gets modified while our code - // runs, the cache will store a new pointer, not mutate the - // existing object that we point to here. - claims = append(claims, claim) + if cb != nil { + cb(resource.Name, claim) + } } - return claims, nil + return nil } // PreFilter invoked at the prefilter extension point to check if pod has all diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 2c863463199..0c6c119c569 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -23,6 +23,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" cgotesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2/ktesting" _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -103,6 +105,7 @@ var ( AllocationMode(resourcev1alpha2.AllocationModeImmediate). Obj() pendingDelayedClaim = st.FromResourceClaim(claim). + OwnerReference(podName, podUID, podKind). AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer). Obj() pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim). @@ -117,7 +120,6 @@ var ( ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}). Obj() allocatedClaim = st.FromResourceClaim(pendingDelayedClaim). - OwnerReference(podName, podUID, podKind). Allocation(&resourcev1alpha2.AllocationResult{}). Obj() allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim). 
@@ -183,6 +185,7 @@ func (p perNodeResult) forNode(nodeName string) result { } type want struct { + preenqueue result preFilterResult *framework.PreFilterResult prefilter result filter perNodeResult @@ -268,11 +271,34 @@ func TestPlugin(t *testing.T) { pod: podWithClaimTemplate, // status not set claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, want: want{ - prefilter: result{ + preenqueue: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pod "default/my-pod": ResourceClaim not created yet`), }, - postfilter: result{ - status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + "deleted-claim": { + pod: podWithClaimTemplateInStatus, + claims: func() []*resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return []*resourcev1alpha2.ResourceClaim{claim} + }(), + want: want{ + preenqueue: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim "my-pod-my-resource" is being deleted`), + }, + }, + }, + "wrong-claim": { + pod: podWithClaimTemplateInStatus, + claims: func() []*resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.OwnerReferences[0].UID += "123" + return []*resourcev1alpha2.ResourceClaim{claim} + }(), + want: want{ + preenqueue: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `ResourceClaim default/my-pod-my-resource was not created for pod default/my-pod (pod is not owner)`), }, }, }, @@ -506,8 +532,16 @@ func TestPlugin(t *testing.T) { } testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings) testCtx.p.enabled = !tc.disable - initialObjects := testCtx.listAll(t) + + status := testCtx.p.PreEnqueue(testCtx.ctx, tc.pod) + t.Run("PreEnqueue", func(t *testing.T) { + testCtx.verify(t, tc.want.preenqueue, initialObjects, nil, status) + }) + if !status.IsSuccess() { + return + } + result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod) t.Run("prefilter", func(t *testing.T) { assert.Equal(t, tc.want.preFilterResult, result) @@ -595,11 +629,12 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - p *dynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -727,7 +762,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl t.Helper() tc := &testContext{} - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) tc.ctx = ctx @@ -736,18 +771,18 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl reactor := createReactor(tc.client.Tracker()) tc.client.PrependReactor("*", "*", reactor) - informerFactory := informers.NewSharedInformerFactory(tc.client, 0) + tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) opts := []runtime.Option{ runtime.WithClientSet(tc.client), - runtime.WithInformerFactory(informerFactory), + runtime.WithInformerFactory(tc.informerFactory), } fh, err := runtime.NewFramework(ctx, nil, nil, opts...) 
if err != nil { t.Fatal(err) } - pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) + pl, err := NewWithLogger(logger, nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) if err != nil { t.Fatal(err) } @@ -768,15 +803,15 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl require.NoError(t, err, "create pod scheduling") } - informerFactory.Start(tc.ctx.Done()) + tc.informerFactory.Start(tc.ctx.Done()) t.Cleanup(func() { // Need to cancel before waiting for the shutdown. cancel() // Now we can wait for all goroutines to stop. - informerFactory.Shutdown() + tc.informerFactory.Shutdown() }) - informerFactory.WaitForCacheSync(tc.ctx.Done()) + tc.informerFactory.WaitForCacheSync(tc.ctx.Done()) for _, node := range nodes { nodeInfo := framework.NewNodeInfo() @@ -849,3 +884,215 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti return false, nil, nil } } + +func TestClaimChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + claims []*resourcev1alpha2.ResourceClaim + oldObj, newObj interface{} + expectedHint framework.QueueingHint + }{ + "skip-deletes": { + pod: podWithClaimTemplate, + oldObj: allocatedClaim, + newObj: nil, + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-new-object": { + pod: podWithClaimTemplate, + newObj: "not-a-claim", + expectedHint: framework.QueueAfterBackoff, + }, + "skip-wrong-claim": { + pod: podWithClaimTemplate, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.OwnerReferences[0].UID += "123" + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "skip-unrelated-claim": { + pod: podWithClaimTemplate, + claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim}, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.Name += "-foo" + claim.UID += "123" + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "queue-on-add": { + pod: podWithClaimName, + newObj: pendingImmediateClaim, + expectedHint: framework.QueueImmediately, + }, + "backoff-wrong-old-object": { + pod: podWithClaimName, + oldObj: "not-a-claim", + newObj: pendingImmediateClaim, + expectedHint: framework.QueueAfterBackoff, + }, + "skip-adding-finalizer": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + oldObj: pendingImmediateClaim, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := pendingImmediateClaim.DeepCopy() + claim.Finalizers = append(claim.Finalizers, "foo") + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "queue-on-status-change": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + oldObj: pendingImmediateClaim, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := pendingImmediateClaim.DeepCopy() + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + return claim + }(), + expectedHint: framework.QueueImmediately, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + testCtx := setup(t, nil, tc.claims, nil, nil) + if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok { + // Update the informer because the lister gets called and must have the claim. 
+ store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore() + if tc.oldObj == nil { + require.NoError(t, store.Add(claim)) + } else { + require.NoError(t, store.Update(claim)) + } + } + actualHint := testCtx.p.isSchedulableAfterClaimChange(tc.pod, tc.oldObj, tc.newObj) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +} + +func TestPodSchedulingContextChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + schedulings []*resourcev1alpha2.PodSchedulingContext + claims []*resourcev1alpha2.ResourceClaim + oldObj, newObj interface{} + expectedHint framework.QueueingHint + }{ + "skip-deleted": { + pod: podWithClaimTemplate, + oldObj: scheduling, + expectedHint: framework.QueueSkip, + }, + "skip-missed-deleted": { + pod: podWithClaimTemplate, + oldObj: cache.DeletedFinalStateUnknown{ + Obj: scheduling, + }, + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-old-object": { + pod: podWithClaimTemplate, + oldObj: "not-a-scheduling-context", + newObj: scheduling, + expectedHint: framework.QueueAfterBackoff, + }, + "backoff-missed-wrong-old-object": { + pod: podWithClaimTemplate, + oldObj: cache.DeletedFinalStateUnknown{ + Obj: "not-a-scheduling-context", + }, + newObj: scheduling, + expectedHint: framework.QueueAfterBackoff, + }, + "skip-unrelated-object": { + pod: podWithClaimTemplate, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := scheduling.DeepCopy() + scheduling.Name += "-foo" + return scheduling + }(), + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-new-object": { + pod: podWithClaimTemplate, + oldObj: scheduling, + newObj: "not-a-scheduling-context", + expectedHint: framework.QueueAfterBackoff, + }, + "skip-missing-claim": { + pod: podWithClaimTemplate, + oldObj: scheduling, + newObj: schedulingInfo, + expectedHint: framework.QueueSkip, + }, + "skip-missing-infos": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: scheduling, + newObj: scheduling, + expectedHint: framework.QueueSkip, + }, + "queue-new-infos": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: scheduling, + newObj: schedulingInfo, + expectedHint: framework.QueueImmediately, + }, + "queue-bad-selected-node": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + return scheduling + }(), + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode) + return scheduling + }(), + expectedHint: framework.QueueImmediately, + }, + "skip-spec-changes": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: schedulingInfo, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + return scheduling + }(), + expectedHint: framework.QueueSkip, + }, + "backoff-other-changes": { + pod: podWithClaimTemplateInStatus, + claims: 
[]*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: schedulingInfo, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Finalizers = append(scheduling.Finalizers, "foo") + return scheduling + }(), + expectedHint: framework.QueueAfterBackoff, + }, + } + + for name, tc := range testcases { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + testCtx := setup(t, nil, tc.claims, nil, tc.schedulings) + actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(tc.pod, tc.oldObj, tc.newObj) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +}
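
The queueing hints registered above boil down to a small decision tree: ignore events that cannot help the pod (QueueSkip), retry immediately when the event supplies exactly the information the pod was waiting for (QueueImmediately), and fall back to the regular backoff path for anything unexpected (QueueAfterBackoff). The stand-alone Go sketch below mirrors that flow for claim events only; it deliberately uses toy types (claim, QueueingHint) instead of the real k8s.io/kubernetes/pkg/scheduler/framework types, so every name and field that does not appear in the patch is illustrative, not the actual API.

// hintsketch mirrors the decision flow of isSchedulableAfterClaimChange with
// toy stand-in types; the real plugin operates on *resourcev1alpha2.ResourceClaim.
package main

import "fmt"

type QueueingHint string

const (
	QueueSkip         QueueingHint = "QueueSkip"         // event cannot make the pod schedulable
	QueueAfterBackoff QueueingHint = "QueueAfterBackoff" // unclear change, retry with the normal backoff
	QueueImmediately  QueueingHint = "QueueImmediately"  // retry right away, without backoff
)

// claim is a stand-in for a ResourceClaim: UID identifies it, Allocated
// stands in for "the status changed in a way the pod cares about".
type claim struct {
	UID       string
	Allocated bool
}

// hintForClaimChange returns the hint for an add (oldClaim == nil), update,
// or delete (newClaim == nil) of a claim, for a pod waiting on podClaimUID.
func hintForClaimChange(podClaimUID string, oldClaim, newClaim *claim) QueueingHint {
	if newClaim == nil {
		// Deletes don't make a pod schedulable.
		return QueueSkip
	}
	if newClaim.UID != podClaimUID {
		// Not a claim this pod is waiting for.
		return QueueSkip
	}
	if oldClaim == nil {
		// The claim the pod was waiting for got created.
		return QueueImmediately
	}
	if oldClaim.Allocated == newClaim.Allocated {
		// Only something the pod doesn't care about changed
		// (in the plugin: the status is semantically unchanged).
		return QueueSkip
	}
	// The status changed, e.g. the driver completed the allocation.
	return QueueImmediately
}

func main() {
	waiting := &claim{UID: "uid-1"}
	fmt.Println(hintForClaimChange("uid-1", nil, waiting))                                   // QueueImmediately
	fmt.Println(hintForClaimChange("uid-1", waiting, &claim{UID: "uid-1", Allocated: true})) // QueueImmediately
	fmt.Println(hintForClaimChange("uid-1", waiting, &claim{UID: "uid-2"}))                  // QueueSkip
	fmt.Println(hintForClaimChange("uid-1", waiting, nil))                                   // QueueSkip
}

In the real plugin the "did something relevant change" test is the semantic comparison of claim.Status in isSchedulableAfterClaimChange (and unexpected object types fall back to QueueAfterBackoff); the boolean Allocated field above is only a stand-in for that check.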