diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
index b0367871639..20bc825815f 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
@@ -68,8 +68,8 @@ type Driver interface {
 	// The caller will wrap the error to include the parameter reference.
 	GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error)
 
-	// Allocate gets called when a ResourceClaim is ready to be allocated.
-	// The selectedNode is empty for ResourceClaims with immediate
+	// Allocate gets called when all ResourceClaims of a Pod that are handled by
+	// this driver are ready to be allocated. The selectedNode is empty for ResourceClaims with immediate
 	// allocation, in which case the resource driver decides itself where
 	// to allocate. If there is already an on-going allocation, the driver
 	// may finish it and ignore the new parameters or abort the on-going
@@ -77,15 +77,21 @@ type Driver interface {
 	//
 	// Parameters have been retrieved earlier.
 	//
+	// The driver must record the outcome for every entry of the "claims"
+	// parameter. When allocation of a claim succeeds, claims[i].Allocation
+	// must be set to the allocation result. When allocation of a particular
+	// claim fails, claims[i].Error must be set instead; in that case
+	// claims[i].Allocation is ignored.
+	//
 	// If selectedNode is set, the driver must attempt to allocate for that
 	// node. If that is not possible, it must return an error. The
 	// controller will call UnsuitableNodes and pass the new information to
 	// the scheduler, which then will lead to selecting a diffent node
 	// if the current one is not suitable.
 	//
-	// The objects are read-only and must not be modified. This call
-	// must be idempotent.
-	Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error)
+	// The Claim, ClaimParameters, Class and ClassParameters fields of the
+	// "claims" entries are read-only and must not be modified. This call must be idempotent.
+	Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string)
 
 	// Deallocate gets called when a ResourceClaim is ready to be
 	// freed.
@@ -125,6 +131,13 @@ type ClaimAllocation struct {
 	// UnsuitableNodes needs to be filled in by the driver when
 	// Driver.UnsuitableNodes gets called.
 	UnsuitableNodes []string
+
+	// The driver must populate this field with the resources that
+	// were allocated for the claim when allocation succeeds.
+	Allocation *resourcev1alpha2.AllocationResult
+	// If allocating a particular claim fails, the driver must
+	// populate this field instead.
+	Error error
 }
 
 type controller struct {
@@ -500,7 +513,20 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha2.R
 		return err
 	}
 
-	return ctrl.allocateClaim(ctx, claim, claimParameters, class, classParameters, "", nil)
+	claimAllocations := claimAllocations{&ClaimAllocation{
+		Claim:           claim,
+		ClaimParameters: claimParameters,
+		Class:           class,
+		ClassParameters: classParameters,
+	}}
+
+	ctrl.allocateClaims(ctx, claimAllocations, "", nil)
+
+	if claimAllocations[0].Error != nil {
+		return fmt.Errorf("allocate: %v", claimAllocations[0].Error)
+	}
+
+	return nil
 }
 
 func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass) (claimParameters, classParameters interface{}, err error) {
@@ -517,51 +543,85 @@ func (ctrl *controller) getParameters(ctx context.Context, claim *resourcev1alph
 	return
 }
 
-func (ctrl *controller) allocateClaim(ctx context.Context,
-	claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
-	class *resourcev1alpha2.ResourceClass, classParameters interface{},
-	selectedNode string,
-	selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) error {
+// allocateClaims filters the list of claims, keeps those that still need allocation,
+// and asks the driver to allocate them. The driver is expected to write the
+// Allocation and Error fields into the entries of the claims slice.
+func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAllocation, selectedNode string, selectedUser *resourcev1alpha2.ResourceClaimConsumerReference) {
 	logger := klog.FromContext(ctx)
 
-	if claim.Status.Allocation != nil {
-		// This can happen when two PodSchedulingContext objects trigger
-		// allocation attempts (first one wins) or when we see the
-		// update of the PodSchedulingContext object.
-		logger.V(5).Info("Claim already allocated, nothing to do")
-		return nil
+	needAllocation := make([]*ClaimAllocation, 0, len(claims))
+	for _, claim := range claims {
+		if claim.Claim.Status.Allocation != nil {
+			// This can happen when two PodSchedulingContext objects trigger
+			// allocation attempts (first one wins) or when we see the
+			// update of the PodSchedulingContext object.
+			logger.V(5).Info("Claim is already allocated, skipping allocation", "claim", claim.PodClaimName)
+			continue
+		}
+		needAllocation = append(needAllocation, claim)
 	}
 
-	claim = claim.DeepCopy()
-	if !ctrl.hasFinalizer(claim) {
-		// Set finalizer before doing anything. We continue with the updated claim.
-		logger.V(5).Info("Adding finalizer")
-		claim.Finalizers = append(claim.Finalizers, ctrl.finalizer)
-		var err error
-		claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
-		if err != nil {
-			return fmt.Errorf("add finalizer: %v", err)
-		}
-		ctrl.claimCache.Mutation(claim)
+	if len(needAllocation) == 0 {
+		logger.V(5).Info("No claims need allocation, nothing to do")
+		return
 	}
 
+	// Keep the claims whose finalizer was added successfully in a separate list;
+	// only those are passed on to the driver's Allocate call.
+	claimsWithFinalizers := make([]*ClaimAllocation, 0, len(needAllocation))
+	for _, claimAllocation := range needAllocation {
+		if !ctrl.hasFinalizer(claimAllocation.Claim) {
+			claim := claimAllocation.Claim.DeepCopy()
+			// Set finalizer before doing anything. We continue with the updated claim.
+ logger.V(5).Info("Adding finalizer", "claim", claim.Name) + claim.Finalizers = append(claim.Finalizers, ctrl.finalizer) + var err error + claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) + if err != nil { + logger.Error(err, "add finalizer", "claim", claim.Name) + claimAllocation.Error = fmt.Errorf("add finalizer: %v", err) + // Do not save claim to ask for Allocate from Driver. + continue + } + ctrl.claimCache.Mutation(claim) + claimAllocation.Claim = claim + } + claimsWithFinalizers = append(claimsWithFinalizers, claimAllocation) + } + + // Beyond here we only operate with claimsWithFinalizers because those are ready for allocation. + logger.V(5).Info("Allocating") - allocation, err := ctrl.driver.Allocate(ctx, claim, claimParameters, class, classParameters, selectedNode) - if err != nil { - return fmt.Errorf("allocate: %v", err) + ctrl.driver.Allocate(ctx, claimsWithFinalizers, selectedNode) + + // Update successfully allocated claims' status with allocation info. + for _, claimAllocation := range claimsWithFinalizers { + if claimAllocation.Error != nil { + logger.Error(claimAllocation.Error, "allocating claim", "claim", claimAllocation.Claim.Name) + continue + } + if claimAllocation.Allocation == nil { + logger.Error(nil, "allocating claim: missing allocation from driver", "claim", claimAllocation.Claim.Name) + claimAllocation.Error = fmt.Errorf("allocating claim: missing allocation from driver") + // Do not update this claim with allocation, it might succeed next time. + continue + } + logger.V(5).Info("successfully allocated", "claim", klog.KObj(claimAllocation.Claim)) + claim := claimAllocation.Claim.DeepCopy() + claim.Status.Allocation = claimAllocation.Allocation + claim.Status.DriverName = ctrl.name + if selectedUser != nil { + claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser) + } + logger.V(6).Info("Updating claim after allocation", "claim", claim) + claim, err := ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + if err != nil { + claimAllocation.Error = fmt.Errorf("add allocation: %v", err) + continue + } + + ctrl.claimCache.Mutation(claim) } - claim.Status.Allocation = allocation - claim.Status.DriverName = ctrl.name - if selectedUser != nil { - claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser) - } - logger.V(6).Info("Updating claim after allocation", "claim", claim) - claim, err = ctrl.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("add allocation: %v", err) - } - ctrl.claimCache.Mutation(claim) - return nil + return } func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) { @@ -690,11 +750,20 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin Name: pod.Name, UID: pod.UID, } + + ctrl.allocateClaims(ctx, claims, selectedNode, selectedUser) + + allErrorsStr := "allocation of one or more pod claims failed." 
+			allocationFailed := false
 			for _, delayed := range claims {
-				if err := ctrl.allocateClaim(ctx, delayed.Claim, delayed.ClaimParameters, delayed.Class, delayed.ClassParameters, selectedNode, selectedUser); err != nil {
-					return fmt.Errorf("allocation of pod claim %s failed: %v", delayed.PodClaimName, err)
+				if delayed.Error != nil {
+					allErrorsStr = fmt.Sprintf("%s Claim %s: %s.", allErrorsStr, delayed.Claim.Name, delayed.Error)
+					allocationFailed = true
 				}
 			}
+			if allocationFailed {
+				return fmt.Errorf(allErrorsStr)
+			}
 		}
 	}
 
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
index 507429d2720..cfffc610fa8 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
@@ -533,14 +533,19 @@ func (m mockDriver) GetClaimParameters(ctx context.Context, claim *resourcev1alp
 	return result, nil
 }
 
-func (m mockDriver) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (*resourcev1alpha2.AllocationResult, error) {
-	m.t.Logf("Allocate(%s)", claim)
-	allocate, ok := m.allocate[claim.Name]
-	if !ok {
-		m.t.Fatal("unexpected Allocate call")
+func (m mockDriver) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string) {
+	m.t.Logf("Allocate(number of claims %d)", len(claims))
+	for _, claimAllocation := range claims {
+		m.t.Logf("Allocate(%s)", claimAllocation.Claim.Name)
+		allocate, ok := m.allocate[claimAllocation.Claim.Name]
+		if !ok {
+			m.t.Fatalf("unexpected Allocate call for claim %s", claimAllocation.Claim.Name)
+		}
+		assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node")
+		claimAllocation.Error = allocate.allocErr
+		claimAllocation.Allocation = allocate.allocResult
 	}
-	assert.Equal(m.t, allocate.selectedNode, selectedNode, "selected node")
-	return allocate.allocResult, allocate.allocErr
+	return
 }
 
 func (m mockDriver) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error {
diff --git a/test/e2e/dra/README.md b/test/e2e/dra/README.md
index 0b1739709ad..e22110755d4 100644
--- a/test/e2e/dra/README.md
+++ b/test/e2e/dra/README.md
@@ -41,10 +41,10 @@ $ kind create cluster --config test/e2e/dra/kind.yaml --image dra/node:latest
 
 - Build ginkgo
 
-> NB: If you are using go workspace you must disable it `GOWORK=off make gingko`
+> NB: If you are using a go workspace, you must disable it: `GOWORK=off make ginkgo`
 
 ```bash
-$ make gingko
+$ make ginkgo
 ```
 
 - Run e2e tests for the `Dynamic Resource Allocation` feature:
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index 59dc3376da3..e678fc6b02e 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -20,8 +20,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync/atomic"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
@@ -32,6 +30,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/dynamic-resource-allocation/controller"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -344,69 +343,102 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
 	})
 
 	ginkgo.Context("reallocation", func() {
-		var allocateWrapper app.AllocateWrapperType
+		var allocateWrapper2 app.AllocateWrapperType
 		driver := NewDriver(f, nodes, func() app.Resources {
+			return app.Resources{
+				NodeLocal:      true,
+				MaxAllocations: 2,
+				Nodes:          nodes.NodeNames,
+			}
+		})
+		driver2 := NewDriver(f, nodes, func() app.Resources {
 			return app.Resources{
 				NodeLocal:      true,
 				MaxAllocations: 2,
 				Nodes:          nodes.NodeNames,
-				AllocateWrapper: func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string,
-					handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error)) (result *resourcev1alpha2.AllocationResult, err error) {
-					return allocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, handler)
+				AllocateWrapper: func(
+					ctx context.Context,
+					claimAllocations []*controller.ClaimAllocation,
+					selectedNode string,
+					handler func(
+						ctx context.Context,
+						claimAllocations []*controller.ClaimAllocation,
+						selectedNode string),
+				) {
+					allocateWrapper2(ctx, claimAllocations, selectedNode, handler)
+					return
 				},
 			}
 		})
+		driver2.NameSuffix = "-other"
+
 		b := newBuilder(f, driver)
+		b2 := newBuilder(f, driver2)
 
 		ginkgo.It("works", func(ctx context.Context) {
-			// A pod with two claims can run on a node, but
-			// only if allocation of both succeeds. This
-			// tests simulates the scenario where one claim
-			// gets allocated but the second doesn't
-			// because of a race with some other pod.
+			// A pod with multiple claims can run on a node, but
+			// only if allocation of all of them succeeds. This
+			// test simulates the scenario where one claim
+			// gets allocated from one driver, but the claims
+			// from the second driver fail allocation because of a
+			// race with some other pod.
 			//
-			// To ensure the right timing, allocation of the second
-			// claim gets delayed while creating another pod
-			// that gets the remaining resource on the node.
+			// To ensure the right timing, allocation of the
+			// claims from the second driver is delayed while
+			// creating another pod that gets the remaining
+			// resource on the node from the second driver.
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
-			parameters := b.parameters()
-			claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
-			claim2 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+			parameters1 := b.parameters()
+			parameters2 := b2.parameters()
 			pod1 := b.podExternal()
+			pod2 := b2.podExternal()
+			pod1claim1 := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+			pod2claim1 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+			pod1claim2 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+			pod1claim3 := b2.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+
 			pod1.Spec.ResourceClaims = append(pod1.Spec.ResourceClaims, v1.PodResourceClaim{
-				Name: "claim2",
+				Name: "claim-other1",
 				Source: v1.ClaimSource{
-					ResourceClaimName: &claim2.Name,
+					ResourceClaimName: &pod1claim2.Name,
+				},
+			},
+				v1.PodResourceClaim{
+					Name: "claim-other2",
+					Source: v1.ClaimSource{
+						ResourceClaimName: &pod1claim3.Name,
 					},
 				},
 			)
 
-			// Block on the second external claim that is to be allocated.
+			// Block on the second and third external claims from driver2 that are to be allocated.
 			blockClaim, cancelBlockClaim := context.WithCancel(ctx)
 			defer cancelBlockClaim()
-			var allocated int32
-			allocateWrapper = func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
-				class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string,
-				handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{},
-					class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error),
-			) (result *resourcev1alpha2.AllocationResult, err error) {
-				oldAllocated := atomic.AddInt32(&allocated, 0)
-				if oldAllocated == 1 && strings.HasPrefix(claim.Name, "external-claim") {
+
+			b2.create(ctx, parameters2, pod1claim2, pod1claim3)
+			b.create(ctx, parameters1, pod1claim1, pod1)
+
+			allocateWrapper2 = func(ctx context.Context,
+				claimAllocations []*controller.ClaimAllocation,
+				selectedNode string,
+				handler func(ctx context.Context,
+					claimAllocations []*controller.ClaimAllocation,
+					selectedNode string),
+			) {
+				// pod1 will have only external-claim[2-3]; it has to wait
+				// for pod2 to consume resources from driver2.
+				if claimAllocations[0].Claim.Name != "external-claim-other" {
 					<-blockClaim.Done()
 				}
-				result, err = handler(ctx, claim, claimParameters, class, classParameters, selectedNode)
-				if err == nil {
-					atomic.AddInt32(&allocated, 1)
-				}
+				handler(ctx, claimAllocations, selectedNode)
 				return
 			}
 
-			b.create(ctx, parameters, claim1, claim2, pod1)
-			ginkgo.By("waiting for one claim to be allocated")
+			ginkgo.By("waiting for one claim from driver1 to be allocated")
 			var nodeSelector *v1.NodeSelector
 			gomega.Eventually(ctx, func(ctx context.Context) (int, error) {
 				claims, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
@@ -429,18 +461,20 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu
 			// the node selector looks like and can
 			// directly access the key and value from it.
 			ginkgo.By(fmt.Sprintf("create second pod on the same node %s", nodeSelector))
-			pod2, template2 := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer)
+
 			req := nodeSelector.NodeSelectorTerms[0].MatchExpressions[0]
 			node := req.Values[0]
 			pod2.Spec.NodeSelector = map[string]string{req.Key: node}
-			b.create(ctx, pod2, template2)
+
+			b2.create(ctx, pod2claim1, pod2)
 			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod2), "start pod 2")
 
-			// Allow allocation of claim2 to proceed. It should fail now
+			// Allow allocation of pod1's claim2 and claim3 to proceed. It should fail now
 			// and the other node must be used instead, after deallocating
 			// the first claim.
ginkgo.By("move first pod to other node") cancelBlockClaim() + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod1), "start pod 1") pod1, err := f.ClientSet.CoreV1().Pods(pod1.Namespace).Get(ctx, pod1.Name, metav1.GetOptions{}) framework.ExpectNoError(err, "get first pod") @@ -463,6 +497,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu } }) b1 := newBuilder(f, driver1) + driver2 := NewDriver(f, nodes, func() app.Resources { return app.Resources{ NodeLocal: true, diff --git a/test/e2e/dra/test-driver/app/controller.go b/test/e2e/dra/test-driver/app/controller.go index 9760c219d43..ef099687929 100644 --- a/test/e2e/dra/test-driver/app/controller.go +++ b/test/e2e/dra/test-driver/app/controller.go @@ -47,11 +47,12 @@ type Resources struct { AllocateWrapper AllocateWrapperType } -type AllocateWrapperType func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, - class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string, - handler func(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, - class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error), -) (result *resourcev1alpha2.AllocationResult, err error) +type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation, + selectedNode string, + handler func(ctx context.Context, + claimAllocations []*controller.ClaimAllocation, + selectedNode string), +) type ExampleController struct { clientset kubernetes.Interface @@ -152,15 +153,30 @@ func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, nam return configMap.Data, nil } -func (c *ExampleController) Allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) { +func (c *ExampleController) Allocate(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) { + if c.resources.AllocateWrapper != nil { - return c.resources.AllocateWrapper(ctx, claim, claimParameters, class, classParameters, selectedNode, c.allocate) + c.resources.AllocateWrapper(ctx, claimAllocations, selectedNode, c.allocateOneByOne) + } else { + c.allocateOneByOne(ctx, claimAllocations, selectedNode) + } + + return +} + +func (c *ExampleController) allocateOneByOne(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) { + for _, ca := range claimAllocations { + allocationResult, err := c.allocateOne(ctx, ca.Claim, ca.ClaimParameters, ca.Class, ca.ClassParameters, selectedNode) + if err != nil { + ca.Error = fmt.Errorf("failed allocating claim %v", ca.Claim.UID) + continue + } + ca.Allocation = allocationResult } - return c.allocate(ctx, claim, claimParameters, class, classParameters, selectedNode) } // allocate simply copies parameters as JSON map into a ResourceHandle. 
-func (c *ExampleController) allocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
+func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
 	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID)
 	defer func() {
 		logger.V(3).Info("done", "result", result, "err", err)
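
To make the new contract on `Driver.Allocate` concrete, here is a minimal sketch of how a third-party driver might implement the batched call. Only the `Allocate` method of `controller.Driver` is shown; the `myExampleDriver` type and the `buildHandleData` helper are hypothetical, while `controller.ClaimAllocation` and its `Claim`, `ClaimParameters`, `Allocation` and `Error` fields come from this patch (the `AllocationResult`/`ResourceHandle` fields are the resource.k8s.io/v1alpha2 API as I understand it).

```go
package exampledriver

import (
	"context"
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/dynamic-resource-allocation/controller"
)

// myExampleDriver is a hypothetical resource driver; only the batched
// Allocate method of controller.Driver is sketched here.
type myExampleDriver struct {
	name string
}

// Allocate records an outcome for every entry: claims[i].Allocation on
// success, claims[i].Error on failure. Nothing is returned; the controller
// reads the results back out of the slice entries.
func (d *myExampleDriver) Allocate(ctx context.Context, claims []*controller.ClaimAllocation, selectedNode string) {
	for _, ca := range claims {
		data, err := buildHandleData(ca.ClaimParameters) // hypothetical helper
		if err != nil {
			// Mark only this claim as failed; the remaining claims are still processed.
			ca.Error = fmt.Errorf("claim %s: %v", ca.Claim.Name, err)
			continue
		}
		ca.Allocation = &resourcev1alpha2.AllocationResult{
			ResourceHandles: []resourcev1alpha2.ResourceHandle{{
				DriverName: d.name,
				Data:       data,
			}},
		}
	}
}

// buildHandleData stands in for driver-specific logic that serializes the
// claim parameters into an opaque resource handle.
func buildHandleData(parameters interface{}) (string, error) {
	return fmt.Sprintf("%v", parameters), nil
}
```

Because failures are recorded per claim rather than returned as a single error, a driver can let the other claims of the same Pod proceed, which is exactly what the controller's per-claim status update loop above relies on.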
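The reworked `AllocateWrapperType` in the test driver makes it possible to intercept a whole batch before it reaches the real allocation logic, which is what the reallocation e2e test above relies on. A hedged sketch of such a hook, assuming the `app.Resources` fields shown in this diff; `newBlockingResources` and the `unblock` channel are illustrative names only:

```go
package exampletest

import (
	"context"

	"k8s.io/dynamic-resource-allocation/controller"
	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
)

// newBlockingResources returns test-driver resources whose allocations are
// held back until unblock is closed. Only the wrapper signature is defined
// by this patch; everything else here is illustrative.
func newBlockingResources(nodeNames []string, unblock <-chan struct{}) app.Resources {
	return app.Resources{
		NodeLocal:      true,
		MaxAllocations: 2,
		Nodes:          nodeNames,
		AllocateWrapper: func(ctx context.Context,
			claimAllocations []*controller.ClaimAllocation,
			selectedNode string,
			handler func(ctx context.Context,
				claimAllocations []*controller.ClaimAllocation,
				selectedNode string),
		) {
			// Wait until the test releases the batch, then run the real allocation.
			<-unblock
			handler(ctx, claimAllocations, selectedNode)
		},
	}
}
```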
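As a side note on the error aggregation in syncPodSchedulingContexts: the per-claim failures could also be combined with errors.Join (Go 1.20+), which keeps the individual errors unwrappable instead of flattening them into one string. This is an alternative sketch under that assumption, not what the patch does; `aggregateAllocationErrors` is a hypothetical helper:

```go
package exampleagg

import (
	"errors"
	"fmt"

	"k8s.io/dynamic-resource-allocation/controller"
)

// aggregateAllocationErrors joins the per-claim errors recorded by the driver
// so callers can still match them with errors.Is/As.
func aggregateAllocationErrors(claims []*controller.ClaimAllocation) error {
	var errs []error
	for _, delayed := range claims {
		if delayed.Error != nil {
			errs = append(errs, fmt.Errorf("claim %s: %w", delayed.Claim.Name, delayed.Error))
		}
	}
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("allocation of one or more pod claims failed: %w", errors.Join(errs...))
}
```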