diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
index d0d44858e8e..c59724123a6 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go
@@ -138,7 +138,7 @@ type controller struct {
 	eventRecorder       record.EventRecorder
 	rcLister            resourcev1alpha1listers.ResourceClassLister
 	rcSynced            cache.InformerSynced
-	claimLister         resourcev1alpha1listers.ResourceClaimLister
+	claimCache          cache.MutationCache
 	podSchedulingLister resourcev1alpha1listers.PodSchedulingLister
 	claimSynced         cache.InformerSynced
 	podSchedulingSynced cache.InformerSynced
@@ -177,6 +177,13 @@ func New(
 	queue := workqueue.NewNamedRateLimitingQueue(
 		workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-queue", name))
 
+	// The mutation cache acts as an additional layer for the informer
+	// cache and after an update made by the controller returns a more
+	// recent copy until the informer catches up.
+	claimInformerCache := claimInformer.Informer().GetIndexer()
+	claimCache := cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache, 60*time.Second,
+		false /* only cache updated claims that exist in the informer cache */)
+
 	ctrl := &controller{
 		ctx:                 ctx,
 		logger:              logger,
@@ -186,7 +193,7 @@ func New(
 		kubeClient:          kubeClient,
 		rcLister:            rcInformer.Lister(),
 		rcSynced:            rcInformer.Informer().HasSynced,
-		claimLister:         claimInformer.Lister(),
+		claimCache:          claimCache,
 		claimSynced:         claimInformer.Informer().HasSynced,
 		podSchedulingLister: podSchedulingInformer.Lister(),
 		podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
@@ -354,12 +361,8 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
 
 	switch prefix {
 	case claimKeyPrefix:
-		claim, err := ctrl.claimLister.ResourceClaims(namespace).Get(name)
-		if err != nil {
-			if k8serrors.IsNotFound(err) {
-				klog.FromContext(ctx).V(5).Info("ResourceClaim was deleted, no need to process it")
-				return nil, nil
-			}
+		claim, err := ctrl.getCachedClaim(ctx, object)
+		if claim == nil || err != nil {
 			return nil, err
 		}
 		obj, finalErr = claim, ctrl.syncClaim(ctx, claim)
@@ -377,6 +380,22 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
 	return
 }
 
+func (ctrl *controller) getCachedClaim(ctx context.Context, key string) (*resourcev1alpha1.ResourceClaim, error) {
+	claimObj, exists, err := ctrl.claimCache.GetByKey(key)
+	if !exists || k8serrors.IsNotFound(err) {
+		klog.FromContext(ctx).V(5).Info("ResourceClaim not found, no need to process it")
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	claim, ok := claimObj.(*resourcev1alpha1.ResourceClaim)
+	if !ok {
+		return nil, fmt.Errorf("internal error: got %T instead of *resourcev1alpha1.ResourceClaim from claim cache", claimObj)
+	}
+	return claim, nil
+}
+
 // syncClaim determines which next action may be needed for a ResourceClaim
 // and does it.
 func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
@@ -414,6 +433,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
 				if err != nil {
 					return fmt.Errorf("remove allocation: %v", err)
 				}
+				ctrl.claimCache.Mutation(claim)
 			} else {
 				// Ensure that there is no on-going allocation.
 				if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
@@ -428,12 +448,15 @@
 				if err != nil {
 					return fmt.Errorf("remove deallocation: %v", err)
 				}
+				ctrl.claimCache.Mutation(claim)
 			}
 
 			claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers)
-			if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
+			claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+			if err != nil {
 				return fmt.Errorf("remove finalizer: %v", err)
 			}
+			ctrl.claimCache.Mutation(claim)
 		}
 
 		// Nothing further to do. The apiserver should remove it shortly.
@@ -515,6 +538,7 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
 		if err != nil {
 			return fmt.Errorf("add finalizer: %v", err)
 		}
+		ctrl.claimCache.Mutation(claim)
 	}
 
 	logger.V(5).Info("Allocating")
@@ -528,16 +552,19 @@
 		claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
 	}
 	logger.V(6).Info("Updating claim after allocation", "claim", claim)
-	if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
+	claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+	if err != nil {
 		return fmt.Errorf("add allocation: %v", err)
 	}
+	ctrl.claimCache.Mutation(claim)
 	return nil
 }
 
 func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
 	claimName := resourceclaim.Name(pod, &podClaim)
-	claim, err := ctrl.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
-	if err != nil {
+	key := pod.Namespace + "/" + claimName
+	claim, err := ctrl.getCachedClaim(ctx, key)
+	if claim == nil || err != nil {
 		return nil, err
 	}
 	if podClaim.Source.ResourceClaimTemplateName != nil {
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
index 0aeb2610601..d9b6a3eb586 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2/ktesting"
 	_ "k8s.io/klog/v2/ktesting/init"
 )
@@ -378,6 +379,8 @@ func TestController(t *testing.T) {
 	} {
 		t.Run(name, func(t *testing.T) {
 			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+
 			initialObjects := []runtime.Object{}
 			for _, class := range test.classes {
 				initialObjects = append(initialObjects, class)
@@ -396,6 +399,10 @@ func TestController(t *testing.T) {
 			claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
 			podInformer := informerFactory.Core().V1().Pods()
 			podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
+			// Order is important: on function exit, we first must
+			// cancel, then wait (last-in-first-out).
+			defer informerFactory.Shutdown()
+			defer cancel()
 
 			for _, obj := range initialObjects {
 				switch obj.(type) {
@@ -416,6 +423,14 @@
 			driver.t = t
 
 			ctrl := New(ctx, driverName, driver, kubeClient, informerFactory)
+			informerFactory.Start(ctx.Done())
+			if !cache.WaitForCacheSync(ctx.Done(),
+				informerFactory.Resource().V1alpha1().ResourceClasses().Informer().HasSynced,
+				informerFactory.Resource().V1alpha1().ResourceClaims().Informer().HasSynced,
+				informerFactory.Resource().V1alpha1().PodSchedulings().Informer().HasSynced,
+			) {
+				t.Fatal("could not sync caches")
+			}
 			_, err := ctrl.(*controller).syncKey(ctx, test.key)
 			if err != nil && test.expectedError == "" {
 				t.Fatalf("unexpected error: %v", err)
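
The standalone sketch below is not part of the change above; it is a minimal illustration of the client-go MutationCache pattern that controller.go now relies on: the informer's indexer acts as the backing store, the controller records every object it just wrote via Mutation(), and reads through the cache return the newer copy until the informer catches up or the TTL expires. The ConfigMap object, the hand-built indexer, and the main function are illustrative stand-ins for the ResourceClaim informer wiring done in New().

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Stand-in for the informer's indexer (claimInformer.Informer().GetIndexer()
	// in the controller). In a real controller the informer owns this store.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	// What the informer has seen so far: resourceVersion "1".
	stale := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default", Name: "example", ResourceVersion: "1",
	}}
	if err := indexer.Add(stale); err != nil {
		panic(err)
	}

	// Layer a mutation cache on top. Overlaid entries expire after the TTL;
	// with includeAdds=false only objects that already exist in the backing
	// cache are overlaid, mirroring the controller.go change.
	mutationCache := cache.NewIntegerResourceVersionMutationCache(indexer, indexer, time.Minute, false)

	// Write path: the controller updates the object through the apiserver
	// and records the fresher copy in the mutation cache.
	updated := stale.DeepCopy()
	updated.ResourceVersion = "2"
	updated.Data = map[string]string{"state": "allocated"}
	mutationCache.Mutation(updated)

	// Read path: GetByKey now returns resourceVersion "2" even though the
	// indexer still holds "1", until the watch catches up or the TTL expires.
	obj, exists, err := mutationCache.GetByKey("default/example")
	if err != nil || !exists {
		panic(fmt.Sprintf("unexpected: exists=%v err=%v", exists, err))
	}
	fmt.Println("resourceVersion seen by the controller:", obj.(*v1.ConfigMap).ResourceVersion)
}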