From ef48efc736cf543ea0582dffdb19b614daa33f34 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 4 Jul 2023 14:18:49 +0200 Subject: [PATCH 1/3] scheduler dynamicresources: minor logging improvements This makes some complex values a bit more readable. --- .../framework/plugins/dynamicresources/dynamicresources.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 8ec90bae497..9ac711ae18a 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -169,7 +169,7 @@ func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset } if loggerV := logger.V(6); loggerV.Enabled() { // At a high enough log level, dump the entire object. - loggerV.Info(msg, "podSchedulingCtxDump", schedulingCtx) + loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx)) } else { logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx)) } @@ -577,7 +577,7 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta sort.Strings(schedulingCtx.Spec.PotentialNodes) state.storePodSchedulingContexts(schedulingCtx) } - logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", nodes) + logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) return nil } From e01db325735745865ed5c53fc96ddb1d64e19a28 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sat, 8 Jul 2023 12:50:25 +0200 Subject: [PATCH 2/3] scheduler util: handle cache.DeletedFinalStateUnknown in As Informer callbacks must be prepared to get cache.DeletedFinalStateUnknown as the deleted object. They can use that as hint that some information may have been missed, but typically they just retrieve the stored object inside it. --- pkg/scheduler/util/utils.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index eb7c4301f30..967c248355d 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" @@ -164,6 +165,8 @@ func IsScalarResourceName(name v1.ResourceName) bool { // As converts two objects to the given type. // Both objects must be of the same type. If not, an error is returned. // nil objects are allowed and will be converted to nil. +// For oldObj, cache.DeletedFinalStateUnknown is handled and the +// object stored in it will be converted instead. 
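The tombstone handling described above means an informer delete handler can pass the raw event object straight to As, defined just below. A minimal sketch of such a handler, assuming a hypothetical onClaimDelete callback and package (only the schedutil.As call and its unwrapping behavior come from this patch; everything else is illustrative):

package example // hypothetical package, for illustration only

import (
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

// onClaimDelete is a hypothetical informer DeleteFunc. The event object may be
// a *ResourceClaim or a cache.DeletedFinalStateUnknown tombstone wrapping one;
// As unwraps the tombstone before doing the type assertion.
func onClaimDelete(obj interface{}) {
	claim, _, err := schedutil.As[*resourcev1alpha2.ResourceClaim](obj, nil)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("claim delete event: %w", err))
		return
	}
	if claim == nil {
		return
	}
	// ... react to the deleted claim, e.g. clear any cached state for it ...
}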
func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) { var oldTyped T var newTyped T @@ -176,6 +179,9 @@ func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) { } if oldObj != nil { + if realOldObj, ok := oldObj.(cache.DeletedFinalStateUnknown); ok { + oldObj = realOldObj.Obj + } oldTyped, ok = oldObj.(T) if !ok { return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj) From 6f1a29520febfdb717a9f7ece46ca1ba91a9e79d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 13 Feb 2023 09:34:11 +0100 Subject: [PATCH 3/3] scheduler/dra: reduce pod scheduling latency This is a combination of two related enhancements: - By implementing a PreEnqueue check, the initial pod scheduling attempt for a pod with a claim template gets avoided when the claim does not exist yet. - By implementing cluster event checks, only those pods get scheduled for which something changed, and they get scheduled immediately without delay. --- .../dynamicresources/dynamicresources.go | 266 ++++++++++++++++- .../dynamicresources/dynamicresources_test.go | 281 ++++++++++++++++-- 2 files changed, 516 insertions(+), 31 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 9ac711ae18a..e16cc4055a7 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -23,8 +23,11 @@ import ( "sort" "sync" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) const ( @@ -208,14 +212,25 @@ func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podCla // dynamicResources is a plugin that ensures that ResourceClaims are allocated. type dynamicResources struct { enabled bool + fh framework.Handle clientset kubernetes.Interface claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister + + // logger is only meant to be used by background activities which don't + // have some other logger in their parent callstack. + logger klog.Logger } // New initializes a new plugin and returns it. func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { + // TODO: the runtime should set up logging for each plugin, including + // adding a name for each one (same as in kube-controller-manager). + return NewWithLogger(klog.TODO(), plArgs, fh, fts) +} + +func NewWithLogger(logger klog.Logger, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { if !fts.EnableDynamicResourceAllocation { // Disabled, won't do anything. 
return &dynamicResources{}, nil @@ -223,13 +238,16 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram return &dynamicResources{ enabled: true, + fh: fh, clientset: fh.ClientSet(), claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), + logger: logger, }, nil } +var _ framework.PreEnqueuePlugin = &dynamicResources{} var _ framework.PreFilterPlugin = &dynamicResources{} var _ framework.FilterPlugin = &dynamicResources{} var _ framework.PostFilterPlugin = &dynamicResources{} @@ -251,12 +269,10 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint } events := []framework.ClusterEventWithHint{ // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. - {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, // When a driver has provided additional information, a pod waiting for that information // may be schedulable. - // TODO (#113702): can we change this so that such an event does not trigger *all* pods? - // Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70 - {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange}, // A resource might depend on node labels for topology filtering. // A new or updated node may make pods schedulable. {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, @@ -264,13 +280,237 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint return events } +// PreEnqueue checks if there are known reasons why a pod currently cannot be +// scheduled. When this fails, one of the registered events can trigger another +// attempt. +func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) { + if err := pl.foreachPodResourceClaim(pod, nil); err != nil { + return statusUnschedulable(klog.FromContext(ctx), err.Error()) + } + return nil +} + +// isSchedulableAfterClaimChange is invoked for all claim events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. It errs on the side of letting a pod scheduling attempt +// happen. +func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + if newObj == nil { + // Deletes don't make a pod schedulable. + return framework.QueueSkip + } + + _, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj) + if err != nil { + // Shouldn't happen. 
+ pl.logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange") + return framework.QueueAfterBackoff + } + + usesClaim := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + if claim.UID == modifiedClaim.UID { + usesClaim = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + pl.logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error()) + return framework.QueueSkip + } + + if !usesClaim { + // This was not the claim the pod was waiting for. + pl.logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueSkip + } + + if oldObj == nil { + pl.logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueImmediately + } + + // Modifications may or may not be relevant. If the entire + // status is as before, then something else must have changed + // and we don't care. What happens in practice is that the + // resource driver adds the finalizer. + originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim) + if !ok { + // Shouldn't happen. + pl.logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj) + return framework.QueueAfterBackoff + } + if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) { + if loggerV := pl.logger.V(7); loggerV.Enabled() { + // Log more information. + loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim)) + } else { + pl.logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + } + return framework.QueueSkip + } + + pl.logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.QueueImmediately +} + +// isSchedulableAfterPodSchedulingContextChange is invoked for all +// PodSchedulingContext events reported by an informer. It checks whether that +// change made a previously unschedulable pod schedulable (updated) or a new +// attempt is needed to re-create the object (deleted). It errs on the side of +// letting a pod scheduling attempt happen. +func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + // Deleted? That can happen because we ourselves delete the PodSchedulingContext while + // working on the pod. This can be ignored. + if oldObj != nil && newObj == nil { + pl.logger.V(4).Info("PodSchedulingContext got deleted") + return framework.QueueSkip + } + + oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj) + if err != nil { + // Shouldn't happen. + pl.logger.Error(nil, "isSchedulableAfterPodSchedulingChange") + return framework.QueueAfterBackoff + } + podScheduling := newPodScheduling // Never nil because deletes are handled above. 
+ + if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace { + pl.logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling)) + return framework.QueueSkip + } + + // If the drivers have provided information about all + // unallocated claims with delayed allocation, then the next + // scheduling attempt is able to pick a node, so we let it run + // immediately if this occurred for the first time, otherwise + // we allow backoff. + pendingDelayedClaims := 0 + if err := pl.foreachPodResourceClaim(pod, func(podResourceName string, claim *resourcev1alpha2.ResourceClaim) { + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && + claim.Status.Allocation == nil && + !podSchedulingHasClaimInfo(podScheduling, podResourceName) { + pendingDelayedClaims++ + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + pl.logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error()) + return framework.QueueSkip + } + + // Some driver responses missing? + if pendingDelayedClaims > 0 { + // We could start a pod scheduling attempt to refresh the + // potential nodes list. But pod scheduling attempts are + // expensive and doing them too often causes the pod to enter + // backoff. Let's wait instead for all drivers to reply. + if loggerV := pl.logger.V(6); loggerV.Enabled() { + loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling)) + } else { + pl.logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod)) + } + return framework.QueueSkip + } + + if oldPodScheduling == nil /* create */ || + len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ { + // This definitely is new information for the scheduler. Try again immediately. + pl.logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod)) + return framework.QueueImmediately + } + + // The other situation where the scheduler needs to do + // something immediately is when the selected node doesn't + // work: waiting in the backoff queue only helps eventually + // resources on the selected node become available again. It's + // much more likely, in particular when trying to fill up the + // cluster, that the choice simply didn't work out. The risk + // here is that in a situation where the cluster really is + // full, backoff won't be used because the scheduler keeps + // trying different nodes. This should not happen when it has + // full knowledge about resource availability (= + // PodSchedulingContext.*.UnsuitableNodes is complete) but may happen + // when it doesn't (= PodSchedulingContext.*.UnsuitableNodes had to be + // truncated). + // + // Truncation only happens for very large clusters and then may slow + // down scheduling, but should not break it completely. This is + // acceptable while DRA is alpha and will be investigated further + // before moving DRA to beta. 
+ if podScheduling.Spec.SelectedNode != "" { + for _, claimStatus := range podScheduling.Status.ResourceClaims { + if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) { + pl.logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name) + return framework.QueueImmediately + } + } + } + + // Update with only the spec modified? + if oldPodScheduling != nil && + !apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) && + apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) { + pl.logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod)) + return framework.QueueSkip + } + + // Once we get here, all changes which are known to require special responses + // have been checked for. Whatever the change was, we don't know exactly how + // to handle it and thus return QueueAfterBackoff. This will cause the + // scheduler to treat the event as if no event hint callback had been provided. + // Developers who want to investigate this can enable a diff at log level 6. + if loggerV := pl.logger.V(6); loggerV.Enabled() { + loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling)) + } else { + pl.logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod)) + } + return framework.QueueAfterBackoff + +} + +func podSchedulingHasClaimInfo(podScheduling *resourcev1alpha2.PodSchedulingContext, podResourceName string) bool { + for _, claimStatus := range podScheduling.Status.ResourceClaims { + if claimStatus.Name == podResourceName { + return true + } + } + return false +} + +func sliceContains(hay []string, needle string) bool { + for _, item := range hay { + if item == needle { + return true + } + } + return false +} + // podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims. func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) { claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims)) + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + // We store the pointer as returned by the lister. The + // assumption is that if a claim gets modified while our code + // runs, the cache will store a new pointer, not mutate the + // existing object that we point to here. + claims = append(claims, claim) + }); err != nil { + return nil, err + } + return claims, nil +} + +// foreachPodResourceClaim checks that each ResourceClaim for the pod exists. +// It calls an optional handler for those claims that it finds. +func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error { for _, resource := range pod.Spec.ResourceClaims { claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource) if err != nil { - return nil, err + return err } // The claim name might be nil if no underlying resource claim // was generated for the referenced claim. There are valid use @@ -280,25 +520,23 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2. 
} claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) if err != nil { - return nil, err + return err } if claim.DeletionTimestamp != nil { - return nil, fmt.Errorf("resourceclaim %q is being deleted", claim.Name) + return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } if mustCheckOwner { if err := resourceclaim.IsForPod(pod, claim); err != nil { - return nil, err + return err } } - // We store the pointer as returned by the lister. The - // assumption is that if a claim gets modified while our code - // runs, the cache will store a new pointer, not mutate the - // existing object that we point to here. - claims = append(claims, claim) + if cb != nil { + cb(resource.Name, claim) + } } - return claims, nil + return nil } // PreFilter invoked at the prefilter extension point to check if pod has all diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 2c863463199..0c6c119c569 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -23,6 +23,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" cgotesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2/ktesting" _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -103,6 +105,7 @@ var ( AllocationMode(resourcev1alpha2.AllocationModeImmediate). Obj() pendingDelayedClaim = st.FromResourceClaim(claim). + OwnerReference(podName, podUID, podKind). AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer). Obj() pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim). @@ -117,7 +120,6 @@ var ( ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}). Obj() allocatedClaim = st.FromResourceClaim(pendingDelayedClaim). - OwnerReference(podName, podUID, podKind). Allocation(&resourcev1alpha2.AllocationResult{}). Obj() allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim). 
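The foreachPodResourceClaim helper introduced above centralizes the existence, deletion, and ownership checks and lets callers plug in their own per-claim logic: PreEnqueue passes a nil callback, while the event handlers match or count claims. As a further illustration, a hypothetical helper in the same file could count unallocated claims like this (the name countUnallocated is an assumption, not part of the patch; the sketch relies on the receiver and imports already present in dynamicresources.go):

// countUnallocated is a hypothetical helper. It reports how many of the pod's
// claims still lack an allocation, or an error if any claim is missing, being
// deleted, or not owned by the pod (the same conditions that
// foreachPodResourceClaim reports).
func (pl *dynamicResources) countUnallocated(pod *v1.Pod) (int, error) {
	unallocated := 0
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
		if claim.Status.Allocation == nil {
			unallocated++
		}
	}); err != nil {
		return 0, err
	}
	return unallocated, nil
}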
@@ -183,6 +185,7 @@ func (p perNodeResult) forNode(nodeName string) result { } type want struct { + preenqueue result preFilterResult *framework.PreFilterResult prefilter result filter perNodeResult @@ -268,11 +271,34 @@ func TestPlugin(t *testing.T) { pod: podWithClaimTemplate, // status not set claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, want: want{ - prefilter: result{ + preenqueue: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pod "default/my-pod": ResourceClaim not created yet`), }, - postfilter: result{ - status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + "deleted-claim": { + pod: podWithClaimTemplateInStatus, + claims: func() []*resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return []*resourcev1alpha2.ResourceClaim{claim} + }(), + want: want{ + preenqueue: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim "my-pod-my-resource" is being deleted`), + }, + }, + }, + "wrong-claim": { + pod: podWithClaimTemplateInStatus, + claims: func() []*resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.OwnerReferences[0].UID += "123" + return []*resourcev1alpha2.ResourceClaim{claim} + }(), + want: want{ + preenqueue: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `ResourceClaim default/my-pod-my-resource was not created for pod default/my-pod (pod is not owner)`), }, }, }, @@ -506,8 +532,16 @@ func TestPlugin(t *testing.T) { } testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings) testCtx.p.enabled = !tc.disable - initialObjects := testCtx.listAll(t) + + status := testCtx.p.PreEnqueue(testCtx.ctx, tc.pod) + t.Run("PreEnqueue", func(t *testing.T) { + testCtx.verify(t, tc.want.preenqueue, initialObjects, nil, status) + }) + if !status.IsSuccess() { + return + } + result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod) t.Run("prefilter", func(t *testing.T) { assert.Equal(t, tc.want.preFilterResult, result) @@ -595,11 +629,12 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - p *dynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -727,7 +762,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl t.Helper() tc := &testContext{} - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) tc.ctx = ctx @@ -736,18 +771,18 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl reactor := createReactor(tc.client.Tracker()) tc.client.PrependReactor("*", "*", reactor) - informerFactory := informers.NewSharedInformerFactory(tc.client, 0) + tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) opts := []runtime.Option{ runtime.WithClientSet(tc.client), - runtime.WithInformerFactory(informerFactory), + runtime.WithInformerFactory(tc.informerFactory), } fh, err := runtime.NewFramework(ctx, nil, nil, opts...) 
if err != nil { t.Fatal(err) } - pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) + pl, err := NewWithLogger(logger, nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) if err != nil { t.Fatal(err) } @@ -768,15 +803,15 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl require.NoError(t, err, "create pod scheduling") } - informerFactory.Start(tc.ctx.Done()) + tc.informerFactory.Start(tc.ctx.Done()) t.Cleanup(func() { // Need to cancel before waiting for the shutdown. cancel() // Now we can wait for all goroutines to stop. - informerFactory.Shutdown() + tc.informerFactory.Shutdown() }) - informerFactory.WaitForCacheSync(tc.ctx.Done()) + tc.informerFactory.WaitForCacheSync(tc.ctx.Done()) for _, node := range nodes { nodeInfo := framework.NewNodeInfo() @@ -849,3 +884,215 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti return false, nil, nil } } + +func TestClaimChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + claims []*resourcev1alpha2.ResourceClaim + oldObj, newObj interface{} + expectedHint framework.QueueingHint + }{ + "skip-deletes": { + pod: podWithClaimTemplate, + oldObj: allocatedClaim, + newObj: nil, + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-new-object": { + pod: podWithClaimTemplate, + newObj: "not-a-claim", + expectedHint: framework.QueueAfterBackoff, + }, + "skip-wrong-claim": { + pod: podWithClaimTemplate, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.OwnerReferences[0].UID += "123" + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "skip-unrelated-claim": { + pod: podWithClaimTemplate, + claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim}, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := allocatedClaim.DeepCopy() + claim.Name += "-foo" + claim.UID += "123" + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "queue-on-add": { + pod: podWithClaimName, + newObj: pendingImmediateClaim, + expectedHint: framework.QueueImmediately, + }, + "backoff-wrong-old-object": { + pod: podWithClaimName, + oldObj: "not-a-claim", + newObj: pendingImmediateClaim, + expectedHint: framework.QueueAfterBackoff, + }, + "skip-adding-finalizer": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + oldObj: pendingImmediateClaim, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := pendingImmediateClaim.DeepCopy() + claim.Finalizers = append(claim.Finalizers, "foo") + return claim + }(), + expectedHint: framework.QueueSkip, + }, + "queue-on-status-change": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + oldObj: pendingImmediateClaim, + newObj: func() *resourcev1alpha2.ResourceClaim { + claim := pendingImmediateClaim.DeepCopy() + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + return claim + }(), + expectedHint: framework.QueueImmediately, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + testCtx := setup(t, nil, tc.claims, nil, nil) + if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok { + // Update the informer because the lister gets called and must have the claim. 
+ store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore() + if tc.oldObj == nil { + require.NoError(t, store.Add(claim)) + } else { + require.NoError(t, store.Update(claim)) + } + } + actualHint := testCtx.p.isSchedulableAfterClaimChange(tc.pod, tc.oldObj, tc.newObj) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +} + +func TestPodSchedulingContextChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + schedulings []*resourcev1alpha2.PodSchedulingContext + claims []*resourcev1alpha2.ResourceClaim + oldObj, newObj interface{} + expectedHint framework.QueueingHint + }{ + "skip-deleted": { + pod: podWithClaimTemplate, + oldObj: scheduling, + expectedHint: framework.QueueSkip, + }, + "skip-missed-deleted": { + pod: podWithClaimTemplate, + oldObj: cache.DeletedFinalStateUnknown{ + Obj: scheduling, + }, + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-old-object": { + pod: podWithClaimTemplate, + oldObj: "not-a-scheduling-context", + newObj: scheduling, + expectedHint: framework.QueueAfterBackoff, + }, + "backoff-missed-wrong-old-object": { + pod: podWithClaimTemplate, + oldObj: cache.DeletedFinalStateUnknown{ + Obj: "not-a-scheduling-context", + }, + newObj: scheduling, + expectedHint: framework.QueueAfterBackoff, + }, + "skip-unrelated-object": { + pod: podWithClaimTemplate, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := scheduling.DeepCopy() + scheduling.Name += "-foo" + return scheduling + }(), + expectedHint: framework.QueueSkip, + }, + "backoff-wrong-new-object": { + pod: podWithClaimTemplate, + oldObj: scheduling, + newObj: "not-a-scheduling-context", + expectedHint: framework.QueueAfterBackoff, + }, + "skip-missing-claim": { + pod: podWithClaimTemplate, + oldObj: scheduling, + newObj: schedulingInfo, + expectedHint: framework.QueueSkip, + }, + "skip-missing-infos": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: scheduling, + newObj: scheduling, + expectedHint: framework.QueueSkip, + }, + "queue-new-infos": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: scheduling, + newObj: schedulingInfo, + expectedHint: framework.QueueImmediately, + }, + "queue-bad-selected-node": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + return scheduling + }(), + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode) + return scheduling + }(), + expectedHint: framework.QueueImmediately, + }, + "skip-spec-changes": { + pod: podWithClaimTemplateInStatus, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: schedulingInfo, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Spec.SelectedNode = workerNode.Name + return scheduling + }(), + expectedHint: framework.QueueSkip, + }, + "backoff-other-changes": { + pod: podWithClaimTemplateInStatus, + claims: 
[]*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, + oldObj: schedulingInfo, + newObj: func() *resourcev1alpha2.PodSchedulingContext { + scheduling := schedulingInfo.DeepCopy() + scheduling.Finalizers = append(scheduling.Finalizers, "foo") + return scheduling + }(), + expectedHint: framework.QueueAfterBackoff, + }, + } + + for name, tc := range testcases { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + testCtx := setup(t, nil, tc.claims, nil, tc.schedulings) + actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(tc.pod, tc.oldObj, tc.newObj) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +}
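For readers less familiar with queueing hints, the contract pinned down by TestClaimChange and TestPodSchedulingContextChange can be summarized with a small consumer sketch. This is not the real scheduling-queue code; it only shows how a ClusterEventWithHint registered in EventsToRegister maps one informer event to a decision for one unschedulable pod (the dispatcher name and the returned strings are assumptions):

package example // hypothetical package, for illustration only

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// maybeRequeue is a hypothetical dispatcher: given one registered event hint
// and one informer event (oldObj/newObj), it decides what to do with a pod
// that is currently unschedulable.
func maybeRequeue(hint framework.ClusterEventWithHint, pod *v1.Pod, oldObj, newObj interface{}) string {
	if hint.QueueingHintFn == nil {
		// No hint function registered: treat the event as potentially
		// relevant, roughly the pre-hint behavior.
		return "backoff"
	}
	switch hint.QueueingHintFn(pod, oldObj, newObj) {
	case framework.QueueSkip:
		return "skip" // the event cannot make this pod schedulable
	case framework.QueueImmediately:
		return "activeQ" // retry scheduling without a backoff delay
	default: // framework.QueueAfterBackoff
		return "backoff"
	}
}

In the plugin above, QueueSkip is returned for unrelated or irrelevant changes, QueueImmediately when new information makes another scheduling attempt worthwhile right away, and QueueAfterBackoff when the effect of a change is unclear.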