diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index 5bc8476a9be..90039854a14 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -18,12 +18,10 @@ package dynamicresources

 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"slices"
 	"sort"
-	"strings"
 	"sync"

 	"github.com/google/go-cmp/cmp"
@@ -41,6 +39,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -1461,7 +1460,9 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 			// The allocation would be enough. The full object is useful for
 			// debugging and testing, so let's make it realistic.
 			claim = claim.DeepCopy()
-			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+			if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
+				claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+			}
 			claim.Status.DriverName = driverName
 			claim.Status.Allocation = allocation
 			pl.inFlightAllocations.Store(claim.UID, claim)
@@ -1620,105 +1621,88 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
 // and reservation are recorded. This finishes the work started in Reserve.
 func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
 	logger := klog.FromContext(ctx)
-	claim := state.claims[index]
-	driverName := ""
-	allocationObject := ""
-
+	claim := state.claims[index].DeepCopy()
 	allocation := state.informationsForClaim[index].allocation
-	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
+	defer func() {
+		if allocation != nil {
+			// The scheduler was handling allocation. Now that has
+			// completed, either successfully or with a failure.
+			if finalErr == nil {
+				// This can fail, but only for reasons that are okay (concurrent delete or update).
+				// Shouldn't happen in this case.
+				if err := pl.claimAssumeCache.Assume(claim); err != nil {
+					logger.V(5).Info("Claim not stored in assume cache", "err", err)
+				}
+			}
+			pl.inFlightAllocations.Delete(claim.UID)
+		}
+	}()

-	// Do we need to store an allocation result from Reserve?
-	if allocation != nil {
-		buffer, err := json.Marshal(allocation)
+	logger.V(5).Info("preparing claim status update", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
+
+	// We may run into a ResourceVersion conflict because there may be some
+	// benign concurrent changes. In that case we get the latest claim and
+	// try again.
+	refreshClaim := false
+	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		if refreshClaim {
+			updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
+			if err != nil {
+				return fmt.Errorf("get updated claim %s after conflict: %w", klog.KObj(claim), err)
+			}
+			logger.V(5).Info("retrying update after conflict", "claim", klog.KObj(claim))
+			claim = updatedClaim
+		} else {
+			// All future retries must get a new claim first.
+			refreshClaim = true
+		}
+
+		if claim.DeletionTimestamp != nil {
+			return fmt.Errorf("claim %s got deleted in the meantime", klog.KObj(claim))
+		}
+
+		// Do we need to store an allocation result from Reserve?
+		if allocation != nil {
+			if claim.Status.Allocation != nil {
+				return fmt.Errorf("claim %s got allocated elsewhere in the meantime", klog.KObj(claim))
+			}
+
+			// The finalizer needs to be added in a normal update.
+			// If we were interrupted in the past, it might already be set and we simply continue.
+			if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
+				claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+				updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+				if err != nil {
+					return fmt.Errorf("add finalizer to claim %s: %w", klog.KObj(claim), err)
+				}
+				claim = updatedClaim
+			}
+
+			claim.Status.DriverName = state.informationsForClaim[index].allocationDriverName
+			claim.Status.Allocation = allocation
+		}
+
+		// We can simply try to add the pod here without checking
+		// preconditions. The apiserver will tell us with a
+		// non-conflict error if this isn't possible.
+		claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: pod.Name, UID: pod.UID})
+		updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
-		}
-		driverName = state.informationsForClaim[index].allocationDriverName
-		allocationObject = string(buffer)
-
-		// The finalizer needs to be added in a normal update. Using a simple update is fine
-		// because we don't expect concurrent modifications while the claim is not allocated
-		// yet. If there are any, we want to fail.
-		//
-		// If we were interrupted in the past, it might already be set and we simply continue.
-		if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
-			claim := state.claims[index].DeepCopy()
-			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
-			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
-				return nil, fmt.Errorf("add finalizer: %v", err)
+			if allocation != nil {
+				return fmt.Errorf("add allocation and reservation to claim %s: %w", klog.KObj(claim), err)
 			}
+			return fmt.Errorf("add reservation to claim %s: %w", klog.KObj(claim), err)
 		}
+		claim = updatedClaim
+		return nil
+	})
+
+	if retryErr != nil {
+		return nil, retryErr
 	}

