From b2c39798f4cbf432547d70c30b7c2ea9a86c060d Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 1 Sep 2022 13:51:37 +0200
Subject: [PATCH] staging dra: use MutationCache in controller

Directly after modifying a ResourceClaim in the apiserver, the locally
cached copy is outdated until the informer receives the update. If any
operation looks at the claim during that time frame, it will act based
on stale information. For example, it might try to allocate again. If
that works because of idempotency, then the following update operation
fails with a conflict error. This is harmless, but leads to confusing
log output. It can be avoided by keeping a copy of the updated claim
and using that instead of the one from the informer cache.
---
 .../controller/controller.go      | 51 ++++++++++++++-----
 .../controller/controller_test.go | 15 ++++++
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
index d0d44858e8e..c59724123a6 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
@@ -138,7 +138,7 @@ type controller struct {
 	eventRecorder       record.EventRecorder
 	rcLister            resourcev1alpha1listers.ResourceClassLister
 	rcSynced            cache.InformerSynced
-	claimLister         resourcev1alpha1listers.ResourceClaimLister
+	claimCache          cache.MutationCache
 	podSchedulingLister resourcev1alpha1listers.PodSchedulingLister
 	claimSynced         cache.InformerSynced
 	podSchedulingSynced cache.InformerSynced
@@ -177,6 +177,13 @@ func New(
 	queue := workqueue.NewNamedRateLimitingQueue(
 		workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-queue", name))
 
+	// The mutation cache acts as an additional layer for the informer
+	// cache and after an update made by the controller returns a more
+	// recent copy until the informer catches up.
+	claimInformerCache := claimInformer.Informer().GetIndexer()
+	claimCache := cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache, 60*time.Second,
+		false /* only cache updated claims that exist in the informer cache */)
+
 	ctrl := &controller{
 		ctx:                   ctx,
 		logger:                logger,
@@ -186,7 +193,7 @@ func New(
 		kubeClient:            kubeClient,
 		rcLister:              rcInformer.Lister(),
 		rcSynced:              rcInformer.Informer().HasSynced,
-		claimLister:           claimInformer.Lister(),
+		claimCache:            claimCache,
 		claimSynced:           claimInformer.Informer().HasSynced,
 		podSchedulingLister:   podSchedulingInformer.Lister(),
 		podSchedulingSynced:   podSchedulingInformer.Informer().HasSynced,
@@ -354,12 +361,8 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
 
 	switch prefix {
 	case claimKeyPrefix:
-		claim, err := ctrl.claimLister.ResourceClaims(namespace).Get(name)
-		if err != nil {
-			if k8serrors.IsNotFound(err) {
-				klog.FromContext(ctx).V(5).Info("ResourceClaim was deleted, no need to process it")
-				return nil, nil
-			}
+		claim, err := ctrl.getCachedClaim(ctx, object)
+		if claim == nil || err != nil {
 			return nil, err
 		}
 		obj, finalErr = claim, ctrl.syncClaim(ctx, claim)
@@ -377,6 +380,22 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
 	return
 }
 
+func (ctrl *controller) getCachedClaim(ctx context.Context, key string) (*resourcev1alpha1.ResourceClaim, error) {
+	claimObj, exists, err := ctrl.claimCache.GetByKey(key)
+	if !exists || k8serrors.IsNotFound(err) {
+		klog.FromContext(ctx).V(5).Info("ResourceClaim not found, no need to process it")
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	claim, ok := claimObj.(*resourcev1alpha1.ResourceClaim)
+	if !ok {
+		return nil, fmt.Errorf("internal error: got %T instead of *resourcev1alpha1.ResourceClaim from claim cache", claimObj)
+	}
+	return claim, nil
+}
+
 // syncClaim determines which next action may be needed for a ResourceClaim
 // and does it.
 func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
@@ -414,6 +433,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
 			if err != nil {
 				return fmt.Errorf("remove allocation: %v", err)
 			}
+			ctrl.claimCache.Mutation(claim)
 		} else {
 			// Ensure that there is no on-going allocation.
 			if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
@@ -428,12 +448,15 @@
 			if err != nil {
 				return fmt.Errorf("remove deallocation: %v", err)
 			}
+			ctrl.claimCache.Mutation(claim)
 		}
 
 		claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers)
-		if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
+		claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+		if err != nil {
 			return fmt.Errorf("remove finalizer: %v", err)
 		}
+		ctrl.claimCache.Mutation(claim)
 	}
 
 	// Nothing further to do. The apiserver should remove it shortly.
@@ -515,6 +538,7 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
 		if err != nil {
 			return fmt.Errorf("add finalizer: %v", err)
 		}
+		ctrl.claimCache.Mutation(claim)
 	}
 
 	logger.V(5).Info("Allocating")
@@ -528,16 +552,19 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
 		claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
 	}
 	logger.V(6).Info("Updating claim after allocation", "claim", claim)
-	if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
+	claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+	if err != nil {
 		return fmt.Errorf("add allocation: %v", err)
 	}
+	ctrl.claimCache.Mutation(claim)
 	return nil
 }
 
 func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
 	claimName := resourceclaim.Name(pod, &podClaim)
-	claim, err := ctrl.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
-	if err != nil {
+	key := pod.Namespace + "/" + claimName
+	claim, err := ctrl.getCachedClaim(ctx, key)
+	if claim == nil || err != nil {
 		return nil, err
 	}
 	if podClaim.Source.ResourceClaimTemplateName != nil {
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
index 0aeb2610601..d9b6a3eb586 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2/ktesting"
 	_ "k8s.io/klog/v2/ktesting/init"
 )
@@ -378,6 +379,8 @@ func TestController(t *testing.T) {
 	} {
 		t.Run(name, func(t *testing.T) {
 			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+
 			initialObjects := []runtime.Object{}
 			for _, class := range test.classes {
 				initialObjects = append(initialObjects, class)
@@ -396,6 +399,10 @@ func TestController(t *testing.T) {
 			claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
 			podInformer := informerFactory.Core().V1().Pods()
 			podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
+			// Order is important: on function exit, we first must
+			// cancel, then wait (last-in-first-out).
+			defer informerFactory.Shutdown()
+			defer cancel()
 
 			for _, obj := range initialObjects {
 				switch obj.(type) {
@@ -416,6 +423,14 @@ func TestController(t *testing.T) {
 			driver.t = t
 
 			ctrl := New(ctx, driverName, driver, kubeClient, informerFactory)
+			informerFactory.Start(ctx.Done())
+			if !cache.WaitForCacheSync(ctx.Done(),
+				informerFactory.Resource().V1alpha1().ResourceClasses().Informer().HasSynced,
+				informerFactory.Resource().V1alpha1().ResourceClaims().Informer().HasSynced,
+				informerFactory.Resource().V1alpha1().PodSchedulings().Informer().HasSynced,
+			) {
+				t.Fatal("could not sync caches")
+			}
 			_, err := ctrl.(*controller).syncKey(ctx, test.key)
 			if err != nil && test.expectedError == "" {
 				t.Fatalf("unexpected error: %v", err)
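
Note (not part of the patch): for readers who have not used client-go's MutationCache
before, here is a minimal, self-contained sketch of the same read-through/write-back
pattern the patch applies to ResourceClaims. It uses a ConfigMap informer purely for
illustration; the package name, the updateAndRemember function, and the "default/example"
key are hypothetical and not taken from this change.

package example

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// updateAndRemember reads an object through a MutationCache, updates it in the
// apiserver, and then records the server's response with Mutation() so that a
// lookup done immediately afterwards returns the fresh copy instead of the
// stale one still sitting in the informer cache.
func updateAndRemember(ctx context.Context, client kubernetes.Interface) error {
	factory := informers.NewSharedInformerFactory(client, 0)
	cmInformer := factory.Core().V1().ConfigMaps().Informer()

	// Layer the mutation cache over the informer's indexer. Lookups fall
	// through to the informer store; locally remembered copies win for up
	// to 60 seconds or until the informer delivers a newer version.
	indexer := cmInformer.GetIndexer()
	mutationCache := cache.NewIntegerResourceVersionMutationCache(indexer, indexer, 60*time.Second,
		false /* only remember objects that exist in the informer cache */)

	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), cmInformer.HasSynced) {
		return fmt.Errorf("caches did not sync")
	}

	// Read through the mutation cache instead of a lister.
	obj, exists, err := mutationCache.GetByKey("default/example")
	if err != nil || !exists {
		return err
	}
	cm := obj.(*corev1.ConfigMap).DeepCopy()

	cm.Data = map[string]string{"updated": "true"}
	cm, err = client.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	// Without this, a caller looking up the ConfigMap right away would
	// still get the outdated copy from the informer cache.
	mutationCache.Mutation(cm)
	return nil
}

The final constructor argument mirrors the patch: passing false means only objects that
are already present in the informer cache are remembered, so the mutation cache never
invents objects the informer has not yet seen at all.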