DRA: fix scheduler/resource claim controller race with retry
The JSON patch approach works, but it is complex. A retry loop is easier to understand: detect the conflict, get the latest claim, try again. This costs one additional API call (the get), but conflicts are unlikely in practice, so the extra call rarely happens.
parent ecbafb8de5
commit 4bddebc48e
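The retry loop added in bindClaim below follows the standard client-go conflict-retry pattern. As a minimal standalone sketch of that pattern (the reserveClaim helper, its signature, and the package name are illustrative assumptions, not code from this commit):

package example // hypothetical sketch of the conflict-retry pattern used in the diff below

import (
    "context"
    "fmt"

    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/util/retry"
)

// reserveClaim adds one reservedFor entry to a claim's status. On a
// ResourceVersion conflict it refetches the claim and tries again.
func reserveClaim(ctx context.Context, clientset kubernetes.Interface, claim *resourcev1alpha2.ResourceClaim, ref resourcev1alpha2.ResourceClaimConsumerReference) error {
    refresh := false
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        if refresh {
            // A previous attempt hit a conflict: get the latest claim first.
            latest, err := clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
            if err != nil {
                return fmt.Errorf("get updated claim after conflict: %w", err)
            }
            claim = latest
        } else {
            // All future attempts must refetch first.
            refresh = true
        }

        claim.Status.ReservedFor = append(claim.Status.ReservedFor, ref)
        updated, err := clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
        if err != nil {
            // retry.RetryOnConflict repeats the closure only for Conflict errors.
            return err
        }
        claim = updated
        return nil
    })
}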
@@ -18,12 +18,10 @@ package dynamicresources
import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "slices"
    "sort"
    "strings"
    "sync"

    "github.com/google/go-cmp/cmp"
@@ -41,6 +39,7 @@ import (
    "k8s.io/client-go/kubernetes"
    resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/retry"
    "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    "k8s.io/dynamic-resource-allocation/resourceclaim"
    "k8s.io/klog/v2"
@@ -1461,7 +1460,9 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
        // The allocation would be enough. The full object is useful for
        // debugging and testing, so let's make it realistic.
        claim = claim.DeepCopy()
        claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
        if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
            claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
        }
        claim.Status.DriverName = driverName
        claim.Status.Allocation = allocation
        pl.inFlightAllocations.Store(claim.UID, claim)
@@ -1620,105 +1621,88 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
// and reservation are recorded. This finishes the work started in Reserve.
func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
    logger := klog.FromContext(ctx)
    claim := state.claims[index]
    driverName := ""
    allocationObject := ""

    claim := state.claims[index].DeepCopy()
    allocation := state.informationsForClaim[index].allocation
    logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
    defer func() {
        if allocation != nil {
            // The scheduler was handling allocation. Now that has
            // completed, either successfully or with a failure.
            if finalErr == nil {
                // This can fail, but only for reasons that are okay (concurrent delete or update).
                // Shouldn't happen in this case.
                if err := pl.claimAssumeCache.Assume(claim); err != nil {
                    logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
                }
            }
            pl.inFlightAllocations.Delete(claim.UID)
        }
    }()

    // Do we need to store an allocation result from Reserve?
    if allocation != nil {
        buffer, err := json.Marshal(allocation)
    logger.V(5).Info("preparing claim status update", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))

    // We may run into a ResourceVersion conflict because there may be some
    // benign concurrent changes. In that case we get the latest claim and
    // try again.
    refreshClaim := false
    retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
        if refreshClaim {
            updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
            if err != nil {
                return fmt.Errorf("get updated claim %s after conflict: %w", klog.KObj(claim), err)
            }
            logger.V(5).Info("retrying update after conflict", "claim", klog.KObj(claim))
            claim = updatedClaim
        } else {
            // All future retries must get a new claim first.
            refreshClaim = true
        }

        if claim.DeletionTimestamp != nil {
            return fmt.Errorf("claim %s got deleted in the meantime", klog.KObj(claim))
        }

        // Do we need to store an allocation result from Reserve?
        if allocation != nil {
            if claim.Status.Allocation != nil {
                return fmt.Errorf("claim %s got allocated elsewhere in the meantime", klog.KObj(claim))
            }

            // The finalizer needs to be added in a normal update.
            // If we were interrupted in the past, it might already be set and we simply continue.
            if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
                claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
                updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
                if err != nil {
                    return fmt.Errorf("add finalizer to claim %s: %w", klog.KObj(claim), err)
                }
                claim = updatedClaim
            }

            claim.Status.DriverName = state.informationsForClaim[index].allocationDriverName
            claim.Status.Allocation = allocation
        }

        // We can simply try to add the pod here without checking
        // preconditions. The apiserver will tell us with a
        // non-conflict error if this isn't possible.
        claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: pod.Name, UID: pod.UID})
        updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
        if err != nil {
            return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
        }
        driverName = state.informationsForClaim[index].allocationDriverName
        allocationObject = string(buffer)

        // The finalizer needs to be added in a normal update. Using a simple update is fine
        // because we don't expect concurrent modifications while the claim is not allocated
        // yet. If there are any, we want to fail.
        //
        // If we were interrupted in the past, it might already be set and we simply continue.
        if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
            claim := state.claims[index].DeepCopy()
            claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
            if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
                return nil, fmt.Errorf("add finalizer: %v", err)
            if allocation != nil {
                return fmt.Errorf("add allocation and reservation to claim %s: %w", klog.KObj(claim), err)
            }
            return fmt.Errorf("add reservation to claim %s: %w", klog.KObj(claim), err)
        }
        claim = updatedClaim
        return nil
    })

    if retryErr != nil {
        return nil, retryErr
    }

    // The claim might be stale, for example because the claim can get shared and some
    // other goroutine has updated it in the meantime. We therefore cannot use
    // SSA here to add the pod because then we would have to send the entire slice
    // or use different field manager strings for each entry.
    //
    // With a strategic-merge-patch, we can simply send one new entry. The apiserver
    // validation will catch if two goroutines try to do that at the same time and
    // the claim cannot be shared.
    //
    // Note that this also works when the allocation result gets added twice because
    // two pods both started using a shared claim: the first pod to get here adds the
    // allocation result. The second pod then only adds itself to reservedFor.
    //
    // Because we allow concurrent updates, we don't check ResourceVersion
    // in a precondition. What we *do* need to check is that the finalizer
    // is still present. This implies using a JSON patch instead of a
    // simpler strategic-merge-patch.
    parts := make([]string, 0, 7)
    parts = append(parts,
        fmt.Sprintf(`{ "op": "test", "path": "/metadata/uid", "value": %q }`, claim.UID),
    )
    // Strictly speaking, we only need to check for existence of our finalizer.
    // JSON patch does not support that, so we check for the stronger "array is equal".
    if len(claim.Finalizers) > 0 {
        parts = append(parts,
            fmt.Sprintf(`{ "op": "test", "path": "/metadata/finalizers", "value": %q }`, claim.Finalizers),
        )
    }
    // JSON patch can only append to a non-empty array. An empty reservedFor gets
    // omitted and even if it didn't, it would be null and not an empty array.
    // Therefore we have to test and add if it's currently empty.
    reservedForEntry := fmt.Sprintf(`{"resource": "pods", "name": %q, "uid": %q}`, pod.Name, pod.UID)
    if len(claim.Status.ReservedFor) == 0 {
        parts = append(parts,
            fmt.Sprintf(`{ "op": "test", "path": "/status/reservedFor", "value": null }`),
            fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor", "value": [ %s ] }`, reservedForEntry),
        )
    } else {
        parts = append(parts,
            fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor/-", "value": %s }`, reservedForEntry),
        )
    }
    if allocationObject != "" {
        parts = append(parts,
            fmt.Sprintf(`{ "op": "add", "path": "/status/driverName", "value": %q }`, driverName),
            fmt.Sprintf(`{ "op": "add", "path": "/status/allocation", "value": %s }`, allocationObject),
        )
    }
    patch := "[\n" + strings.Join(parts, ",\n") + "\n]"
    if loggerV := logger.V(6); loggerV.Enabled() {
        logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
    } else {
        logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
    }
    claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}, "status")
    logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
    if allocationObject != "" {
        // The scheduler was handling allocation. Now that has
        // completed, either successfully or with a failure.
        if err == nil {
            // This can fail, but only for reasons that are okay (concurrent delete or update).
            // Shouldn't happen in this case.
            if err := pl.claimAssumeCache.Assume(claim); err != nil {
                logger.V(5).Info("Claim not stored in assume cache", "err", err)
            }
        }
        pl.inFlightAllocations.Delete(claim.UID)
    }
    return claim, err
    logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
    return claim, nil
}

// PostBind is called after a pod is successfully bound to a node. Now we are
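The test hunk below covers the finalizer corner cases (already set, removed before PreBind, added before PreBind) through the plugin's own test harness. The conflict path itself could also be exercised directly with a fake clientset reactor; a rough sketch under the assumption that such a reactor-based unit test is wanted (the newConflictingClientset helper is hypothetical, not part of this commit):

package example

import (
    "errors"
    "testing"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/kubernetes/fake"
    k8stesting "k8s.io/client-go/testing"
)

// newConflictingClientset returns a fake clientset whose first status update
// of a ResourceClaim fails with a Conflict error, forcing one retry.
func newConflictingClientset(t *testing.T, objs ...runtime.Object) *fake.Clientset {
    t.Helper()
    client := fake.NewSimpleClientset(objs...)
    conflicted := false
    client.PrependReactor("update", "resourceclaims", func(action k8stesting.Action) (bool, runtime.Object, error) {
        if action.GetSubresource() == "status" && !conflicted {
            conflicted = true
            gr := schema.GroupResource{Group: "resource.k8s.io", Resource: "resourceclaims"}
            return true, nil, apierrors.NewConflict(gr, "claim", errors.New("simulated concurrent update"))
        }
        // Fall through to the default object tracker for everything else.
        return false, nil, nil
    })
    return client
}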
@@ -580,6 +580,115 @@ func TestPlugin(t *testing.T) {
                },
            },
        },
        "delayed-allocation-structured-with-resources-has-finalizer": {
            // As before. but the finalizer is already set. Could happen if
            // the scheduler got interrupted.
            pod: podWithClaimName,
            claims: func() []*resourcev1alpha2.ResourceClaim {
                claim := pendingDelayedClaim.DeepCopy()
                claim.Finalizers = structuredAllocatedClaim.Finalizers
                return []*resourcev1alpha2.ResourceClaim{claim}
            }(),
            classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
            objs:    []apiruntime.Object{workerNodeSlice},
            want: want{
                reserve: result{
                    inFlightClaim: structuredAllocatedClaim,
                },
                prebind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                    changes: change{
                        claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
                            if claim.Name == claimName {
                                claim = claim.DeepCopy()
                                claim.Status = structuredInUseClaim.Status
                            }
                            return claim
                        },
                    },
                },
                postbind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                },
            },
        },
        "delayed-allocation-structured-with-resources-finalizer-gets-removed": {
            // As before. but the finalizer is already set. Then it gets
            // removed before the scheduler reaches PreBind.
            pod: podWithClaimName,
            claims: func() []*resourcev1alpha2.ResourceClaim {
                claim := pendingDelayedClaim.DeepCopy()
                claim.Finalizers = structuredAllocatedClaim.Finalizers
                return []*resourcev1alpha2.ResourceClaim{claim}
            }(),
            classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
            objs:    []apiruntime.Object{workerNodeSlice},
            prepare: prepare{
                prebind: change{
                    claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
                        claim.Finalizers = nil
                        return claim
                    },
                },
            },
            want: want{
                reserve: result{
                    inFlightClaim: structuredAllocatedClaim,
                },
                prebind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                    changes: change{
                        claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
                            if claim.Name == claimName {
                                claim = claim.DeepCopy()
                                claim.Finalizers = structuredAllocatedClaim.Finalizers
                                claim.Status = structuredInUseClaim.Status
                            }
                            return claim
                        },
                    },
                },
                postbind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                },
            },
        },
        "delayed-allocation-structured-with-resources-finalizer-gets-added": {
            // No finalizer initially, then it gets added before
            // the scheduler reaches PreBind. Shouldn't happen?
            pod:     podWithClaimName,
            claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
            classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
            objs:    []apiruntime.Object{workerNodeSlice},
            prepare: prepare{
                prebind: change{
                    claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
                        claim.Finalizers = structuredAllocatedClaim.Finalizers
                        return claim
                    },
                },
            },
            want: want{
                reserve: result{
                    inFlightClaim: structuredAllocatedClaim,
                },
                prebind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                    changes: change{
                        claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
                            if claim.Name == claimName {
                                claim = claim.DeepCopy()
                                claim.Status = structuredInUseClaim.Status
                            }
                            return claim
                        },
                    },
                },
                postbind: result{
                    assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
                },
            },
        },
        "delayed-allocation-structured-skip-bind": {
            pod:    podWithClaimName,
            claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},