diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 555f60a3f42..20b3322cb23 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2" "k8s.io/client-go/kubernetes" @@ -102,25 +103,6 @@ func (d *stateData) Clone() framework.StateData { return d } -func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.Interface, index int, claim *resourcev1alpha2.ResourceClaim) error { - // TODO (#113700): replace with patch operation. Beware that patching must only succeed if the - // object has not been modified in parallel by someone else. - claim, err := clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - // TODO: metric for update results, with the operation ("set selected - // node", "set PotentialNodes", etc.) as one dimension. - if err != nil { - return fmt.Errorf("update resource claim: %w", err) - } - - // Remember the new instance. This is relevant when the plugin must - // update the same claim multiple times (for example, first reserve - // the claim, then later remove the reservation), because otherwise the second - // update would fail with a "was modified" error. - d.claims[index] = claim - - return nil -} - type podSchedulingState struct { // A pointer to the PodSchedulingContext object for the pod, if one exists // in the API server. @@ -300,6 +282,7 @@ var _ framework.PostFilterPlugin = &dynamicResources{} var _ framework.PreScorePlugin = &dynamicResources{} var _ framework.ReservePlugin = &dynamicResources{} var _ framework.EnqueueExtensions = &dynamicResources{} +var _ framework.PreBindPlugin = &dynamicResources{} var _ framework.PostBindPlugin = &dynamicResources{} // Name returns name of the plugin. It is used in logs, etc. @@ -803,7 +786,7 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS claim.Status.DeallocationRequested = true claim.Status.ReservedFor = nil logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) - if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil { + if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil { return nil, statusError(logger, err) } return nil, nil @@ -923,36 +906,21 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat logger := klog.FromContext(ctx) for index, claim := range state.claims { if claim.Status.Allocation != nil { - // Allocated, but perhaps not reserved yet. 
- if resourceclaim.IsReservedForPod(pod, claim) { - logger.V(5).Info("is reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) - continue - } - claim := claim.DeepCopy() - claim.Status.ReservedFor = append(claim.Status.ReservedFor, - resourcev1alpha2.ResourceClaimConsumerReference{ - Resource: "pods", - Name: pod.Name, - UID: pod.UID, - }) - logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) - _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - // TODO: metric for update errors. - if err != nil { - return statusError(logger, err) - } - // If we get here, we know that reserving the claim for - // the pod worked and we can proceed with schedulingCtx - // it. - } else { - // Must be delayed allocation. - numDelayedAllocationPending++ + // Allocated, but perhaps not reserved yet. We checked in PreFilter that + // the pod could reserve the claim. Instead of reserving here by + // updating the ResourceClaim status, we assume that reserving + // will work and only do it for real during binding. If it fails at + // that time, some other pod was faster and we have to try again. + continue + } - // Did the driver provide information that steered node - // selection towards a node that it can support? - if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { - numClaimsWithStatusInfo++ - } + // Must be delayed allocation. + numDelayedAllocationPending++ + + // Did the driver provide information that steered node + // selection towards a node that it can support? + if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { + numClaimsWithStatusInfo++ } } @@ -991,16 +959,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName { state.podSchedulingState.selectedNode = &nodeName logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) - if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { - return statusError(logger, err) - } - return statusPending(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + // The actual publish happens in PreBind or Unreserve. + return nil } } // May have been modified earlier in PreScore or above. - if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { - return statusError(logger, err) + if state.podSchedulingState.isDirty() { + // The actual publish happens in PreBind or Unreserve. + return nil } // More than one pending claim and not enough information about all of them. @@ -1037,29 +1004,95 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt } logger := klog.FromContext(ctx) - for index, claim := range state.claims { + + // Was publishing delayed? If yes, do it now. + // + // The most common scenario is that a different set of potential nodes + // was identified. This revised set needs to be published to enable DRA + // drivers to provide better guidance for future scheduling attempts. 
+ if state.podSchedulingState.isDirty() { + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + logger.Error(err, "publish PodSchedulingContext") + } + } + + for _, claim := range state.claims { if claim.Status.Allocation != nil && resourceclaim.IsReservedForPod(pod, claim) { - // Remove pod from ReservedFor. - claim := claim.DeepCopy() - reservedFor := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor)-1) - for _, reserved := range claim.Status.ReservedFor { - // TODO: can UID be assumed to be unique all resources or do we also need to compare Group/Version/Resource? - if reserved.UID != pod.UID { - reservedFor = append(reservedFor, reserved) - } - } - claim.Status.ReservedFor = reservedFor - logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim)) - if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil { - // We will get here again when pod schedulingCtx - // is retried. + // Remove pod from ReservedFor. A strategic-merge-patch is used + // because that allows removing an individual entry without having + // the latest slice. + patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"$patch": "delete", "uid": %q} ] }}`, + claim.UID, + pod.UID, + ) + logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim), "pod", klog.KObj(pod)) + claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") + if err != nil { + // We will get here again when pod scheduling is retried. logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim)) } } } } +// PreBind gets called in a separate goroutine after it has been determined +// that the pod should get bound to this node. Because Reserve did not actually +// reserve claims, we need to do it now. If that fails, we return an error and +// the pod will have to go into the backoff queue. The scheduler will call +// Unreserve as part of the error handling. +func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + if !pl.enabled { + return nil + } + state, err := getStateData(cs) + if err != nil { + return statusError(klog.FromContext(ctx), err) + } + if len(state.claims) == 0 { + return nil + } + + logger := klog.FromContext(ctx) + + // Was publishing delayed? If yes, do it now and then cause binding to stop. + if state.podSchedulingState.isDirty() { + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + return statusError(logger, err) + } + return statusPending(logger, "waiting for resource driver", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + } + + for index, claim := range state.claims { + if !resourceclaim.IsReservedForPod(pod, claim) { + // The claim might be stale, for example because the claim can get shared and some + // other goroutine has updated it in the meantime. We therefore cannot use + // SSA here to add the pod because then we would have to send the entire slice + // or use different field manager strings for each entry. + // + // With a strategic-merge-patch, we can simply send one new entry. The apiserver + // validation will catch if two goroutines try to do that at the same time and + // the claim cannot be shared. 
+ patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`, + claim.UID, + pod.Name, + pod.UID, + ) + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) + claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") + logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim)) + // TODO: metric for update errors. + if err != nil { + return statusError(logger, err) + } + state.claims[index] = claim + } + } + // If we get here, we know that reserving the claim for + // the pod worked and we can proceed with binding it. + return nil +} + // PostBind is called after a pod is successfully bound to a node. Now we are // sure that a PodSchedulingContext object, if it exists, is definitely not going to // be needed anymore and can delete it. This is a one-shot thing, there won't diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index faf866e79b7..9ca2b407fde 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -192,9 +192,14 @@ type want struct { prescore result reserve result unreserve result + prebind result postbind result postFilterResult *framework.PostFilterResult postfilter result + + // unreserveAfterBindFailure, if set, triggers a call to Unreserve + // after PreBind, as if the actual Bind had failed. + unreserveAfterBindFailure *result } // prepare contains changes for objects in the API server. 
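// Illustrative sketch only (not part of the patch above; helper names are
// hypothetical): the two strategic-merge-patch operations that the new
// PreBind and Unreserve code paths send against the ResourceClaim status
// subresource. Including metadata.uid in the patch body ensures the patch
// only applies to the claim instance the scheduler actually saw, and the
// apiserver validation rejects a second reservation when the claim is not
// shareable, which is what turns a lost race into a PreBind error.
package sketch

import (
	"context"
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// reserveClaimForPod adds one ReservedFor entry without sending the whole
// slice, mirroring what PreBind does.
func reserveClaimForPod(ctx context.Context, cs kubernetes.Interface, claim *resourcev1alpha2.ResourceClaim, podName string, podUID types.UID) (*resourcev1alpha2.ResourceClaim, error) {
	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {"reservedFor": [{"resource": "pods", "name": %q, "uid": %q}]}}`,
		claim.UID, podName, podUID)
	return cs.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
}

// unreserveClaimForPod removes a single ReservedFor entry via the
// "$patch": "delete" directive, mirroring what Unreserve does; no up-to-date
// copy of the slice is needed.
func unreserveClaimForPod(ctx context.Context, cs kubernetes.Interface, claim *resourcev1alpha2.ResourceClaim, podUID types.UID) (*resourcev1alpha2.ResourceClaim, error) {
	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {"reservedFor": [{"$patch": "delete", "uid": %q}]}}`,
		claim.UID, podUID)
	return cs.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
}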
@@ -206,6 +211,7 @@ type prepare struct { prescore change reserve change unreserve change + prebind change postbind change postfilter change } @@ -237,7 +243,7 @@ func TestPlugin(t *testing.T) { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, want: want{ - reserve: result{ + prebind: result{ changes: change{ claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { if claim.Name == claimName { @@ -254,7 +260,7 @@ func TestPlugin(t *testing.T) { pod: podWithClaimTemplateInStatus, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim}, want: want{ - reserve: result{ + prebind: result{ changes: change{ claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { if claim.Name == claimName { @@ -345,8 +351,8 @@ func TestPlugin(t *testing.T) { claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ - reserve: result{ - status: framework.NewStatus(framework.Pending, `waiting for resource driver to allocate resource`), + prebind: result{ + status: framework.NewStatus(framework.Pending, `waiting for resource driver`), added: []metav1.Object{schedulingSelectedPotential}, }, }, @@ -359,8 +365,8 @@ func TestPlugin(t *testing.T) { claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, pendingDelayedClaim2}, classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ - reserve: result{ - status: framework.NewStatus(framework.Pending, `waiting for resource driver to provide information`), + prebind: result{ + status: framework.NewStatus(framework.Pending, `waiting for resource driver`), added: []metav1.Object{schedulingPotential}, }, }, @@ -373,8 +379,8 @@ func TestPlugin(t *testing.T) { schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo}, classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ - reserve: result{ - status: framework.NewStatus(framework.Pending, `waiting for resource driver to allocate resource`), + prebind: result{ + status: framework.NewStatus(framework.Pending, `waiting for resource driver`), changes: change{ scheduling: func(in *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext { return st.FromPodSchedulingContexts(in). @@ -393,7 +399,7 @@ func TestPlugin(t *testing.T) { schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo}, classes: []*resourcev1alpha2.ResourceClass{resourceClass}, prepare: prepare{ - reserve: change{ + prebind: change{ scheduling: func(in *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext { // This does not actually conflict with setting the // selected node, but because the plugin is not using @@ -405,7 +411,7 @@ func TestPlugin(t *testing.T) { }, }, want: want{ - reserve: result{ + prebind: result{ status: framework.AsStatus(errors.New(`ResourceVersion must match the object that gets updated`)), }, }, @@ -417,7 +423,7 @@ func TestPlugin(t *testing.T) { schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo}, classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ - reserve: result{ + prebind: result{ changes: change{ claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { return st.FromResourceClaim(in). 
@@ -492,7 +498,7 @@ func TestPlugin(t *testing.T) { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology}, want: want{ - reserve: result{ + prebind: result{ changes: change{ claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { return st.FromResourceClaim(in). @@ -503,6 +509,30 @@ func TestPlugin(t *testing.T) { }, }, }, + "bind-failure": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology}, + want: want{ + prebind: result{ + changes: change{ + claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { + return st.FromResourceClaim(in). + ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}). + Obj() + }, + }, + }, + unreserveAfterBindFailure: &result{ + changes: change{ + claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { + out := in.DeepCopy() + out.Status.ReservedFor = []resourcev1alpha2.ResourceClaimConsumerReference{} + return out + }, + }, + }, + }, + }, "reserved-okay": { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{inUseClaim}, @@ -607,11 +637,26 @@ func TestPlugin(t *testing.T) { }) } else { initialObjects = testCtx.listAll(t) - initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind) - testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) - t.Run("postbind", func(t *testing.T) { - testCtx.verify(t, tc.want.postbind, initialObjects, nil, status) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prebind) + status := testCtx.p.PreBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) + t.Run("prebind", func(t *testing.T) { + testCtx.verify(t, tc.want.prebind, initialObjects, nil, status) }) + + if tc.want.unreserveAfterBindFailure != nil { + initialObjects = testCtx.listAll(t) + testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) + t.Run("unreserverAfterBindFailure", func(t *testing.T) { + testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status) + }) + } else if status.IsSuccess() { + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind) + testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) + t.Run("postbind", func(t *testing.T) { + testCtx.verify(t, tc.want.postbind, initialObjects, nil, nil) + }) + } } } else { initialObjects = testCtx.listAll(t) diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 7aa5cf2f53b..922339255f1 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -25,6 +25,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" @@ -309,6 +310,108 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.Context("cluster", func() { nodes := NewNodes(f, 1, 4) + ginkgo.Context("with local unshared resources", func() { + driver := NewDriver(f, nodes, func() app.Resources { + return app.Resources{ + NodeLocal: true, + MaxAllocations: 10, + Nodes: nodes.NodeNames, + } + }) + b := newBuilder(f, driver) + + // This test covers some special code paths in the scheduler: + // - Patching the ReservedFor during PreBind because in contrast + // to claims specifically allocated for a pod, here the claim + // gets allocated without 
reserving it. + // - Error handling when PreBind fails: multiple attempts to bind pods + // are started concurrently, only one attempt succeeds. + // - Removing a ReservedFor entry because the first inline claim gets + // reserved during allocation. + ginkgo.It("reuses an allocated immediate claim", func(ctx context.Context) { + objects := []klog.KMetadata{ + b.parameters(), + b.externalClaim(resourcev1alpha2.AllocationModeImmediate), + } + podExternal := b.podExternal() + + // Create many pods to increase the chance that the scheduler will + // try to bind two pods at the same time. + numPods := 5 + for i := 0; i < numPods; i++ { + podInline, claimTemplate := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + podInline.Spec.Containers[0].Resources.Claims = append(podInline.Spec.Containers[0].Resources.Claims, podExternal.Spec.Containers[0].Resources.Claims[0]) + podInline.Spec.ResourceClaims = append(podInline.Spec.ResourceClaims, podExternal.Spec.ResourceClaims[0]) + objects = append(objects, claimTemplate, podInline) + } + b.create(ctx, objects...) + + var runningPod *v1.Pod + haveRunningPod := gcustom.MakeMatcher(func(pods []v1.Pod) (bool, error) { + numRunning := 0 + runningPod = nil + for _, pod := range pods { + if pod.Status.Phase == v1.PodRunning { + pod := pod // Don't keep pointer to loop variable... + runningPod = &pod + numRunning++ + } + } + return numRunning == 1, nil + }).WithTemplate("Expected one running Pod.\nGot instead:\n{{.FormattedActual}}") + + for i := 0; i < numPods; i++ { + ginkgo.By("waiting for exactly one pod to start") + runningPod = nil + gomega.Eventually(ctx, b.listTestPods).WithTimeout(f.Timeouts.PodStartSlow).Should(haveRunningPod) + + ginkgo.By("checking that no other pod gets scheduled") + havePendingPods := gcustom.MakeMatcher(func(pods []v1.Pod) (bool, error) { + numPending := 0 + for _, pod := range pods { + if pod.Status.Phase == v1.PodPending { + numPending++ + } + } + return numPending == numPods-1-i, nil + }).WithTemplate("Expected only one running Pod.\nGot instead:\n{{.FormattedActual}}") + gomega.Consistently(ctx, b.listTestPods).WithTimeout(time.Second).Should(havePendingPods) + + ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(runningPod))) + framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, runningPod.Name, metav1.DeleteOptions{})) + + ginkgo.By(fmt.Sprintf("waiting for pod %s to disappear", klog.KObj(runningPod))) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, b.f.ClientSet, runningPod.Name, runningPod.Namespace, f.Timeouts.PodDelete)) + } + }) + }) + + ginkgo.Context("with shared network resources", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) + + // This test complements "reuses an allocated immediate claim" above: + // because the claim can be shared, each PreBind attempt succeeds. + ginkgo.It("shares an allocated immediate claim", func(ctx context.Context) { + objects := []klog.KMetadata{ + b.parameters(), + b.externalClaim(resourcev1alpha2.AllocationModeImmediate), + } + // Create many pods to increase the chance that the scheduler will + // try to bind two pods at the same time. + numPods := 5 + pods := make([]*v1.Pod, numPods) + for i := 0; i < numPods; i++ { + pods[i] = b.podExternal() + objects = append(objects, pods[i]) + } + b.create(ctx, objects...) 
+
+ ginkgo.By("waiting for all pods to start")
+ framework.ExpectNoError(e2epod.WaitForPodsRunning(ctx, b.f.ClientSet, f.Namespace.Name, numPods+1 /* driver */, f.Timeouts.PodStartSlow))
+ })
+ })
+
 // claimTests tries out several different combinations of pods with
 // claims, both inline and external.
 claimTests := func(b *builder, driver *Driver, allocationMode resourcev1alpha2.AllocationMode) {