dra: patch ReservedFor during PreBind

This moves adding a pod to ReservedFor out of the main scheduling cycle into
PreBind. There it is done concurrently in different goroutines. For claims
which were specifically allocated for a pod (the most common case), that
usually makes no difference because the claim is already reserved.

It starts to matter when that pod then cannot be scheduled for other reasons,
because then the claim gets unreserved to allow deallocating it. It also
matters for claims that are created separately and then get used multiple times
by different pods.

Because multiple pods might get added to the same claim rapidly independently
from each other, it makes sense to do all claim status updates via patching:
then it is no longer necessary to have an up-to-date copy of the claim because
the patch operation will succeed if (and only if) the patched claim is valid.

Server-side-apply cannot be used for this because a client always has to send
the full list of all entries that it wants to be set, i.e. it cannot add one
entry unless it knows the full list.
This commit is contained in:
Patrick Ohly 2023-11-14 15:47:47 +01:00
parent 81986587ef
commit 5d1509126f
3 changed files with 224 additions and 64 deletions

View File

@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
"k8s.io/client-go/kubernetes"
@ -102,25 +103,6 @@ func (d *stateData) Clone() framework.StateData {
return d
}
func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.Interface, index int, claim *resourcev1alpha2.ResourceClaim) error {
// TODO (#113700): replace with patch operation. Beware that patching must only succeed if the
// object has not been modified in parallel by someone else.
claim, err := clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
// TODO: metric for update results, with the operation ("set selected
// node", "set PotentialNodes", etc.) as one dimension.
if err != nil {
return fmt.Errorf("update resource claim: %w", err)
}
// Remember the new instance. This is relevant when the plugin must
// update the same claim multiple times (for example, first reserve
// the claim, then later remove the reservation), because otherwise the second
// update would fail with a "was modified" error.
d.claims[index] = claim
return nil
}
type podSchedulingState struct {
// A pointer to the PodSchedulingContext object for the pod, if one exists
// in the API server.
@ -300,6 +282,7 @@ var _ framework.PostFilterPlugin = &dynamicResources{}
var _ framework.PreScorePlugin = &dynamicResources{}
var _ framework.ReservePlugin = &dynamicResources{}
var _ framework.EnqueueExtensions = &dynamicResources{}
var _ framework.PreBindPlugin = &dynamicResources{}
var _ framework.PostBindPlugin = &dynamicResources{}
// Name returns name of the plugin. It is used in logs, etc.
@ -803,7 +786,7 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
claim.Status.DeallocationRequested = true
claim.Status.ReservedFor = nil
logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil {
if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
return nil, statusError(logger, err)
}
return nil, nil
@ -923,27 +906,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
logger := klog.FromContext(ctx)
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
// Allocated, but perhaps not reserved yet.
if resourceclaim.IsReservedForPod(pod, claim) {
logger.V(5).Info("is reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
continue
}
claim := claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
})
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
_, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
// TODO: metric for update errors.
if err != nil {
return statusError(logger, err)
}
// If we get here, we know that reserving the claim for
// the pod worked and we can proceed with schedulingCtx
// it.
// Allocated, but perhaps not reserved yet. We checked in PreFilter that
// the pod could reserve the claim. Instead of reserving here by
// updating the ResourceClaim status, we assume that reserving
// will work and only do it for real during binding. If it fails at
// that time, some other pod was faster and we have to try again.
} else {
// Must be delayed allocation.
numDelayedAllocationPending++
@ -1037,29 +1004,74 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
}
logger := klog.FromContext(ctx)
for index, claim := range state.claims {
for _, claim := range state.claims {
if claim.Status.Allocation != nil &&
resourceclaim.IsReservedForPod(pod, claim) {
// Remove pod from ReservedFor.
claim := claim.DeepCopy()
reservedFor := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor)-1)
for _, reserved := range claim.Status.ReservedFor {
// TODO: can UID be assumed to be unique all resources or do we also need to compare Group/Version/Resource?
if reserved.UID != pod.UID {
reservedFor = append(reservedFor, reserved)
}
}
claim.Status.ReservedFor = reservedFor
logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim))
if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil {
// We will get here again when pod schedulingCtx
// is retried.
// Remove pod from ReservedFor. A strategic-merge-patch is used
// because that allows removing an individual entry without having
// the latest slice.
patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"$patch": "delete", "uid": %q} ] }}`,
claim.UID,
pod.UID,
)
logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim), "pod", klog.KObj(pod))
claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
if err != nil {
// We will get here again when pod scheduling is retried.
logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
}
}
}
}
// PreBind gets called in a separate goroutine after it has been determined
// that the pod should get bound to this node. Because Reserve did not actually
// reserve claims, we need to do it now. If that fails, we return an error and
// the pod will have to go into the backoff queue. The scheduler will call
// Unreserve as part of the error handling.
func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
if !pl.enabled {
return nil
}
state, err := getStateData(cs)
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
if len(state.claims) == 0 {
return nil
}
logger := klog.FromContext(ctx)
for index, claim := range state.claims {
if !resourceclaim.IsReservedForPod(pod, claim) {
// The claim might be stale, for example because the claim can get shared and some
// other goroutine has updated it in the meantime. We therefore cannot use
// SSA here to add the pod because then we would have to send the entire slice
// or use different field manager strings for each entry.
//
// With a strategic-merge-patch, we can simply send one new entry. The apiserver
// validation will catch if two goroutines try to do that at the same time and
// the claim cannot be shared.
patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
claim.UID,
pod.Name,
pod.UID,
)
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
// TODO: metric for update errors.
if err != nil {
return statusError(logger, err)
}
state.claims[index] = claim
}
}
// If we get here, we know that reserving the claim for
// the pod worked and we can proceed with binding it.
return nil
}
// PostBind is called after a pod is successfully bound to a node. Now we are
// sure that a PodSchedulingContext object, if it exists, is definitely not going to
// be needed anymore and can delete it. This is a one-shot thing, there won't

View File

@ -192,9 +192,14 @@ type want struct {
prescore result
reserve result
unreserve result
prebind result
postbind result
postFilterResult *framework.PostFilterResult
postfilter result
// unreserveAfterBindFailure, if set, triggers a call to Unreserve
// after PreBind, as if the actual Bind had failed.
unreserveAfterBindFailure *result
}
// prepare contains changes for objects in the API server.
@ -206,6 +211,7 @@ type prepare struct {
prescore change
reserve change
unreserve change
prebind change
postbind change
postfilter change
}
@ -237,7 +243,7 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
want: want{
reserve: result{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
@ -254,7 +260,7 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimTemplateInStatus,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
want: want{
reserve: result{
prebind: result{
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
@ -417,7 +423,7 @@ func TestPlugin(t *testing.T) {
schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo},
classes: []*resourcev1alpha2.ResourceClass{resourceClass},
want: want{
reserve: result{
prebind: result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
@ -492,7 +498,7 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
want: want{
reserve: result{
prebind: result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
@ -503,6 +509,30 @@ func TestPlugin(t *testing.T) {
},
},
},
"bind-failure": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
want: want{
prebind: result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
return st.FromResourceClaim(in).
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Obj()
},
},
},
unreserveAfterBindFailure: &result{
changes: change{
claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
out := in.DeepCopy()
out.Status.ReservedFor = []resourcev1alpha2.ResourceClaimConsumerReference{}
return out
},
},
},
},
},
"reserved-okay": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
@ -607,11 +637,26 @@ func TestPlugin(t *testing.T) {
})
} else {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind)
testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("postbind", func(t *testing.T) {
testCtx.verify(t, tc.want.postbind, initialObjects, nil, status)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prebind)
status := testCtx.p.PreBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("prebind", func(t *testing.T) {
testCtx.verify(t, tc.want.prebind, initialObjects, nil, status)
})
if tc.want.unreserveAfterBindFailure != nil {
initialObjects = testCtx.listAll(t)
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("unreserverAfterBindFailure", func(t *testing.T) {
testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status)
})
} else if status.IsSuccess() {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind)
testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("postbind", func(t *testing.T) {
testCtx.verify(t, tc.want.postbind, initialObjects, nil, nil)
})
}
}
} else {
initialObjects = testCtx.listAll(t)

View File

@ -25,6 +25,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gcustom"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
@ -309,6 +310,108 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.Context("cluster", func() {
nodes := NewNodes(f, 1, 4)
ginkgo.Context("with local unshared resources", func() {
driver := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 10,
Nodes: nodes.NodeNames,
}
})
b := newBuilder(f, driver)
// This test covers some special code paths in the scheduler:
// - Patching the ReservedFor during PreBind because in contrast
// to claims specifically allocated for a pod, here the claim
// gets allocated without reserving it.
// - Error handling when PreBind fails: multiple attempts to bind pods
// are started concurrently, only one attempt succeeds.
// - Removing a ReservedFor entry because the first inline claim gets
// reserved during allocation.
ginkgo.It("reuses an allocated immediate claim", func(ctx context.Context) {
objects := []klog.KMetadata{
b.parameters(),
b.externalClaim(resourcev1alpha2.AllocationModeImmediate),
}
podExternal := b.podExternal()
// Create many pods to increase the chance that the scheduler will
// try to bind two pods at the same time.
numPods := 5
for i := 0; i < numPods; i++ {
podInline, claimTemplate := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
podInline.Spec.Containers[0].Resources.Claims = append(podInline.Spec.Containers[0].Resources.Claims, podExternal.Spec.Containers[0].Resources.Claims[0])
podInline.Spec.ResourceClaims = append(podInline.Spec.ResourceClaims, podExternal.Spec.ResourceClaims[0])
objects = append(objects, claimTemplate, podInline)
}
b.create(ctx, objects...)
var runningPod *v1.Pod
haveRunningPod := gcustom.MakeMatcher(func(pods []v1.Pod) (bool, error) {
numRunning := 0
runningPod = nil
for _, pod := range pods {
if pod.Status.Phase == v1.PodRunning {
pod := pod // Don't keep pointer to loop variable...
runningPod = &pod
numRunning++
}
}
return numRunning == 1, nil
}).WithTemplate("Expected one running Pod.\nGot instead:\n{{.FormattedActual}}")
for i := 0; i < numPods; i++ {
ginkgo.By("waiting for exactly one pod to start")
runningPod = nil
gomega.Eventually(ctx, b.listTestPods).WithTimeout(f.Timeouts.PodStartSlow).Should(haveRunningPod)
ginkgo.By("checking that no other pod gets scheduled")
havePendingPods := gcustom.MakeMatcher(func(pods []v1.Pod) (bool, error) {
numPending := 0
for _, pod := range pods {
if pod.Status.Phase == v1.PodPending {
numPending++
}
}
return numPending == numPods-1-i, nil
}).WithTemplate("Expected only one running Pod.\nGot instead:\n{{.FormattedActual}}")
gomega.Consistently(ctx, b.listTestPods).WithTimeout(time.Second).Should(havePendingPods)
ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(runningPod)))
framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, runningPod.Name, metav1.DeleteOptions{}))
ginkgo.By(fmt.Sprintf("waiting for pod %s to disappear", klog.KObj(runningPod)))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, b.f.ClientSet, runningPod.Name, runningPod.Namespace, f.Timeouts.PodDelete))
}
})
})
ginkgo.Context("with shared network resources", func() {
driver := NewDriver(f, nodes, networkResources)
b := newBuilder(f, driver)
// This test complements "reuses an allocated immediate claim" above:
// because the claim can be shared, each PreBind attempt succeeds.
ginkgo.It("shares an allocated immediate claim", func(ctx context.Context) {
objects := []klog.KMetadata{
b.parameters(),
b.externalClaim(resourcev1alpha2.AllocationModeImmediate),
}
// Create many pods to increase the chance that the scheduler will
// try to bind two pods at the same time.
numPods := 5
pods := make([]*v1.Pod, numPods)
for i := 0; i < numPods; i++ {
pods[i] = b.podExternal()
objects = append(objects, pods[i])
}
b.create(ctx, objects...)
ginkgo.By("waiting all pods to start")
framework.ExpectNoError(e2epod.WaitForPodsRunning(ctx, b.f.ClientSet, f.Namespace.Name, numPods+1 /* driver */, f.Timeouts.PodStartSlow))
})
})
// claimTests tries out several different combinations of pods with
// claims, both inline and external.
claimTests := func(b *builder, driver *Driver, allocationMode resourcev1alpha2.AllocationMode) {