Merge pull request #118862 from byako/batching-dra-calls

DRA controller: batch resource claims for Allocate
This commit is contained in:
Kubernetes Prow Robot 2023-07-06 11:33:03 -07:00 committed by GitHub
commit d02d8ba635
5 changed files with 225 additions and 100 deletions

View File

@ -68,8 +68,8 @@ type Driver interface {
// The caller will wrap the error to include the parameter reference.
GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error)
// Allocate gets called when a ResourceClaim is ready to be allocated.
// The selectedNode is empty for ResourceClaims with immediate
// Allocate gets called when all same-driver ResourceClaims for a Pod are ready
// to be allocated. The selectedNode is empty for ResourceClaims with immediate
// allocation, in which case the resource driver decides itself where
// to allocate. If there is already an on-going allocation, the driver
// may finish it and ignore the new parameters or abort the on-going
@ -77,15 +77,21 @@ type Driver interface {
//
// Parameters have been retrieved earlier.
//
// Driver must set the result of allocation for every claim in the "claims"
// parameter. If allocation was successful, claims[i].Allocation should be
// set. If allocation of a particular claim failed, the respective
// claims[i].Error field should be set instead, in which case
// claims[i].Allocation will be ignored.
//
// If selectedNode is set, the driver must attempt to allocate for that
// node. If that is not possible, it must return an error. The
// controller will call UnsuitableNodes and pass the new information to
// the scheduler, which will then lead to selecting a different node
// if the current one is not suitable.
//
// The objects are read-only and must not be modified. This call
// must be idempotent.
Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error)
// The Claim, ClaimParameters, Class, ClassParameters fields of "claims" parameter
// items are read-only and must not be modified. This call must be idempotent.
Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string)
// Deallocate gets called when a ResourceClaim is ready to be
// freed.
@ -125,6 +131,13 @@ type ClaimAllocation struct {
// UnsuitableNodes needs to be filled in by the driver when
// Driver.UnsuitableNodes gets called.
UnsuitableNodes []string
// Driver must populate this field with the resources that were
// allocated for the claim when allocation succeeds.
Allocation *resourcev1alpha2.AllocationResult
// If allocation of this particular claim fails, the driver must
// populate this field with the error.
Error error
}
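
Taken together, the Allocate doc comment and these new ClaimAllocation fields define a per-claim result contract. Below is a minimal sketch of the driver side, assuming a hypothetical exampleDriver with a batch-capable backend (not part of this PR); only Allocate is shown, a real driver implements the rest of the Driver interface as well.

```go
package example

import (
	"context"
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/dynamic-resource-allocation/controller"
)

// exampleDriver is hypothetical; allocateBatch stands in for whatever
// backend the driver talks to and returns one result or one error per
// claim, in the same order as the input slice.
type exampleDriver struct {
	allocateBatch func(ctx context.Context, claims []*controller.ClaimAllocation, node string) ([]*resourcev1alpha2.AllocationResult, []error)
}

// Allocate receives all pending same-driver claims for a pod in one call
// and reports success or failure per claim by filling Allocation or Error.
func (d *exampleDriver) Allocate(ctx context.Context, claims []*controller.ClaimAllocation, selectedNode string) {
	// One backend request for the whole batch instead of one per claim.
	results, errs := d.allocateBatch(ctx, claims, selectedNode)
	for i, ca := range claims {
		if errs[i] != nil {
			// Per-claim failure: set Error; Allocation is ignored for this item.
			ca.Error = fmt.Errorf("claim %s: %w", ca.Claim.Name, errs[i])
			continue
		}
		// Success: the controller copies this into claim.Status.Allocation.
		ca.Allocation = results[i]
	}
}
```

The test driver's allocateOneByOne further below satisfies the same contract by simply looping over the claims one at a time.
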
type controller struct {
@ -500,7 +513,20 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha2.R
return err
}
return ctrl.allocateClaim(ctx, claim, claimParameters, class, classParameters, "", nil)
claimAllocations := claimAllocations{&ClaimAllocation{
Claim: claim,
ClaimParameters: claimParameters,
Class: class,
ClassParameters: classParameters,
}}
ctrl.allocateClaims(ctx, claimAllocations, "", nil)
if claimAllocations[0].Error != nil {
return fmt.Errorf("allocate: %v", claimAllocations[0].Error)
}
return nil
}
func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass) (claimParameters, classParameters interface{}, err error) {
@ -517,51 +543,85 @@ func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alph
return
}
func (ctrl *controller) allocateClaim(ctx context.Context,
claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{},
selectedNode string,
selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) error {
// allocateClaims filters the list of claims, keeps those that still need allocation, and asks the driver to allocate them.
// The driver is expected to write the AllocationResult and Error fields into the items of the claims slice.
func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAllocation, selectedNode string, selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) {
logger := klog.FromContext(ctx)
if claim.Status.Allocation != nil {
// This can happen when two PodSchedulingContext objects trigger
// allocation attempts (first one wins) or when we see the
// update of the PodSchedulingContext object.
logger.V(5).Info("Claim already allocated, nothing to do")
return nil
needAllocation := make([]*ClaimAllocation, 0, len(claims))
for _, claim := range claims {
if claim.Claim.Status.Allocation != nil {
// This can happen when two PodSchedulingContext objects trigger
// allocation attempts (first one wins) or when we see the
// update of the PodSchedulingContext object.
logger.V(5).Info("Claim is already allocated, skipping allocation", "claim", claim.PodClaimName)
continue
}
needAllocation = append(needAllocation, claim)
}
claim = claim.DeepCopy()
if !ctrl.hasFinalizer(claim) {
// Set finalizer before doing anything. We continue with the updated claim.
logger.V(5).Info("Adding finalizer")
claim.Finalizers = append(claim.Finalizers, ctrl.finalizer)
var err error
claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("add finalizer: %v", err)
}
ctrl.claimCache.Mutation(claim)
if len(needAllocation) == 0 {
logger.V(5).Info("No claims need allocation, nothing to do")
return
}
// Keep the claims whose finalizer was added successfully in a separate list;
// only those will be passed to the driver's Allocate.
claimsWithFinalizers := make([]*ClaimAllocation, 0, len(needAllocation))
for _, claimAllocation := range needAllocation {
if !ctrl.hasFinalizer(claimAllocation.Claim) {
claim := claimAllocation.Claim.DeepCopy()
// Set finalizer before doing anything. We continue with the updated claim.
logger.V(5).Info("Adding finalizer", "claim", claim.Name)
claim.Finalizers = append(claim.Finalizers, ctrl.finalizer)
var err error
claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "add finalizer", "claim", claim.Name)
claimAllocation.Error = fmt.Errorf("add finalizer: %v", err)
// Do not pass this claim to the driver's Allocate.
continue
}
ctrl.claimCache.Mutation(claim)
claimAllocation.Claim = claim
}
claimsWithFinalizers = append(claimsWithFinalizers, claimAllocation)
}
// From here on we only operate on claimsWithFinalizers because only those are ready for allocation.
logger.V(5).Info("Allocating")
allocation, err := ctrl.driver.Allocate(ctx, claim, claimParameters, class, classParameters, selectedNode)
if err != nil {
return fmt.Errorf("allocate: %v", err)
ctrl.driver.Allocate(ctx, claimsWithFinalizers, selectedNode)
// Update successfully allocated claims' status with allocation info.
for _, claimAllocation := range claimsWithFinalizers {
if claimAllocation.Error != nil {
logger.Error(claimAllocation.Error, "allocating claim", "claim", claimAllocation.Claim.Name)
continue
}
if claimAllocation.Allocation == nil {
logger.Error(nil, "allocating claim: missing allocation from driver", "claim", claimAllocation.Claim.Name)
claimAllocation.Error = fmt.Errorf("allocating claim: missing allocation from driver")
// Do not update this claim's status with the allocation; it might succeed next time.
continue
}
logger.V(5).Info("successfully allocated", "claim", klog.KObj(claimAllocation.Claim))
claim := claimAllocation.Claim.DeepCopy()
claim.Status.Allocation = claimAllocation.Allocation
claim.Status.DriverName = ctrl.name
if selectedUser != nil {
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)
claim, err := ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
claimAllocation.Error = fmt.Errorf("add allocation: %v", err)
continue
}
ctrl.claimCache.Mutation(claim)
}
claim.Status.Allocation = allocation
claim.Status.DriverName = ctrl.name
if selectedUser != nil {
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)
claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("add allocation: %v", err)
}
ctrl.claimCache.Mutation(claim)
return nil
return
}
func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
@ -690,11 +750,20 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
Name: pod.Name,
UID: pod.UID,
}
ctrl.allocateClaims(ctx, claims, selectedNode, selectedUser)
allErrorsStr := "allocation of one or more pod claims failed."
allocationFailed := false
for _, delayed := range claims {
if err := ctrl.allocateClaim(ctx, delayed.Claim, delayed.ClaimParameters, delayed.Class, delayed.ClassParameters, selectedNode, selectedUser); err != nil {
return fmt.Errorf("allocation of pod claim %s failed: %v", delayed.PodClaimName, err)
if delayed.Error != nil {
allErrorsStr = fmt.Sprintf("%s Claim %s: %s.", allErrorsStr, delayed.Claim.Name, delayed.Error)
allocationFailed = true
}
}
if allocationFailed {
return fmt.Errorf("%s", allErrorsStr)
}
}
}

View File

@ -533,14 +533,19 @@ func (m mockDriver) GetClaimParameters(ctx context.Context, claim *resourcev1alp
return result, nil
}
func (m mockDriver) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error) {
m.t.Logf("Allocate(%s)", claim)
allocate, ok := m.allocate[claim.Name]
if !ok {
m.t.Fatal("unexpected Allocate call")
func (m mockDriver) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string) {
m.t.Logf("Allocate(number of claims %d)", len(claims))
for _, claimAllocation := range claims {
m.t.Logf("Allocate(%s)", claimAllocation.Claim.Name)
allocate, ok := m.allocate[claimAllocation.Claim.Name]
if !ok {
m.t.Fatalf("unexpected Allocate call for claim %s", claimAllocation.Claim.Name)
}
assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node")
claimAllocation.Error = allocate.allocErr
claimAllocation.Allocation = allocate.allocResult
}
assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node")
return allocate.allocResult, allocate.allocErr
return
}
func (m mockDriver) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error {

View File

@ -41,10 +41,10 @@ $ kind create cluster --config test/e2e/dra/kind.yaml --image dra/node:latest
- Build ginkgo
> NB: If you are using go workspace you must disable it `GOWORK=off make gingko`
> NB: If you are using a Go workspace, you must disable it: `GOWORK=off make ginkgo`
```bash
$ make gingko
$ make ginkgo
```
- Run e2e tests for the `Dynamic Resource Allocation` feature:

View File

@ -20,8 +20,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/onsi/ginkgo/v2"
@ -32,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/controller"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework"
@ -427,69 +426,102 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
})
ginkgo.Context("reallocation", func() {
var allocateWrapper app.AllocateWrapperType
var allocateWrapper2 app.AllocateWrapperType
driver := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
}
})
driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
AllocateWrapper: func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error)) (result *resourcev1alpha2.AllocationResult, err error) {
return allocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, handler)
AllocateWrapper: func(
ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string,
handler func(
ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string),
) {
allocateWrapper2(ctx, claimAllocations, selectedNode, handler)
return
},
}
})
driver2.NameSuffix = "-other"
b := newBuilder(f, driver)
b2 := newBuilder(f, driver2)
ginkgo.It("works", func(ctx context.Context) {
// A pod with two claims can run on a node, but
// only if allocation of both succeeds. This
// tests simulates the scenario where one claim
// gets allocated but the second doesn't
// because of a race with some other pod.
// A pod with multiple claims can run on a node, but
// only if allocation of all of them succeeds. This
// test simulates the scenario where one claim
// gets allocated from one driver, but the claims
// from the second driver fail allocation because of a
// race with some other pod.
//
// To ensure the right timing, allocation of the second
// claim gets delayed while creating another pod
// that gets the remaining resource on the node.
// To ensure the right timing, allocation of the
// claims from the second driver is delayed while
// creating another pod that gets the remaining
// resource on the node from the second driver.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
parameters := b.parameters()
claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
claim2 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
parameters1 := b.parameters()
parameters2 := b2.parameters()
pod1 := b.podExternal()
pod2 := b2.podExternal()
pod1claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod2claim1 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1claim2 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1claim3 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims,
v1.PodResourceClaim{
Name: "claim2",
Name: "claim-other1",
Source: v1.ClaimSource{
ResourceClaimName: &claim2.Name,
ResourceClaimName: &pod1claim2.Name,
},
},
v1.PodResourceClaim{
Name: "claim-other2",
Source: v1.ClaimSource{
ResourceClaimName: &pod1claim3.Name,
},
},
)
// Block on the second external claim that is to be allocated.
// Block allocation of the second and third external claims from driver2.
blockClaim, cancelBlockClaim := context.WithCancel(ctx)
defer cancelBlockClaim()
var allocated int32
allocateWrapper = func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error),
) (result *resourcev1alpha2.AllocationResult, err error) {
oldAllocated := atomic.AddInt32(&allocated, 0)
if oldAllocated == 1 && strings.HasPrefix(claim.Name, "external-claim") {
b2.create(ctx, parameters2, pod1claim2, pod1claim3)
b.create(ctx, parameters1, pod1claim1, pod1)
allocateWrapper2 = func(ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string,
handler func(ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string),
) {
// pod1 will have only external-claim[2-3]; it has to wait
// for pod2 to consume resources from driver2.
if claimAllocations[0].Claim.Name != "external-claim-other" {
<-blockClaim.Done()
}
result, err = handler(ctx, claim, claimParameters, class, classParameters, selectedNode)
if err == nil {
atomic.AddInt32(&allocated, 1)
}
handler(ctx, claimAllocations, selectedNode)
return
}
b.create(ctx, parameters, claim1, claim2, pod1)
ginkgo.By("waiting for one claim to be allocated")
ginkgo.By("waiting for one claim from driver1 to be allocated")
var nodeSelector *v1.NodeSelector
gomega.Eventually(ctx, func(ctx context.Context) (int, error) {
claims, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
@ -512,18 +544,20 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
// the node selector looks like and can
// directly access the key and value from it.
ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector))
pod2, template2 := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0]
node := req.Values[0]
pod2.Spec.NodeSelector = map[string]string{req.Key: node}
b.create(ctx, pod2, template2)
b2.create(ctx, pod2claim1, pod2)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod2), "start pod 2")
// Allow allocation of claim2 to proceed. It should fail now
// Allow allocation of pod1's claim2 and claim3 to proceed. It should fail now
// and the other node must be used instead, after deallocating
// the first claim.
ginkgo.By("move first pod to other node")
cancelBlockClaim()
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod1), "start pod 1")
pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get first pod")
@ -546,6 +580,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
}
})
b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,

View File

@ -47,11 +47,12 @@ type Resources struct {
AllocateWrapper AllocateWrapperType
}
type AllocateWrapperType func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error),
) (result *resourcev1alpha2.AllocationResult, err error)
type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation,
selectedNode string,
handler func(ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string),
)
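
The wrapper now receives the whole batch together with a handler of the same batched signature. As a sketch (the passThroughWrapper below is hypothetical, not part of this PR), the simplest wrapper just observes the batch and delegates to the handler; the e2e test above installs a more elaborate one that blocks until a channel is closed.

```go
package example

import (
	"context"

	"k8s.io/dynamic-resource-allocation/controller"
	"k8s.io/klog/v2"
)

// passThroughWrapper matches the new AllocateWrapperType signature: it can
// observe or delay the batch, then hands control back to the real handler.
func passThroughWrapper(ctx context.Context,
	claimAllocations []*controller.ClaimAllocation,
	selectedNode string,
	handler func(ctx context.Context,
		claimAllocations []*controller.ClaimAllocation,
		selectedNode string),
) {
	// Log the batch size, then let the real allocation proceed.
	klog.FromContext(ctx).V(5).Info("wrapped Allocate", "claims", len(claimAllocations), "node", selectedNode)
	handler(ctx, claimAllocations, selectedNode)
}
```
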
type ExampleController struct {
clientset kubernetes.Interface
@ -152,15 +153,30 @@ func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, nam
return configMap.Data, nil
}
func (c *ExampleController) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
func (c *ExampleController) Allocate(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
if c.resources.AllocateWrapper != nil {
return c.resources.AllocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, c.allocate)
c.resources.AllocateWrapper(ctx, claimAllocations, selectedNode, c.allocateOneByOne)
} else {
c.allocateOneByOne(ctx, claimAllocations, selectedNode)
}
return
}
func (c *ExampleController) allocateOneByOne(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
for _, ca := range claimAllocations {
allocationResult, err := c.allocateOne(ctx, ca.Claim, ca.ClaimParameters, ca.Class, ca.ClassParameters, selectedNode)
if err != nil {
ca.Error = fmt.Errorf("failed allocating claim %v: %w", ca.Claim.UID, err)
continue
}
ca.Allocation = allocationResult
}
return c.allocate(ctx, claim, claimParameters, class, classParameters, selectedNode)
}
// allocateOne simply copies the parameters as a JSON map into a ResourceHandle.
func (c *ExampleController) allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID)
defer func() {
logger.V(3).Info("done", "result", result, "err", err)