From 98ba89d31d524b38cf3d6cc3e0f0d237c428b3f2 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 22 May 2023 14:44:32 +0200 Subject: [PATCH 1/5] resourceclaim controller: avoid caching deleted pod unnecessarily We don't need to remember that a pod got deleted when it had no resource claims because the code which checks the cached UIDs only checks for pods which have resource claims. --- pkg/controller/resourceclaim/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 492c27f65ca..9c4e6ce3fd3 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -202,15 +202,15 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo return } - if deleted { - ec.deletedObjects.Add(pod.UID) - } - if len(pod.Spec.ResourceClaims) == 0 { // Nothing to do for it at all. return } + if deleted { + ec.deletedObjects.Add(pod.UID) + } + logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) // Release reservations of a deleted or completed pod? From 08d40f53a7acece4e617ca40b986847da8e00767 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 22 May 2023 18:20:33 +0200 Subject: [PATCH 2/5] dra: test with and without immediate ReservedFor The recommendation and default in the controller helper code is to set ReservedFor to the pod which triggered delayed allocation. However, this is neither required nor enforced. Therefore we should also test the fallback path were kube-scheduler itself adds the pod to ReservedFor. --- .../controller/controller.go | 16 ++++- test/e2e/dra/dra.go | 66 +++++++++++-------- test/e2e/dra/test-driver/app/controller.go | 10 +-- 3 files changed, 60 insertions(+), 32 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go index 22f7e75a043..38507391303 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go @@ -50,6 +50,14 @@ import ( type Controller interface { // Run starts the controller. Run(workers int) + + // SetReservedFor can be used to disable adding the Pod which + // triggered allocation to the status.reservedFor. Normally, + // DRA drivers should always do that, so it's the default. + // But nothing in the protocol between the scheduler and + // a driver requires it, so at least for testing the control + // plane components it is useful to disable it. + SetReservedFor(enabled bool) } // Driver provides the actual allocation and deallocation operations. @@ -146,6 +154,7 @@ type controller struct { name string finalizer string driver Driver + setReservedFor bool kubeClient kubernetes.Interface queue workqueue.RateLimitingInterface eventRecorder record.EventRecorder @@ -207,6 +216,7 @@ func New( name: name, finalizer: name + "/deletion-protection", driver: driver, + setReservedFor: true, kubeClient: kubeClient, rcLister: rcInformer.Lister(), rcSynced: rcInformer.Informer().HasSynced, @@ -232,6 +242,10 @@ func New( return ctrl } +func (ctrl *controller) SetReservedFor(enabled bool) { + ctrl.setReservedFor = enabled +} + func resourceEventHandlerFuncs(logger *klog.Logger, ctrl *controller) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -609,7 +623,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc claim := claimAllocation.Claim.DeepCopy() claim.Status.Allocation = claimAllocation.Allocation claim.Status.DriverName = ctrl.name - if selectedUser != nil { + if selectedUser != nil && ctrl.setReservedFor { claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser) } logger.V(6).Info("Updating claim after allocation", "claim", claim) diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 2ab9ef65f94..a1e4fa42642 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -179,8 +179,6 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu ginkgo.Context("cluster", func() { nodes := NewNodes(f, 1, 4) - driver := NewDriver(f, nodes, networkResources) - b := newBuilder(f, driver) ginkgo.It("truncates the name of a generated resource claim", func(ctx context.Context) { parameters := b.parameters() @@ -213,7 +211,7 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu // claimTests tries out several different combinations of pods with // claims, both inline and external. - claimTests := func(allocationMode resourcev1alpha2.AllocationMode) { + claimTests := func(b *builder, driver *Driver, allocationMode resourcev1alpha2.AllocationMode) { ginkgo.It("supports simple pod referencing inline resource claim", func(ctx context.Context) { parameters := b.parameters() pod, template := b.podInline(allocationMode) @@ -322,35 +320,49 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu }).WithTimeout(f.Timeouts.PodStartSlow).Should(gomega.HaveField("Status.ContainerStatuses", gomega.ContainElements(gomega.HaveField("RestartCount", gomega.BeNumerically(">=", 2))))) gomega.Expect(driver.Controller.GetNumAllocations()).To(gomega.Equal(int64(1)), "number of allocations") }) + + ginkgo.It("must deallocate after use when using delayed allocation", func(ctx context.Context) { + parameters := b.parameters() + pod := b.podExternal() + claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + b.create(ctx, parameters, claim, pod) + + gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) { + return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + }).WithTimeout(f.Timeouts.PodDelete).ShouldNot(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil))) + + b.testPod(ctx, f.ClientSet, pod) + + ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(pod))) + framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + + ginkgo.By("waiting for claim to get deallocated") + gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) { + return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + }).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil))) + }) } - ginkgo.Context("with delayed allocation", func() { - claimTests(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + ginkgo.Context("with delayed allocation and setting ReservedFor", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) + claimTests(b, driver, resourcev1alpha2.AllocationModeWaitForFirstConsumer) + }) + + ginkgo.Context("with delayed allocation and not setting ReservedFor", func() { + driver := NewDriver(f, nodes, func() app.Resources { + resources := networkResources() + resources.DontSetReservedFor = true + return resources + }) + b := newBuilder(f, driver) + claimTests(b, driver, resourcev1alpha2.AllocationModeWaitForFirstConsumer) }) ginkgo.Context("with immediate allocation", func() { - claimTests(resourcev1alpha2.AllocationModeImmediate) - }) - - ginkgo.It("must deallocate after use when using delayed allocation", func(ctx context.Context) { - parameters := b.parameters() - pod := b.podExternal() - claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - b.create(ctx, parameters, claim, pod) - - gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) { - return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) - }).WithTimeout(f.Timeouts.PodDelete).ShouldNot(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil))) - - b.testPod(ctx, f.ClientSet, pod) - - ginkgo.By(fmt.Sprintf("deleting pod %s", klog.KObj(pod))) - framework.ExpectNoError(b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})) - - ginkgo.By("waiting for claim to get deallocated") - gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) { - return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) - }).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil))) + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) + claimTests(b, driver, resourcev1alpha2.AllocationModeImmediate) }) }) diff --git a/test/e2e/dra/test-driver/app/controller.go b/test/e2e/dra/test-driver/app/controller.go index ef099687929..673c7bced26 100644 --- a/test/e2e/dra/test-driver/app/controller.go +++ b/test/e2e/dra/test-driver/app/controller.go @@ -38,10 +38,11 @@ import ( ) type Resources struct { - NodeLocal bool - Nodes []string - MaxAllocations int - Shareable bool + DontSetReservedFor bool + NodeLocal bool + Nodes []string + MaxAllocations int + Shareable bool // AllocateWrapper, if set, gets called for each Allocate call. AllocateWrapper AllocateWrapperType @@ -80,6 +81,7 @@ func NewController(clientset kubernetes.Interface, driverName string, resources func (c *ExampleController) Run(ctx context.Context, workers int) { informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */) ctrl := controller.New(ctx, c.driverName, c, c.clientset, informerFactory) + ctrl.SetReservedFor(!c.resources.DontSetReservedFor) informerFactory.Start(ctx.Done()) ctrl.Run(workers) // If we get here, the context was canceled and we can wait for informer factory goroutines. From 5cec6d798c315c1e727ad41adb53db6ee35b9cba Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 22 May 2023 19:08:20 +0200 Subject: [PATCH 3/5] dra: revamp event handlers in kube-controller-manager Enabling logging is useful to track what the code is doing. There are some functional changes: - The pod handler checks for existence of claims. This avoids adding pods to the work queue in more cases when nothing needs to be done, at the cost of making the event handlers a bit slower. This will become more important when adding more work to the controller - The handler for deleted ResourceClaim did not check for cache.DeletedFinalStateUnknown. --- pkg/controller/resourceclaim/controller.go | 130 +++++++++++++----- .../resourceclaim/controller_test.go | 2 +- 2 files changed, 94 insertions(+), 38 deletions(-) diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 9c4e6ce3fd3..9a3a5ccb36d 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -152,13 +152,16 @@ func NewController( } if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - ec.onResourceClaimAddOrUpdate(logger, obj) + logger.V(6).Info("new claim", "claimDump", obj) + ec.enqueueResourceClaim(logger, obj, false) }, UpdateFunc: func(old, updated interface{}) { - ec.onResourceClaimAddOrUpdate(logger, updated) + logger.V(6).Info("updated claim", "claimDump", updated) + ec.enqueueResourceClaim(logger, updated, false) }, DeleteFunc: func(obj interface{}) { - ec.onResourceClaimDelete(logger, obj) + logger.V(6).Info("deleted claim", "claimDump", obj) + ec.enqueueResourceClaim(logger, obj, true) }, }); err != nil { return nil, err @@ -199,6 +202,7 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo pod, ok := obj.(*v1.Pod) if !ok { // Not a pod?! + logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj)) return } @@ -208,13 +212,14 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo } if deleted { + logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod)) ec.deletedObjects.Add(pod.UID) } logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) // Release reservations of a deleted or completed pod? - if deleted || isPodDone(pod) { + if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims { for _, podClaim := range pod.Spec.ResourceClaims { claimName, _, err := resourceclaim.Name(pod, &podClaim) switch { @@ -222,68 +227,119 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo // Either the claim was not created (nothing to do here) or // the API changed. The later will also get reported elsewhere, // so here it's just a debug message. - klog.TODO().V(6).Info("Nothing to do for claim during pod change", "err", err) + logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason) case claimName != nil: key := claimKeyPrefix + pod.Namespace + "/" + *claimName - logger.V(6).Info("pod is deleted or done, process claim", "pod", klog.KObj(pod), "key", key) - ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + *claimName) + logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason) + ec.queue.Add(key) default: // Nothing to do, claim wasn't generated. - klog.TODO().V(6).Info("Nothing to do for skipped claim during pod change") + logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason) } } } - // Create ResourceClaim for inline templates? - if pod.DeletionTimestamp == nil { - for _, podClaim := range pod.Spec.ResourceClaims { + needsWork, reason := ec.podNeedsWork(pod) + if needsWork { + logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason) + ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name) + return + } + logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason) +} + +func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) { + if deleted { + return false, "pod got removed" + } + if podutil.IsPodTerminal(pod) { + return false, "pod has terminated" + } + if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" { + return false, "pod got deleted before scheduling" + } + // Still needs claims. + return true, "pod might run" +} + +// podNeedsWork checks whether a new or modified pod needs to be processed +// further by a worker. It returns a boolean with the result and an explanation +// for it. +func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) { + if pod.DeletionTimestamp != nil { + // Nothing else to do for the pod. + return false, "pod is deleted" + } + + for _, podClaim := range pod.Spec.ResourceClaims { + claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim) + if err != nil { + return true, err.Error() + } + // If the claimName is nil, then it has been determined before + // that the claim is not needed. + if claimName == nil { + return false, "claim is not needed" + } + claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) + if apierrors.IsNotFound(err) { if podClaim.Source.ResourceClaimTemplateName != nil { - // It has at least one inline template, work on it. - key := podKeyPrefix + pod.Namespace + "/" + pod.Name - logger.V(6).Info("pod is not deleted, process it", "pod", klog.KObj(pod), "key", key) - ec.queue.Add(key) - break + return true, "must create ResourceClaim from template" } + // User needs to create claim. + return false, "claim is missing and must be created by user" + } + if err != nil { + // Shouldn't happen. + return true, fmt.Sprintf("internal error while checking for claim: %v", err) + } + + if checkOwner && + resourceclaim.IsForPod(pod, claim) != nil { + // Cannot proceed with the pod unless that other claim gets deleted. + return false, "conflicting claim needs to be removed by user" + } + + if pod.Spec.NodeName == "" { + // Scheduler will handle PodSchedulingContext and reservations. + return false, "pod not scheduled" } } + + return false, "nothing to do" } -func (ec *Controller) onResourceClaimAddOrUpdate(logger klog.Logger, obj interface{}) { +func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = d.Obj + } claim, ok := obj.(*resourcev1alpha2.ResourceClaim) if !ok { return } - // When starting up, we have to check all claims to find those with - // stale pods in ReservedFor. During an update, a pod might get added - // that already no longer exists. - key := claimKeyPrefix + claim.Namespace + "/" + claim.Name - logger.V(6).Info("claim is new or updated, process it", "key", key) - ec.queue.Add(key) -} - -func (ec *Controller) onResourceClaimDelete(logger klog.Logger, obj interface{}) { - claim, ok := obj.(*resourcev1alpha2.ResourceClaim) - if !ok { - return + if !deleted { + // When starting up, we have to check all claims to find those with + // stale pods in ReservedFor. During an update, a pod might get added + // that already no longer exists. + key := claimKeyPrefix + claim.Namespace + "/" + claim.Name + logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key) + ec.queue.Add(key) + } else { + logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim)) } - // Someone deleted a ResourceClaim, either intentionally or - // accidentally. If there is a pod referencing it because of - // an inline resource, then we should re-create the ResourceClaim. - // The common indexer does some prefiltering for us by - // limiting the list to those pods which reference - // the ResourceClaim. + // Also check whether this causes work for any of the currently + // known pods which use the ResourceClaim. objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)) if err != nil { - runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err)) + logger.Error(err, "listing pods from cache") return } if len(objs) == 0 { logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim)) return } - logger = klog.LoggerWithValues(logger, "claim", klog.KObj(claim)) for _, obj := range objs { ec.enqueuePod(logger, obj, false) } diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index 6384887d9a2..4ab378ea631 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -343,7 +343,7 @@ func TestSyncHandler(t *testing.T) { claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() - ec, err := NewController(klog.TODO(), fakeKubeClient, podInformer, claimInformer, templateInformer) + ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer) if err != nil { t.Fatalf("error creating ephemeral controller : %v", err) } From cffbb1f1b29fc9883c92d68677dbfa5f32073c60 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 7 Jul 2023 19:27:22 +0200 Subject: [PATCH 4/5] dra controller: enhance testing The allocation mode is relevant when clearing the reservedFor: for delayed allocation, deallocation gets requested, for immediate allocation not. Both should get tested. All pre-defined claims now use delayed allocation, just as they would if created normally. --- .../resourceclaim/controller_test.go | 72 ++++++++++++++++--- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index 4ab378ea631..f842c5a60b2 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -56,8 +56,15 @@ var ( otherTestPod = makePod(testPodName+"-II", testNamespace, testPodUID+"-II") testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true)) generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true)) - testClaimReserved = func() *resourcev1alpha2.ResourceClaim { + testClaimAllocated = func() *resourcev1alpha2.ResourceClaim { claim := testClaim.DeepCopy() + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{ + Shareable: true, + } + return claim + }() + testClaimReserved = func() *resourcev1alpha2.ResourceClaim { + claim := testClaimAllocated.DeepCopy() claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ Resource: "pods", @@ -264,15 +271,35 @@ func TestSyncHandler(t *testing.T) { expectedMetrics: expectedMetrics{0, 0}, }, { - name: "clear-reserved", - pods: []*v1.Pod{}, - key: claimKey(testClaimReserved), - claims: []*resourcev1alpha2.ResourceClaim{testClaimReserved}, - expectedClaims: []resourcev1alpha2.ResourceClaim{*testClaim}, + name: "clear-reserved-delayed-allocation", + pods: []*v1.Pod{}, + key: claimKey(testClaimReserved), + claims: []*resourcev1alpha2.ResourceClaim{testClaimReserved}, + expectedClaims: func() []resourcev1alpha2.ResourceClaim { + claim := testClaimAllocated.DeepCopy() + claim.Status.DeallocationRequested = true + return []resourcev1alpha2.ResourceClaim{*claim} + }(), expectedMetrics: expectedMetrics{0, 0}, }, { - name: "clear-reserved-when-done", + name: "clear-reserved-immediate-allocation", + pods: []*v1.Pod{}, + key: claimKey(testClaimReserved), + claims: func() []*resourcev1alpha2.ResourceClaim { + claim := testClaimReserved.DeepCopy() + claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate + return []*resourcev1alpha2.ResourceClaim{claim} + }(), + expectedClaims: func() []resourcev1alpha2.ResourceClaim { + claim := testClaimAllocated.DeepCopy() + claim.Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate + return []resourcev1alpha2.ResourceClaim{*claim} + }(), + expectedMetrics: expectedMetrics{0, 0}, + }, + { + name: "clear-reserved-when-done-delayed-allocation", pods: func() []*v1.Pod { pods := []*v1.Pod{testPodWithResource.DeepCopy()} pods[0].Status.Phase = v1.PodSucceeded @@ -285,9 +312,31 @@ func TestSyncHandler(t *testing.T) { return claims }(), expectedClaims: func() []resourcev1alpha2.ResourceClaim { - claims := []resourcev1alpha2.ResourceClaim{*testClaimReserved.DeepCopy()} + claims := []resourcev1alpha2.ResourceClaim{*testClaimAllocated.DeepCopy()} claims[0].OwnerReferences = nil - claims[0].Status.ReservedFor = nil + claims[0].Status.DeallocationRequested = true + return claims + }(), + expectedMetrics: expectedMetrics{0, 0}, + }, + { + name: "clear-reserved-when-done-immediate-allocation", + pods: func() []*v1.Pod { + pods := []*v1.Pod{testPodWithResource.DeepCopy()} + pods[0].Status.Phase = v1.PodSucceeded + return pods + }(), + key: claimKey(testClaimReserved), + claims: func() []*resourcev1alpha2.ResourceClaim { + claims := []*resourcev1alpha2.ResourceClaim{testClaimReserved.DeepCopy()} + claims[0].OwnerReferences = nil + claims[0].Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate + return claims + }(), + expectedClaims: func() []resourcev1alpha2.ResourceClaim { + claims := []resourcev1alpha2.ResourceClaim{*testClaimAllocated.DeepCopy()} + claims[0].OwnerReferences = nil + claims[0].Spec.AllocationMode = resourcev1alpha2.AllocationModeImmediate return claims }(), expectedMetrics: expectedMetrics{0, 0}, @@ -412,6 +461,7 @@ func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, Spec: resourcev1alpha2.ResourceClaimSpec{ ResourceClassName: classname, + AllocationMode: resourcev1alpha2.AllocationModeWaitForFirstConsumer, }, } if owner != nil { @@ -506,6 +556,10 @@ func normalizeClaims(claims []resourcev1alpha2.ResourceClaim) []resourcev1alpha2 if len(claims[i].Status.ReservedFor) == 0 { claims[i].Status.ReservedFor = nil } + if claims[i].Spec.AllocationMode == "" { + // This emulates defaulting. + claims[i].Spec.AllocationMode = resourcev1alpha2.AllocationModeWaitForFirstConsumer + } } return claims } From 80ab8f0542f9ddcf4935e24f742ef2a94b204471 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 22 May 2023 19:44:58 +0200 Subject: [PATCH 5/5] dra: handle scheduled pods in kube-controller-manager When someone decides that a Pod should definitely run on a specific node, they can create the Pod with spec.nodeName already set. Some custom scheduler might do that. Then kubelet starts to check the pod and (if DRA is enabled) will refuse to run it, either because the claims are still waiting for the first consumer or the pod wasn't added to reservedFor. Both are things the scheduler normally does. Also, if a pod got scheduled while the DRA feature was off in the kube-scheduler, a pod can reach the same state. The resource claim controller can handle these two cases by taking over for the kube-scheduler when nodeName is set. Triggering an allocation is simpler than in the scheduler because all it takes is creating the right PodSchedulingContext with spec.selectedNode set. There's no need to list nodes because that choice was already made, permanently. Adding the pod to reservedFor also isn't hard. What's currently missing is triggering de-allocation of claims to re-allocate them for the desired node. This is not important for claims that get created for the pod from a template and then only get used once, but it might be worthwhile to add de-allocation in the future. --- cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/resourceclaim/controller.go | 184 +++++++++++++++--- .../resourceclaim/controller_test.go | 162 +++++++++++---- .../rbac/bootstrappolicy/controller_policy.go | 1 + test/e2e/dra/dra.go | 57 +++++- test/integration/util/util.go | 3 +- 6 files changed, 337 insertions(+), 71 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 1f3067c6198..1712f8026ab 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -356,6 +356,7 @@ func startResourceClaimController(ctx context.Context, controllerContext Control klog.FromContext(ctx), controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"), controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Resource().V1alpha2().PodSchedulingContexts(), controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaims(), controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaimTemplates()) if err != nil { diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 9a3a5ccb36d..430a2fab838 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -87,6 +87,13 @@ type Controller struct { podLister v1listers.PodLister podSynced cache.InformerSynced + // podSchedulingList is the shared PodSchedulingContext lister used to + // fetch scheduling objects from the API server. It is shared with other + // controllers and therefore the objects in its store should be treated + // as immutable. + podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister + podSchedulingSynced cache.InformerSynced + // templateLister is the shared ResourceClaimTemplate lister used to // fetch template objects from the API server. It is shared with other // controllers and therefore the objects in its store should be treated @@ -119,20 +126,23 @@ func NewController( logger klog.Logger, kubeClient clientset.Interface, podInformer v1informers.PodInformer, + podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer, claimInformer resourcev1alpha2informers.ResourceClaimInformer, templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) { ec := &Controller{ - kubeClient: kubeClient, - podLister: podInformer.Lister(), - podIndexer: podInformer.Informer().GetIndexer(), - podSynced: podInformer.Informer().HasSynced, - claimLister: claimInformer.Lister(), - claimsSynced: claimInformer.Informer().HasSynced, - templateLister: templateInformer.Lister(), - templatesSynced: templateInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"), - deletedObjects: newUIDCache(maxUIDCacheEntries), + kubeClient: kubeClient, + podLister: podInformer.Lister(), + podIndexer: podInformer.Informer().GetIndexer(), + podSynced: podInformer.Informer().HasSynced, + podSchedulingLister: podSchedulingInformer.Lister(), + podSchedulingSynced: podSchedulingInformer.Informer().HasSynced, + claimLister: claimInformer.Lister(), + claimsSynced: claimInformer.Informer().HasSynced, + templateLister: templateInformer.Lister(), + templatesSynced: templateInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"), + deletedObjects: newUIDCache(maxUIDCacheEntries), } metrics.RegisterMetrics() @@ -300,9 +310,40 @@ func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) { return false, "conflicting claim needs to be removed by user" } + // This check skips over the reasons below that only apply + // when a pod has been scheduled already. We need to keep checking + // for more claims that might need to be created. if pod.Spec.NodeName == "" { - // Scheduler will handle PodSchedulingContext and reservations. - return false, "pod not scheduled" + continue + } + + // Create PodSchedulingContext if the pod got scheduled without triggering + // delayed allocation. + // + // These can happen when: + // - a user created a pod with spec.nodeName set, perhaps for testing + // - some scheduler was used which is unaware of DRA + // - DRA was not enabled in kube-scheduler (version skew, configuration) + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && + claim.Status.Allocation == nil { + scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) + if apierrors.IsNotFound(err) { + return true, "need to create PodSchedulingContext for scheduled pod" + } + if err != nil { + // Shouldn't happen. + return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err) + } + if scheduling.Spec.SelectedNode != pod.Spec.NodeName { + // Need to update PodSchedulingContext. + return true, "need to updated PodSchedulingContext for scheduled pod" + } + } + if claim.Status.Allocation != nil && + !resourceclaim.IsReservedForPod(pod, claim) && + resourceclaim.CanBeReserved(claim) { + // Need to reserve it. + return true, "need to reserve claim for pod" } } @@ -459,6 +500,49 @@ func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error } } + if pod.Spec.NodeName == "" { + // Scheduler will handle PodSchedulingContext and reservations. + logger.V(5).Info("nothing to do for pod, scheduler will deal with it") + return nil + } + + for _, podClaim := range pod.Spec.ResourceClaims { + claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim) + if err != nil { + return err + } + // If nil, then it has been determined that the claim is not needed + // and can be skipped. + if claimName == nil { + continue + } + claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("retrieve claim: %v", err) + } + if checkOwner { + if err := resourceclaim.IsForPod(pod, claim); err != nil { + return err + } + } + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && + claim.Status.Allocation == nil { + logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim)) + return ec.ensurePodSchedulingContext(ctx, pod) + } + if claim.Status.Allocation != nil && + !resourceclaim.IsReservedForPod(pod, claim) && + resourceclaim.CanBeReserved(claim) { + logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim)) + if err := ec.reserveForPod(ctx, pod, claim); err != nil { + return err + } + } + } + return nil } @@ -618,6 +702,64 @@ func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceC return nil, nil } +func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error { + scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("retrieve PodSchedulingContext: %v", err) + } + if scheduling == nil { + scheduling = &resourcev1alpha2.PodSchedulingContext{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: pointer.Bool(true), + }, + }, + }, + Spec: resourcev1alpha2.PodSchedulingContextSpec{ + SelectedNode: pod.Spec.NodeName, + // There is no need for negotiation about + // potential and suitable nodes anymore, so + // PotentialNodes can be left empty. + }, + } + if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create PodSchedulingContext: %v", err) + } + return nil + } + + if scheduling.Spec.SelectedNode != pod.Spec.NodeName { + scheduling := scheduling.DeepCopy() + scheduling.Spec.SelectedNode = pod.Spec.NodeName + if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err) + } + } + + return nil +} + +func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error { + claim = claim.DeepCopy() + claim.Status.ReservedFor = append(claim.Status.ReservedFor, + resourcev1alpha2.ResourceClaimConsumerReference{ + Resource: "pods", + Name: pod.Name, + UID: pod.UID, + }) + if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("reserve claim for pod: %v", err) + } + return nil +} + func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name)) ctx = klog.NewContext(ctx, logger) @@ -772,7 +914,7 @@ func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) { } // podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (= -// namespace/name) for ResourceClaimTemplates in a given pod. +// namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod. func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { @@ -780,16 +922,14 @@ func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { } keys := []string{} for _, podClaim := range pod.Spec.ResourceClaims { - if podClaim.Source.ResourceClaimTemplateName != nil { - claimName, _, err := resourceclaim.Name(pod, &podClaim) - if err != nil || claimName == nil { - // Index functions are not supposed to fail, the caller will panic. - // For both error reasons (claim not created yet, unknown API) - // we simply don't index. - continue - } - keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName)) + claimName, _, err := resourceclaim.Name(pod, &podClaim) + if err != nil || claimName == nil { + // Index functions are not supposed to fail, the caller will panic. + // For both error reasons (claim not created yet, unknown API) + // we simply don't index. + continue } + keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName)) } return keys, nil } diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index f842c5a60b2..6ca1422cee7 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" + "k8s.io/utils/pointer" ) var ( @@ -50,44 +51,53 @@ var ( podResourceClaimName = "acme-resource" templateName = "my-template" className = "my-resource-class" + nodeName = "worker" testPod = makePod(testPodName, testNamespace, testPodUID) testPodWithResource = makePod(testPodName, testNamespace, testPodUID, *makePodResourceClaim(podResourceClaimName, templateName)) otherTestPod = makePod(testPodName+"-II", testNamespace, testPodUID+"-II") - testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true)) - generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true)) - testClaimAllocated = func() *resourcev1alpha2.ResourceClaim { - claim := testClaim.DeepCopy() - claim.Status.Allocation = &resourcev1alpha2.AllocationResult{ - Shareable: true, - } - return claim - }() - testClaimReserved = func() *resourcev1alpha2.ResourceClaim { - claim := testClaimAllocated.DeepCopy() - claim.Status.ReservedFor = append(claim.Status.ReservedFor, - resourcev1alpha2.ResourceClaimConsumerReference{ - Resource: "pods", - Name: testPodWithResource.Name, - UID: testPodWithResource.UID, - }, - ) - return claim - }() - testClaimReservedTwice = func() *resourcev1alpha2.ResourceClaim { - claim := testClaimReserved.DeepCopy() - claim.Status.ReservedFor = append(claim.Status.ReservedFor, - resourcev1alpha2.ResourceClaimConsumerReference{ - Resource: "pods", - Name: otherTestPod.Name, - UID: otherTestPod.UID, - }, - ) - return claim - }() + + testClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, makeOwnerReference(testPodWithResource, true)) + testClaimAllocated = allocateClaim(testClaim) + testClaimReserved = reserveClaim(testClaimAllocated, testPodWithResource) + testClaimReservedTwice = reserveClaim(testClaimReserved, otherTestPod) + + generatedTestClaim = makeGeneratedClaim(podResourceClaimName, testPodName+"-"+podResourceClaimName, testNamespace, className, 1, makeOwnerReference(testPodWithResource, true)) + generatedTestClaimAllocated = allocateClaim(generatedTestClaim) + generatedTestClaimReserved = reserveClaim(generatedTestClaimAllocated, testPodWithResource) + conflictingClaim = makeClaim(testPodName+"-"+podResourceClaimName, testNamespace, className, nil) otherNamespaceClaim = makeClaim(testPodName+"-"+podResourceClaimName, otherNamespace, className, nil) template = makeTemplate(templateName, testNamespace, className) + + testPodWithNodeName = func() *v1.Pod { + pod := testPodWithResource.DeepCopy() + pod.Spec.NodeName = nodeName + pod.Status.ResourceClaimStatuses = append(pod.Status.ResourceClaimStatuses, v1.PodResourceClaimStatus{ + Name: pod.Spec.ResourceClaims[0].Name, + ResourceClaimName: &generatedTestClaim.Name, + }) + return pod + }() + + podSchedulingContext = resourcev1alpha2.PodSchedulingContext{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodName, + Namespace: testNamespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: testPodName, + UID: testPodUID, + Controller: pointer.Bool(true), + }, + }, + }, + Spec: resourcev1alpha2.PodSchedulingContextSpec{ + SelectedNode: nodeName, + }, + } ) func init() { @@ -96,17 +106,18 @@ func init() { func TestSyncHandler(t *testing.T) { tests := []struct { - name string - key string - claims []*resourcev1alpha2.ResourceClaim - claimsInCache []*resourcev1alpha2.ResourceClaim - pods []*v1.Pod - podsLater []*v1.Pod - templates []*resourcev1alpha2.ResourceClaimTemplate - expectedClaims []resourcev1alpha2.ResourceClaim - expectedStatuses map[string][]v1.PodResourceClaimStatus - expectedError bool - expectedMetrics expectedMetrics + name string + key string + claims []*resourcev1alpha2.ResourceClaim + claimsInCache []*resourcev1alpha2.ResourceClaim + pods []*v1.Pod + podsLater []*v1.Pod + templates []*resourcev1alpha2.ResourceClaimTemplate + expectedClaims []resourcev1alpha2.ResourceClaim + expectedPodSchedulingContexts []resourcev1alpha2.PodSchedulingContext + expectedStatuses map[string][]v1.PodResourceClaimStatus + expectedError bool + expectedMetrics expectedMetrics }{ { name: "create", @@ -361,6 +372,35 @@ func TestSyncHandler(t *testing.T) { expectedClaims: nil, expectedMetrics: expectedMetrics{0, 0}, }, + { + name: "trigger-allocation", + pods: []*v1.Pod{testPodWithNodeName}, + key: podKey(testPodWithNodeName), + templates: []*resourcev1alpha2.ResourceClaimTemplate{template}, + claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaim}, + expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaim}, + expectedStatuses: map[string][]v1.PodResourceClaimStatus{ + testPodWithNodeName.Name: { + {Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name}, + }, + }, + expectedPodSchedulingContexts: []resourcev1alpha2.PodSchedulingContext{podSchedulingContext}, + expectedMetrics: expectedMetrics{0, 0}, + }, + { + name: "add-reserved", + pods: []*v1.Pod{testPodWithNodeName}, + key: podKey(testPodWithNodeName), + templates: []*resourcev1alpha2.ResourceClaimTemplate{template}, + claims: []*resourcev1alpha2.ResourceClaim{generatedTestClaimAllocated}, + expectedClaims: []resourcev1alpha2.ResourceClaim{*generatedTestClaimReserved}, + expectedStatuses: map[string][]v1.PodResourceClaimStatus{ + testPodWithNodeName.Name: { + {Name: testPodWithNodeName.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name}, + }, + }, + expectedMetrics: expectedMetrics{0, 0}, + }, } for _, tc := range tests { @@ -389,10 +429,11 @@ func TestSyncHandler(t *testing.T) { setupMetrics() informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) podInformer := informerFactory.Core().V1().Pods() + podSchedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts() claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() - ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer) + ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, podSchedulingInformer, claimInformer, templateInformer) if err != nil { t.Fatalf("error creating ephemeral controller : %v", err) } @@ -451,6 +492,12 @@ func TestSyncHandler(t *testing.T) { } assert.Equal(t, tc.expectedStatuses, actualStatuses, "pod resource claim statuses") + scheduling, err := fakeKubeClient.ResourceV1alpha2().PodSchedulingContexts("").List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("unexpected error while listing claims: %v", err) + } + assert.Equal(t, normalizeScheduling(tc.expectedPodSchedulingContexts), normalizeScheduling(scheduling.Items)) + expectMetrics(t, tc.expectedMetrics) }) } @@ -481,6 +528,7 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string, }, Spec: resourcev1alpha2.ResourceClaimSpec{ ResourceClassName: classname, + AllocationMode: resourcev1alpha2.AllocationModeWaitForFirstConsumer, }, } if owner != nil { @@ -490,6 +538,26 @@ func makeGeneratedClaim(podClaimName, generateName, namespace, classname string, return claim } +func allocateClaim(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim { + claim = claim.DeepCopy() + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{ + Shareable: true, + } + return claim +} + +func reserveClaim(claim *resourcev1alpha2.ResourceClaim, pod *v1.Pod) *resourcev1alpha2.ResourceClaim { + claim = claim.DeepCopy() + claim.Status.ReservedFor = append(claim.Status.ReservedFor, + resourcev1alpha2.ResourceClaimConsumerReference{ + Resource: "pods", + Name: pod.Name, + UID: pod.UID, + }, + ) + return claim +} + func makePodResourceClaim(name, templateName string) *v1.PodResourceClaim { return &v1.PodResourceClaim{ Name: name, @@ -564,6 +632,14 @@ func normalizeClaims(claims []resourcev1alpha2.ResourceClaim) []resourcev1alpha2 return claims } +func normalizeScheduling(scheduling []resourcev1alpha2.PodSchedulingContext) []resourcev1alpha2.PodSchedulingContext { + sort.Slice(scheduling, func(i, j int) bool { + return scheduling[i].Namespace < scheduling[j].Namespace || + scheduling[i].Name < scheduling[j].Name + }) + return scheduling +} + func createTestClient(objects ...runtime.Object) *fake.Clientset { fakeClient := fake.NewSimpleClientset(objects...) fakeClient.PrependReactor("create", "resourceclaims", createResourceClaimReactor()) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index b1eb24a9c3e..6b6a10def9a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -213,6 +213,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(), rbacv1helpers.NewRule("get", "list", "watch", "create", "delete").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch", "create", "update", "patch").Groups(resourceGroup).Resources("podschedulingcontexts").RuleOrDie(), rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(), rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), eventsRule(), diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index a1e4fa42642..9b3984179b2 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -98,16 +98,31 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu }) ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) { - parameters := b.parameters() - claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) + // Pretend that the resource is allocated and reserved for some other entity. + // Until the resourceclaim controller learns to remove reservations for + // arbitrary types we can simply fake somthing here. + claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + b.create(ctx, claim) + claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get claim") + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + claim.Status.DriverName = driver.Name + claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ + APIGroup: "example.com", + Resource: "some", + Name: "thing", + UID: "12345", + }) + _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "update claim") + pod := b.podExternal() // This bypasses scheduling and therefore the pod gets // to run on the node although it never gets added to // the `ReservedFor` field of the claim. pod.Spec.NodeName = nodes.NodeNames[0] - - b.create(ctx, parameters, claim, pod) + b.create(ctx, pod) gomega.Consistently(func() error { testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) @@ -178,7 +193,9 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu }) ginkgo.Context("cluster", func() { - nodes := NewNodes(f, 1, 4) + nodes := NewNodes(f, 1, 1) + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) ginkgo.It("truncates the name of a generated resource claim", func(ctx context.Context) { parameters := b.parameters() @@ -208,6 +225,10 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu framework.ExpectNoError(err) framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)) }) + }) + + ginkgo.Context("cluster", func() { + nodes := NewNodes(f, 1, 4) // claimTests tries out several different combinations of pods with // claims, both inline and external. @@ -341,6 +362,32 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu return b.f.ClientSet.ResourceV1alpha2().ResourceClaims(b.f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) }).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.Allocation", (*resourcev1alpha2.AllocationResult)(nil))) }) + + // kube-controller-manager can trigger delayed allocation for pods where the + // node name was already selected when creating the pod. For immediate + // allocation, the creator has to ensure that the node matches the claims. + // This does not work for resource claim templates and only isn't + // a problem here because the resource is network-attached and available + // on all nodes. + + ginkgo.It("supports scheduled pod referencing inline resource claim", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(allocationMode) + pod.Spec.NodeName = nodes.NodeNames[0] + b.create(ctx, parameters, pod, template) + + b.testPod(ctx, f.ClientSet, pod) + }) + + ginkgo.It("supports scheduled pod referencing external resource claim", func(ctx context.Context) { + parameters := b.parameters() + claim := b.externalClaim(allocationMode) + pod := b.podExternal() + pod.Spec.NodeName = nodes.NodeNames[0] + b.create(ctx, parameters, claim, pod) + + b.testPod(ctx, f.ClientSet, pod) + }) } ginkgo.Context("with delayed allocation and setting ReservedFor", func() { diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 548bb66e663..533f70a92ea 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -121,9 +121,10 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() { podInformer := informerFactory.Core().V1().Pods() + schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts() claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() - claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, claimInformer, claimTemplateInformer) + claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, schedulingInformer, claimInformer, claimTemplateInformer) if err != nil { tb.Fatalf("Error creating claim controller: %v", err) }