DRA controller: batch resource claims for Allocate

Signed-off-by: Alexey Fomenko <alexey.fomenko@intel.com>
Co-authored-by: Patrick Ohly <patrick.ohly@intel.com>
This commit is contained in:
Alexey Fomenko 2023-06-08 10:52:44 +03:00
parent 027ac5a426
commit b10cc642b5
No known key found for this signature in database
GPG Key ID: 6E69CDBE939C35EE
5 changed files with 225 additions and 100 deletions

View File

@ -68,8 +68,8 @@ type Driver interface {
// The caller will wrap the error to include the parameter reference. // The caller will wrap the error to include the parameter reference.
GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error) GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error)
// Allocate gets called when a ResourceClaim is ready to be allocated. // Allocate gets called when all same-driver ResourceClaims for Pod are ready
// The selectedNode is empty for ResourceClaims with immediate // to be allocated. The selectedNode is empty for ResourceClaims with immediate
// allocation, in which case the resource driver decides itself where // allocation, in which case the resource driver decides itself where
// to allocate. If there is already an on-going allocation, the driver // to allocate. If there is already an on-going allocation, the driver
// may finish it and ignore the new parameters or abort the on-going // may finish it and ignore the new parameters or abort the on-going
@ -77,15 +77,21 @@ type Driver interface {
// //
// Parameters have been retrieved earlier. // Parameters have been retrieved earlier.
// //
// Driver must set the result of allocation for every claim in "claims"
// parameter items. In case if there was no error encountered and allocation
// was successful - claims[i].Allocation field should be set. In case of
// particular claim allocation fail - respective item's claims[i].Error field
// should be set, in this case claims[i].Allocation will be ignored.
//
// If selectedNode is set, the driver must attempt to allocate for that // If selectedNode is set, the driver must attempt to allocate for that
// node. If that is not possible, it must return an error. The // node. If that is not possible, it must return an error. The
// controller will call UnsuitableNodes and pass the new information to // controller will call UnsuitableNodes and pass the new information to
// the scheduler, which then will lead to selecting a diffent node // the scheduler, which then will lead to selecting a diffent node
// if the current one is not suitable. // if the current one is not suitable.
// //
// The objects are read-only and must not be modified. This call // The Claim, ClaimParameters, Class, ClassParameters fields of "claims" parameter
// must be idempotent. // items are read-only and must not be modified. This call must be idempotent.
Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string)
// Deallocate gets called when a ResourceClaim is ready to be // Deallocate gets called when a ResourceClaim is ready to be
// freed. // freed.
@ -125,6 +131,13 @@ type ClaimAllocation struct {
// UnsuitableNodes needs to be filled in by the driver when // UnsuitableNodes needs to be filled in by the driver when
// Driver.UnsuitableNodes gets called. // Driver.UnsuitableNodes gets called.
UnsuitableNodes []string UnsuitableNodes []string
// Driver must populate this field with resources that were
// allocated for the claim in case of successful allocation.
Allocation *resourcev1alpha2.AllocationResult
// In case of error allocating particular claim, driver must
// populate this field.
Error error
} }
type controller struct { type controller struct {
@ -500,7 +513,20 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha2.R
return err return err
} }
return ctrl.allocateClaim(ctx, claim, claimParameters, class, classParameters, "", nil) claimAllocations := claimAllocations{&ClaimAllocation{
Claim: claim,
ClaimParameters: claimParameters,
Class: class,
ClassParameters: classParameters,
}}
ctrl.allocateClaims(ctx, claimAllocations, "", nil)
if claimAllocations[0].Error != nil {
return fmt.Errorf("allocate: %v", claimAllocations[0].Error)
}
return nil
} }
func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass) (claimParameters, classParameters interface{}, err error) { func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass) (claimParameters, classParameters interface{}, err error) {
@ -517,51 +543,85 @@ func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alph
return return
} }
func (ctrl *controller) allocateClaim(ctx context.Context, // allocateClaims filters list of claims, keeps those needing allocation and asks driver to do the allocations.
claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, // Driver is supposed to write the AllocationResult and Error field into argument claims slice.
class *resourcev1alpha2.ResourceClass, classParameters interface{}, func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAllocation, selectedNode string, selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) {
selectedNode string,
selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) error {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
if claim.Status.Allocation != nil { needAllocation := make([]*ClaimAllocation, 0, len(claims))
for _, claim := range claims {
if claim.Claim.Status.Allocation != nil {
// This can happen when two PodSchedulingContext objects trigger // This can happen when two PodSchedulingContext objects trigger
// allocation attempts (first one wins) or when we see the // allocation attempts (first one wins) or when we see the
// update of the PodSchedulingContext object. // update of the PodSchedulingContext object.
logger.V(5).Info("Claim already allocated, nothing to do") logger.V(5).Info("Claim is already allocated, skipping allocation", "claim", claim.PodClaimName)
return nil continue
}
needAllocation = append(needAllocation, claim)
} }
claim = claim.DeepCopy() if len(needAllocation) == 0 {
if !ctrl.hasFinalizer(claim) { logger.V(5).Info("No claims need allocation, nothing to do")
return
}
// Keep separately claims that succeeded adding finalizers,
// they will be sent for Allocate to the driver.
claimsWithFinalizers := make([]*ClaimAllocation, 0, len(needAllocation))
for _, claimAllocation := range needAllocation {
if !ctrl.hasFinalizer(claimAllocation.Claim) {
claim := claimAllocation.Claim.DeepCopy()
// Set finalizer before doing anything. We continue with the updated claim. // Set finalizer before doing anything. We continue with the updated claim.
logger.V(5).Info("Adding finalizer") logger.V(5).Info("Adding finalizer", "claim", claim.Name)
claim.Finalizers = append(claim.Finalizers, ctrl.finalizer) claim.Finalizers = append(claim.Finalizers, ctrl.finalizer)
var err error var err error
claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("add finalizer: %v", err) logger.Error(err, "add finalizer", "claim", claim.Name)
claimAllocation.Error = fmt.Errorf("add finalizer: %v", err)
// Do not save claim to ask for Allocate from Driver.
continue
} }
ctrl.claimCache.Mutation(claim) ctrl.claimCache.Mutation(claim)
claimAllocation.Claim = claim
}
claimsWithFinalizers = append(claimsWithFinalizers, claimAllocation)
} }
// Beyond here we only operate with claimsWithFinalizers because those are ready for allocation.
logger.V(5).Info("Allocating") logger.V(5).Info("Allocating")
allocation, err := ctrl.driver.Allocate(ctx, claim, claimParameters, class, classParameters, selectedNode) ctrl.driver.Allocate(ctx, claimsWithFinalizers, selectedNode)
if err != nil {
return fmt.Errorf("allocate: %v", err) // Update successfully allocated claims' status with allocation info.
for _, claimAllocation := range claimsWithFinalizers {
if claimAllocation.Error != nil {
logger.Error(claimAllocation.Error, "allocating claim", "claim", claimAllocation.Claim.Name)
continue
} }
claim.Status.Allocation = allocation if claimAllocation.Allocation == nil {
logger.Error(nil, "allocating claim: missing allocation from driver", "claim", claimAllocation.Claim.Name)
claimAllocation.Error = fmt.Errorf("allocating claim: missing allocation from driver")
// Do not update this claim with allocation, it might succeed next time.
continue
}
logger.V(5).Info("successfully allocated", "claim", klog.KObj(claimAllocation.Claim))
claim := claimAllocation.Claim.DeepCopy()
claim.Status.Allocation = claimAllocation.Allocation
claim.Status.DriverName = ctrl.name claim.Status.DriverName = ctrl.name
if selectedUser != nil { if selectedUser != nil {
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser) claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
} }
logger.V(6).Info("Updating claim after allocation", "claim", claim) logger.V(6).Info("Updating claim after allocation", "claim", claim)
claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) claim, err := ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("add allocation: %v", err) claimAllocation.Error = fmt.Errorf("add allocation: %v", err)
continue
} }
ctrl.claimCache.Mutation(claim) ctrl.claimCache.Mutation(claim)
return nil }
return
} }
func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) { func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
@ -690,11 +750,20 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
Name: pod.Name, Name: pod.Name,
UID: pod.UID, UID: pod.UID,
} }
ctrl.allocateClaims(ctx, claims, selectedNode, selectedUser)
allErrorsStr := "allocation of one or more pod claims failed."
allocationFailed := false
for _, delayed := range claims { for _, delayed := range claims {
if err := ctrl.allocateClaim(ctx, delayed.Claim, delayed.ClaimParameters, delayed.Class, delayed.ClassParameters, selectedNode, selectedUser); err != nil { if delayed.Error != nil {
return fmt.Errorf("allocation of pod claim %s failed: %v", delayed.PodClaimName, err) allErrorsStr = fmt.Sprintf("%s Claim %s: %s.", allErrorsStr, delayed.Claim.Name, delayed.Error)
allocationFailed = true
} }
} }
if allocationFailed {
return fmt.Errorf(allErrorsStr)
}
} }
} }

