From d1ba893ad8e732bf8bbeb1e0029fe2335649d480 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 22 Jun 2023 11:21:37 +0200 Subject: [PATCH 1/4] dra resourceclaim controller: refactor isPodDone This covers pods that get deleted before running and will be used more than once soon. --- pkg/controller/resourceclaim/controller.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 3e06237ce48..58b19a19b7a 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -167,10 +167,7 @@ func (ec *Controller) enqueuePod(obj interface{}, deleted bool) { } // Release reservations of a deleted or completed pod? - if deleted || - podutil.IsPodTerminal(pod) || - // Deleted and not scheduled: - pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" { + if deleted || isPodDone(pod) { for _, podClaim := range pod.Spec.ResourceClaims { claimName := resourceclaim.Name(pod, &podClaim) ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + claimName) @@ -481,3 +478,10 @@ func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { } return keys, nil } + +// isPodDone returns true if it is certain that none of the containers are running and never will run. +func isPodDone(pod *v1.Pod) bool { + return podutil.IsPodPhaseTerminal(pod.Status.Phase) || + // Deleted and not scheduled: + pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" +} From 7f5a02fc7e497323c3b93349099c745112663bd9 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 22 Jun 2023 14:10:15 +0200 Subject: [PATCH 2/4] dra resourceclaim controller: enhance logging Adding logging to event handlers makes it more obvious why (or why not) claims and pods need to be processed. --- cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/resourceclaim/controller.go | 49 +++++++++++++------ .../resourceclaim/controller_test.go | 2 +- test/integration/util/util.go | 2 +- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index baad5543262..109f8fdb8fc 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -353,6 +353,7 @@ const defaultResourceClaimControllerWorkers = 10 func startResourceClaimController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { ephemeralController, err := resourceclaim.NewController( + klog.FromContext(ctx), controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaims(), diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index 58b19a19b7a..d301183e98a 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -98,6 +98,7 @@ const ( // NewController creates a ResourceClaim controller. 
func NewController( + logger klog.Logger, kubeClient clientset.Interface, podInformer v1informers.PodInformer, claimInformer resourcev1alpha2informers.ResourceClaimInformer, @@ -120,23 +121,27 @@ func NewController( if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - ec.enqueuePod(obj, false) + ec.enqueuePod(logger, obj, false) }, UpdateFunc: func(old, updated interface{}) { - ec.enqueuePod(updated, false) + ec.enqueuePod(logger, updated, false) }, DeleteFunc: func(obj interface{}) { - ec.enqueuePod(obj, true) + ec.enqueuePod(logger, obj, true) }, }); err != nil { return nil, err } if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ec.onResourceClaimAddOrUpdate, - UpdateFunc: func(old, updated interface{}) { - ec.onResourceClaimAddOrUpdate(updated) + AddFunc: func(obj interface{}) { + ec.onResourceClaimAddOrUpdate(logger, obj) + }, + UpdateFunc: func(old, updated interface{}) { + ec.onResourceClaimAddOrUpdate(logger, updated) + }, + DeleteFunc: func(obj interface{}) { + ec.onResourceClaimDelete(logger, obj) }, - DeleteFunc: ec.onResourceClaimDelete, }); err != nil { return nil, err } @@ -147,7 +152,7 @@ func NewController( return ec, nil } -func (ec *Controller) enqueuePod(obj interface{}, deleted bool) { +func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = d.Obj } @@ -166,11 +171,15 @@ func (ec *Controller) enqueuePod(obj interface{}, deleted bool) { return } + logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) + // Release reservations of a deleted or completed pod? if deleted || isPodDone(pod) { for _, podClaim := range pod.Spec.ResourceClaims { claimName := resourceclaim.Name(pod, &podClaim) - ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + claimName) + key := claimKeyPrefix + pod.Namespace + "/" + claimName + logger.V(6).Info("pod is deleted or done, process claim", "pod", klog.KObj(pod), "key", key) + ec.queue.Add(key) } } @@ -179,14 +188,16 @@ func (ec *Controller) enqueuePod(obj interface{}, deleted bool) { for _, podClaim := range pod.Spec.ResourceClaims { if podClaim.Source.ResourceClaimTemplateName != nil { // It has at least one inline template, work on it. - ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name) + key := podKeyPrefix + pod.Namespace + "/" + pod.Name + logger.V(6).Info("pod is not deleted, process it", "pod", klog.KObj(pod), "key", key) + ec.queue.Add(key) break } } } } -func (ec *Controller) onResourceClaimAddOrUpdate(obj interface{}) { +func (ec *Controller) onResourceClaimAddOrUpdate(logger klog.Logger, obj interface{}) { claim, ok := obj.(*resourcev1alpha2.ResourceClaim) if !ok { return @@ -195,10 +206,12 @@ func (ec *Controller) onResourceClaimAddOrUpdate(obj interface{}) { // When starting up, we have to check all claims to find those with // stale pods in ReservedFor. During an update, a pod might get added // that already no longer exists. 
- ec.queue.Add(claimKeyPrefix + claim.Namespace + "/" + claim.Name) + key := claimKeyPrefix + claim.Namespace + "/" + claim.Name + logger.V(6).Info("claim is new or updated, process it", "key", key) + ec.queue.Add(key) } -func (ec *Controller) onResourceClaimDelete(obj interface{}) { +func (ec *Controller) onResourceClaimDelete(logger klog.Logger, obj interface{}) { claim, ok := obj.(*resourcev1alpha2.ResourceClaim) if !ok { return @@ -215,8 +228,13 @@ func (ec *Controller) onResourceClaimDelete(obj interface{}) { runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err)) return } + if len(objs) == 0 { + logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim)) + return + } + logger = klog.LoggerWithValues(logger, "claim", klog.KObj(claim)) for _, obj := range objs { - ec.enqueuePod(obj, false) + ec.enqueuePod(logger, obj, false) } } @@ -382,7 +400,7 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1. } func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error { - logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KRef(namespace, name)) + logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name)) ctx = klog.NewContext(ctx, logger) claim, err := ec.claimLister.ResourceClaims(namespace).Get(name) if err != nil { @@ -449,6 +467,7 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor) } + logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid)) if len(valid) < len(claim.Status.ReservedFor) { // TODO (#113700): patch claim := claim.DeepCopy() diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index ee937fe5f07..d6fb1880c2c 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -226,7 +226,7 @@ func TestSyncHandler(t *testing.T) { claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() - ec, err := NewController(fakeKubeClient, podInformer, claimInformer, templateInformer) + ec, err := NewController(klog.TODO(), fakeKubeClient, podInformer, claimInformer, templateInformer) if err != nil { t.Fatalf("error creating ephemeral controller : %v", err) } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 359459e6846..091f3713c46 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -108,7 +108,7 @@ func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet podInformer := informerFactory.Core().V1().Pods() claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() - claimController, err := resourceclaim.NewController(clientSet, podInformer, claimInformer, claimTemplateInformer) + claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, claimInformer, claimTemplateInformer) if err != nil { tb.Fatalf("Error creating claim controller: %v", err) } From e8a0c42212d1a3d132eb9c6f4892aee099497ed0 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 22 Jun 2023 14:13:42 +0200 Subject: [PATCH 3/4] dra resourceclaim 
controller: remove reservation for completed pods When a pod is known to never run (again), the reservation for it also can be removed. This is relevant in particular for the job controller. --- pkg/controller/resourceclaim/controller.go | 14 ++++++++----- .../resourceclaim/controller_test.go | 21 +++++++++++++++++++ test/e2e/dra/dra.go | 15 +++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go index d301183e98a..087e1fc67de 100644 --- a/pkg/controller/resourceclaim/controller.go +++ b/pkg/controller/resourceclaim/controller.go @@ -435,10 +435,10 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err keepEntry = false } else { pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name) - if err != nil && !errors.IsNotFound(err) { + switch { + case err != nil && !errors.IsNotFound(err): return err - } - if pod == nil { + case err != nil: // We might not have it in our informer cache // yet. Removing the pod while the scheduler is // scheduling it would be bad. We have to be @@ -449,10 +449,14 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err return err } if pod == nil || pod.UID != reservedFor.UID { + logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } - } else if pod.UID != reservedFor.UID { - // Pod exists, but is a different incarnation under the same name. + case pod.UID != reservedFor.UID: + logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) + keepEntry = false + case isPodDone(pod): + logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } } diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index d6fb1880c2c..2cadb5d1505 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -187,6 +187,27 @@ func TestSyncHandler(t *testing.T) { expectedClaims: []resourcev1alpha2.ResourceClaim{*testClaim}, expectedMetrics: expectedMetrics{0, 0}, }, + { + name: "clear-reserved-when-done", + pods: func() []*v1.Pod { + pods := []*v1.Pod{testPodWithResource.DeepCopy()} + pods[0].Status.Phase = v1.PodSucceeded + return pods + }(), + key: claimKey(testClaimReserved), + claims: func() []*resourcev1alpha2.ResourceClaim { + claims := []*resourcev1alpha2.ResourceClaim{testClaimReserved.DeepCopy()} + claims[0].OwnerReferences = nil + return claims + }(), + expectedClaims: func() []resourcev1alpha2.ResourceClaim { + claims := []resourcev1alpha2.ResourceClaim{*testClaimReserved.DeepCopy()} + claims[0].OwnerReferences = nil + claims[0].Status.ReservedFor = nil + return claims + }(), + expectedMetrics: expectedMetrics{0, 0}, + }, { name: "remove-reserved", pods: []*v1.Pod{testPod}, diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 59dc3376da3..ef916ce0ba7 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -230,6 +230,21 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu b.testPod(ctx, f.ClientSet, pod) }) + + ginkgo.It("removes reservation from claim when pod is done", func(ctx context.Context) { + parameters := b.parameters() + pod := b.podExternal() + claim := 
b.externalClaim(allocationMode)
+			pod.Spec.Containers[0].Command = []string{"true"}
+			b.create(ctx, parameters, claim, pod)
+
+			ginkgo.By("waiting for pod to finish")
+			framework.ExpectNoError(e2epod.WaitForPodNoLongerRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace), "wait for pod to finish")
+			ginkgo.By("waiting for claim to be unreserved")
+			gomega.Eventually(ctx, func(ctx context.Context) (*resourcev1alpha2.ResourceClaim, error) {
+				return f.ClientSet.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
+			}).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.ReservedFor", gomega.BeEmpty()), "reservation should have been removed")
+		})
 	}
 
 	ginkgo.Context("with delayed allocation", func() {

From a514f40131c298ef7ca375e5ab8fa8ddafa3e74d Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 22 Jun 2023 14:15:17 +0200
Subject: [PATCH 4/4] dra resourceclaim controller: delete generated claims when pod is done

When a pod is done but not yet removed for a while, a claim that was
generated for that pod can already be deleted. This also triggers
deallocation.

---
 pkg/controller/resourceclaim/controller.go    | 47 +++++++++++++++++++
 .../resourceclaim/controller_test.go          | 12 +++++
 .../rbac/bootstrappolicy/controller_policy.go |  2 +-
 test/e2e/dra/dra.go                           | 32 +++++++++++++
 4 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go
index 087e1fc67de..b12739ef9b4 100644
--- a/pkg/controller/resourceclaim/controller.go
+++ b/pkg/controller/resourceclaim/controller.go
@@ -26,6 +26,7 @@ import (
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	v1informers "k8s.io/client-go/informers/core/v1"
@@ -42,6 +43,7 @@ import (
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/utils/pointer"
 )
 
 const (
@@ -482,9 +484,54 @@ func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) err
 		}
 	}
 
+	if len(valid) == 0 {
+		// Claim is not reserved. If it was generated for a pod and
+		// that pod is not going to run, the claim can be
+		// deleted. Normally the garbage collector does that, but the
+		// pod itself might not get deleted for a while.
+		podName, podUID := owningPod(claim)
+		if podName != "" {
+			pod, err := ec.podLister.Pods(claim.Namespace).Get(podName)
+			switch {
+			case err == nil:
+				// Pod already replaced or not going to run?
+				if pod.UID != podUID || isPodDone(pod) {
+					// We are certain that the owning pod is not going to need
+					// the claim and therefore remove the claim.
+					logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
+					err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
+					if err != nil {
+						return fmt.Errorf("delete claim: %v", err)
+					}
+				} else {
+					logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
+				}
+			case errors.IsNotFound(err):
+				// We might not know the pod *yet*. Instead of doing an expensive API call,
+				// let the garbage collector handle the case that the pod is truly gone.
+ logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName)) + default: + return fmt.Errorf("lookup pod: %v", err) + } + } else { + logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim)) + } + } + return nil } +func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) { + for _, owner := range claim.OwnerReferences { + if pointer.BoolDeref(owner.Controller, false) && + owner.APIVersion == "v1" && + owner.Kind == "Pod" { + return owner.Name, owner.UID + } + } + return "", "" +} + // podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (= // namespace/name) for ResourceClaimTemplates in a given pod. func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go index 2cadb5d1505..aef37dbf50b 100644 --- a/pkg/controller/resourceclaim/controller_test.go +++ b/pkg/controller/resourceclaim/controller_test.go @@ -216,6 +216,18 @@ func TestSyncHandler(t *testing.T) { expectedClaims: []resourcev1alpha2.ResourceClaim{*testClaimReserved}, expectedMetrics: expectedMetrics{0, 0}, }, + { + name: "delete-claim-when-done", + pods: func() []*v1.Pod { + pods := []*v1.Pod{testPodWithResource.DeepCopy()} + pods[0].Status.Phase = v1.PodSucceeded + return pods + }(), + key: claimKey(testClaimReserved), + claims: []*resourcev1alpha2.ResourceClaim{testClaimReserved}, + expectedClaims: nil, + expectedMetrics: expectedMetrics{0, 0}, + }, } for _, tc := range tests { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 5e1b3f85486..1c90e45c7a6 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -212,7 +212,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(), - rbacv1helpers.NewRule("get", "list", "watch", "create").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch", "create", "delete").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(), rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(), eventsRule(), }, diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index ef916ce0ba7..d0d1399197f 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -245,6 +245,38 @@ var _ = ginkgo.Describe("[sig-node] DRA [Feature:DynamicResourceAllocation]", fu return f.ClientSet.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(ctx, claim.Name, metav1.GetOptions{}) }).WithTimeout(f.Timeouts.PodDelete).Should(gomega.HaveField("Status.ReservedFor", gomega.BeEmpty()), "reservation should have been removed") }) + + ginkgo.It("deletes generated claims when pod is done", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(allocationMode) + pod.Spec.Containers[0].Command = []string{"true"} + b.create(ctx, parameters, template, pod) + + ginkgo.By("waiting for pod to finish") + framework.ExpectNoError(e2epod.WaitForPodNoLongerRunningInNamespace(ctx, f.ClientSet, pod.Name, 
pod.Namespace), "wait for pod to finish") + ginkgo.By("waiting for claim to be deleted") + gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.ResourceClaim, error) { + claims, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(pod.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + return claims.Items, nil + }).WithTimeout(f.Timeouts.PodDelete).Should(gomega.BeEmpty(), "claim should have been deleted") + }) + + ginkgo.It("does not delete generated claims when pod is restarting", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(allocationMode) + pod.Spec.Containers[0].Command = []string{"sh", "-c", "sleep 1; exit 1"} + pod.Spec.RestartPolicy = v1.RestartPolicyAlways + b.create(ctx, parameters, template, pod) + + ginkgo.By("waiting for pod to restart twice") + gomega.Eventually(ctx, func(ctx context.Context) (*v1.Pod, error) { + return f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + }).WithTimeout(f.Timeouts.PodStartSlow).Should(gomega.HaveField("Status.ContainerStatuses", gomega.ContainElements(gomega.HaveField("RestartCount", gomega.BeNumerically(">=", 2))))) + gomega.Expect(driver.Controller.GetNumAllocations()).To(gomega.Equal(int64(1)), "number of allocations") + }) } ginkgo.Context("with delayed allocation", func() {
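
Taken together, the series boils down to one rule: once a pod is terminal, or was deleted before ever being scheduled, the resource claim controller drops the pod's entry from ResourceClaim.Status.ReservedFor (PATCH 3) and, if the claim was generated from an inline template and is controlled by that pod, deletes the claim outright (PATCH 4). The short Go sketch below illustrates that decision in isolation. It is not code from the patches: podIsDone and owningPodOf are illustrative stand-ins for the controller's isPodDone and owningPod helpers, and the object names are made up; only the resource.k8s.io/v1alpha2 types and the k8s.io/utils/pointer helpers that already appear in the diffs above are assumed.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/pointer"
)

// podIsDone mirrors the isPodDone helper from PATCH 1: none of the containers
// are running and none ever will, either because the pod reached a terminal
// phase or because it was deleted before being scheduled to a node.
func podIsDone(pod *v1.Pod) bool {
	terminal := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
	deletedBeforeScheduling := pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
	return terminal || deletedBeforeScheduling
}

// owningPodOf mirrors the owningPod helper from PATCH 4: a generated claim is
// controlled by exactly one Pod through a controller owner reference.
func owningPodOf(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) {
	for _, owner := range claim.OwnerReferences {
		if pointer.BoolDeref(owner.Controller, false) && owner.APIVersion == "v1" && owner.Kind == "Pod" {
			return owner.Name, owner.UID
		}
	}
	return "", ""
}

func main() {
	// Hypothetical objects: a succeeded pod and the claim generated for it.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "default", UID: types.UID("uid-1")},
		Status:     v1.PodStatus{Phase: v1.PodSucceeded},
	}
	claim := &resourcev1alpha2.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-pod-my-claim",
			Namespace: "default",
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "v1",
				Kind:       "Pod",
				Name:       "my-pod",
				UID:        types.UID("uid-1"),
				Controller: pointer.Bool(true),
			}},
		},
	}

	// The claim may be deleted when its controlling pod is done, or when the
	// pod was replaced by a new incarnation with the same name but a new UID.
	name, uid := owningPodOf(claim)
	if name == pod.Name && (uid != pod.UID || podIsDone(pod)) {
		fmt.Println("generated claim can be deleted:", claim.Name)
	}
}

In the actual controller this check runs inside syncClaim and the deletion goes through ec.kubeClient.ResourceV1alpha2(), which is why PATCH 4 also adds the "delete" verb to the resource-claim-controller's bootstrap RBAC rule for resourceclaims.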