-	// The claim might be stale, for example because the claim can get shared and some
-	// other goroutine has updated it in the meantime. We therefore cannot use
-	// SSA here to add the pod because then we would have to send the entire slice
-	// or use different field manager strings for each entry.
-	//
-	// With a strategic-merge-patch, we can simply send one new entry. The apiserver
-	// validation will catch if two goroutines try to do that at the same time and
-	// the claim cannot be shared.
-	//
-	// Note that this also works when the allocation result gets added twice because
-	// two pods both started using a shared claim: the first pod to get here adds the
-	// allocation result. The second pod then only adds itself to reservedFor.
-	//
-	// Because we allow concurrent updates, we don't check ResourceVersion
-	// in a precondition. What we *do* need to check is that the finalizer
-	// is still present. This implies using a JSON patch instead of a
-	// simpler strategic-merge-patch.
-	parts := make([]string, 0, 7)
-	parts = append(parts,
-		fmt.Sprintf(`{ "op": "test", "path": "/metadata/uid", "value": %q }`, claim.UID),
-	)
-	// Strictly speaking, we only need to check for existence of our finalizer.
-	// JSON patch does not support that, so we check for the stronger "array is equal".
-	if len(claim.Finalizers) > 0 {
-		parts = append(parts,
-			fmt.Sprintf(`{ "op": "test", "path": "/metadata/finalizers", "value": %q }`, claim.Finalizers),
-		)
-	}
-	// JSON patch can only append to a non-empty array. An empty reservedFor gets
-	// omitted and even if it didn't, it would be null and not an empty array.
-	// Therefore we have to test and add if it's currently empty.
-	reservedForEntry := fmt.Sprintf(`{"resource": "pods", "name": %q, "uid": %q}`, pod.Name, pod.UID)
-	if len(claim.Status.ReservedFor) == 0 {
-		parts = append(parts,
-			fmt.Sprintf(`{ "op": "test", "path": "/status/reservedFor", "value": null }`),
-			fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor", "value": [ %s ] }`, reservedForEntry),
-		)
-	} else {
-		parts = append(parts,
-			fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor/-", "value": %s }`, reservedForEntry),
-		)
-	}
-	if allocationObject != "" {
-		parts = append(parts,
-			fmt.Sprintf(`{ "op": "add", "path": "/status/driverName", "value": %q }`, driverName),
-			fmt.Sprintf(`{ "op": "add", "path": "/status/allocation", "value": %s }`, allocationObject),
-		)
-	}
-	patch := "[\n" + strings.Join(parts, ",\n") + "\n]"
-	if loggerV := logger.V(6); loggerV.Enabled() {
-		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
-	} else {
-		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
-	}
-	claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}, "status")
-	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
-	if allocationObject != "" {
-		// The scheduler was handling allocation. Now that has
-		// completed, either successfully or with a failure.
-		if err == nil {
-			// This can fail, but only for reasons that are okay (concurrent delete or update).
-			// Shouldn't happen in this case.
-			if err := pl.claimAssumeCache.Assume(claim); err != nil {
-				logger.V(5).Info("Claim not stored in assume cache", "err", err)
-			}
-		}
-		pl.inFlightAllocations.Delete(claim.UID)
-	}
-	return claim, err
+	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
+	return claim, nil
 }

 // PostBind is called after a pod is successfully bound to a node. Now we are
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
index 8b93b7d63cb..2468a2ea35a 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
@@ -580,6 +580,115 @@ func TestPlugin(t *testing.T) {
 				},
 			},
 		},
+		"delayed-allocation-structured-with-resources-has-finalizer": {
+			// As before, but the finalizer is already set. Could happen if
+			// the scheduler got interrupted.
+			pod: podWithClaimName,
+			claims: func() []*resourcev1alpha2.ResourceClaim {
+				claim := pendingDelayedClaim.DeepCopy()
+				claim.Finalizers = structuredAllocatedClaim.Finalizers
+				return []*resourcev1alpha2.ResourceClaim{claim}
+			}(),
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
+		"delayed-allocation-structured-with-resources-finalizer-gets-removed": {
+			// As before, but the finalizer is already set. Then it gets
+			// removed before the scheduler reaches PreBind.
+			pod: podWithClaimName,
+			claims: func() []*resourcev1alpha2.ResourceClaim {
+				claim := pendingDelayedClaim.DeepCopy()
+				claim.Finalizers = structuredAllocatedClaim.Finalizers
+				return []*resourcev1alpha2.ResourceClaim{claim}
+			}(),
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			prepare: prepare{
+				prebind: change{
+					claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+						claim.Finalizers = nil
+						return claim
+					},
+				},
+			},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Finalizers = structuredAllocatedClaim.Finalizers
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
+		"delayed-allocation-structured-with-resources-finalizer-gets-added": {
+			// No finalizer initially, then it gets added before
+			// the scheduler reaches PreBind. Shouldn't happen?
+			pod:     podWithClaimName,
+			claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
+			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
+			objs:    []apiruntime.Object{workerNodeSlice},
+			prepare: prepare{
+				prebind: change{
+					claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+						claim.Finalizers = structuredAllocatedClaim.Finalizers
+						return claim
+					},
+				},
+			},
+			want: want{
+				reserve: result{
+					inFlightClaim: structuredAllocatedClaim,
+				},
+				prebind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+					changes: change{
+						claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
+							if claim.Name == claimName {
+								claim = claim.DeepCopy()
+								claim.Status = structuredInUseClaim.Status
+							}
+							return claim
+						},
+					},
+				},
+				postbind: result{
+					assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
+				},
+			},
+		},
 		"delayed-allocation-structured-skip-bind": {
 			pod:    podWithClaimName,
 			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},