View File

@ -533,14 +533,19 @@ func (m mockDriver) GetClaimParameters(ctx context.Context, claim *resourcev1alp
return result, nil return result, nil
} }
func (m mockDriver) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error) { func (m mockDriver) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string) {
m.t.Logf("Allocate(%s)", claim) m.t.Logf("Allocate(number of claims %d)", len(claims))
allocate, ok := m.allocate[claim.Name] for _, claimAllocation := range claims {
m.t.Logf("Allocate(%s)", claimAllocation.Claim.Name)
allocate, ok := m.allocate[claimAllocation.Claim.Name]
if !ok { if !ok {
m.t.Fatal("unexpected Allocate call") m.t.Fatalf("unexpected Allocate call for claim %s", claimAllocation.Claim.Name)
} }
assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node") assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node")
return allocate.allocResult, allocate.allocErr claimAllocation.Error = allocate.allocErr
claimAllocation.Allocation = allocate.allocResult
}
return
} }
func (m mockDriver) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error { func (m mockDriver) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error {

View File

@ -41,10 +41,10 @@ $ kind create cluster --config test/e2e/dra/kind.yaml --image dra/node:latest
- Build ginkgo - Build ginkgo
> NB: If you are using go workspace you must disable it `GOWORK=off make gingko` > NB: If you are using go workspace you must disable it `GOWORK=off make ginkgo`
```bash ```bash
$ make gingko $ make ginkgo
``` ```
- Run e2e tests for the `Dynamic Resource Allocation` feature: - Run e2e tests for the `Dynamic Resource Allocation` feature:

View File

@ -20,8 +20,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync/atomic"
"time" "time"
"github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2"
@ -32,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/controller"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -344,69 +343,102 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
}) })
ginkgo.Context("reallocation", func() { ginkgo.Context("reallocation", func() {
var allocateWrapper app.AllocateWrapperType var allocateWrapper2 app.AllocateWrapperType
driver := NewDriver(f, nodes, func() app.Resources { driver := NewDriver(f, nodes, func() app.Resources {
return app.Resources{
NodeLocal: true,
MaxAllocations: 2,
Nodes: nodes.NodeNames,
}
})
driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{ return app.Resources{
NodeLocal: true, NodeLocal: true,
MaxAllocations: 2, MaxAllocations: 2,
Nodes: nodes.NodeNames, Nodes: nodes.NodeNames,
AllocateWrapper: func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string, AllocateWrapper: func(
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error)) (result *resourcev1alpha2.AllocationResult, err error) { ctx context.Context,
return allocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, handler) claimAllocations []*controller.ClaimAllocation,
selectedNode string,
handler func(
ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string),
) {
allocateWrapper2(ctx, claimAllocations, selectedNode, handler)
return
}, },
} }
}) })
driver2.NameSuffix = "-other"
b := newBuilder(f, driver) b := newBuilder(f, driver)
b2 := newBuilder(f, driver2)
ginkgo.It("works", func(ctx context.Context) { ginkgo.It("works", func(ctx context.Context) {
// A pod with two claims can run on a node, but // A pod with multiple claims can run on a node, but
// only if allocation of both succeeds. This // only if allocation of all succeeds. This
// tests simulates the scenario where one claim // test simulates the scenario where one claim
// gets allocated but the second doesn't // gets allocated from one driver, but the claims
// because of a race with some other pod. // from second driver fail allocation because of a
// race with some other pod.
// //
// To ensure the right timing, allocation of the second // To ensure the right timing, allocation of the
// claim gets delayed while creating another pod // claims from second driver are delayed while
// that gets the remaining resource on the node. // creating another pod that gets the remaining
// resource on the node from second driver.
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
parameters := b.parameters() parameters1 := b.parameters()
claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) parameters2 := b2.parameters()
claim2 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1 := b.podExternal() pod1 := b.podExternal()
pod2 := b2.podExternal()
pod1claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod2claim1 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1claim2 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1claim3 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims, pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims,
v1.PodResourceClaim{ v1.PodResourceClaim{
Name: "claim2", Name: "claim-other1",
Source: v1.ClaimSource{ Source: v1.ClaimSource{
ResourceClaimName: &claim2.Name, ResourceClaimName: &pod1claim2.Name,
},
},
v1.PodResourceClaim{
Name: "claim-other2",
Source: v1.ClaimSource{
ResourceClaimName: &pod1claim3.Name,
}, },
}, },
) )
// Block on the second external claim that is to be allocated. // Block on the second, third external claim from driver2 that is to be allocated.
blockClaim, cancelBlockClaim := context.WithCancel(ctx) blockClaim, cancelBlockClaim := context.WithCancel(ctx)
defer cancelBlockClaim() defer cancelBlockClaim()
var allocated int32
allocateWrapper = func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, b2.create(ctx, parameters2, pod1claim2, pod1claim3)
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string, b.create(ctx, parameters1, pod1claim1, pod1)
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error), allocateWrapper2 = func(ctx context.Context,
) (result *resourcev1alpha2.AllocationResult, err error) { claimAllocations []*controller.ClaimAllocation,
oldAllocated := atomic.AddInt32(&allocated, 0) selectedNode string,
if oldAllocated == 1 && strings.HasPrefix(claim.Name, "external-claim") { handler func(ctx context.Context,
claimAllocations []*controller.ClaimAllocation,
selectedNode string),
) {
// pod1 will have only external-claim[2-3], it has to wait
// for pod2 to consume resources from driver2
if claimAllocations[0].Claim.Name != "external-claim-other" {
<-blockClaim.Done() <-blockClaim.Done()
} }
result, err = handler(ctx, claim, claimParameters, class, classParameters, selectedNode) handler(ctx, claimAllocations, selectedNode)
if err == nil {
atomic.AddInt32(&allocated, 1)
}
return return
} }
b.create(ctx, parameters, claim1, claim2, pod1)
ginkgo.By("waiting for one claim to be allocated") ginkgo.By("waiting for one claim from driver1 to be allocated")
var nodeSelector *v1.NodeSelector var nodeSelector *v1.NodeSelector
gomega.Eventually(ctx, func(ctx context.Context) (int, error) { gomega.Eventually(ctx, func(ctx context.Context) (int, error) {
claims, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{}) claims, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
@ -429,18 +461,20 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
// the node selector looks like and can // the node selector looks like and can
// directly access the key and value from it. // directly access the key and value from it.
ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector)) ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector))
pod2, template2 := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0] req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0]
node := req.Values[0] node := req.Values[0]
pod2.Spec.NodeSelector = map[string]string{req.Key: node} pod2.Spec.NodeSelector = map[string]string{req.Key: node}
b.create(ctx, pod2, template2)
b2.create(ctx, pod2claim1, pod2)
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod2), "start pod 2") framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod2), "start pod 2")
// Allow allocation of claim2 to proceed. It should fail now // Allow allocation of pod1 claim2, claim3 to proceed. It should fail now
// and the other node must be used instead, after deallocating // and the other node must be used instead, after deallocating
// the first claim. // the first claim.
ginkgo.By("move first pod to other node") ginkgo.By("move first pod to other node")
cancelBlockClaim() cancelBlockClaim()
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod1), "start pod 1") framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod1), "start pod 1")
pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{}) pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get first pod") framework.ExpectNoError(err, "get first pod")
@ -463,6 +497,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
} }
}) })
b1 := newBuilder(f, driver1) b1 := newBuilder(f, driver1)
driver2 := NewDriver(f, nodes, func() app.Resources { driver2 := NewDriver(f, nodes, func() app.Resources {
return app.Resources{ return app.Resources{
NodeLocal: true, NodeLocal: true,

View File

@ -47,11 +47,12 @@ type Resources struct {
AllocateWrapper AllocateWrapperType AllocateWrapper AllocateWrapperType
} }
type AllocateWrapperType func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation,
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string, selectedNode string,
handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, handler func(ctx context.Context,
class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error), claimAllocations []*controller.ClaimAllocation,
) (result *resourcev1alpha2.AllocationResult, err error) selectedNode string),
)
type ExampleController struct { type ExampleController struct {
clientset kubernetes.Interface clientset kubernetes.Interface
@ -152,15 +153,30 @@ func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, nam
return configMap.Data, nil return configMap.Data, nil
} }
func (c *ExampleController) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) { func (c *ExampleController) Allocate(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
if c.resources.AllocateWrapper != nil { if c.resources.AllocateWrapper != nil {
return c.resources.AllocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, c.allocate) c.resources.AllocateWrapper(ctx, claimAllocations, selectedNode, c.allocateOneByOne)
} else {
c.allocateOneByOne(ctx, claimAllocations, selectedNode)
}
return
}
func (c *ExampleController) allocateOneByOne(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
for _, ca := range claimAllocations {
allocationResult, err := c.allocateOne(ctx, ca.Claim, ca.ClaimParameters, ca.Class, ca.ClassParameters, selectedNode)
if err != nil {
ca.Error = fmt.Errorf("failed allocating claim %v", ca.Claim.UID)
continue
}
ca.Allocation = allocationResult
} }
return c.allocate(ctx, claim, claimParameters, class, classParameters, selectedNode)
} }
// allocate simply copies parameters as JSON map into a ResourceHandle. // allocate simply copies parameters as JSON map into a ResourceHandle.
func (c *ExampleController) allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) { func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID) logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID)
defer func() { defer func() {
logger.V(3).Info("done", "result", result, "err", err) logger.V(3).Info("done", "result", result, "err", err)