staging dra: use MutationCache in controller

Directly after modifying a ResourceClaim in the apiserver, the locally cached
copy is outdated until the informer receives the update. If any operation looks
at the claim during that time frame, it will act based on stale
information. For example, it might try to allocate again. Even if that
allocation succeeds thanks to idempotency, the subsequent status update
fails with a conflict error because the stale copy still carries the old
resourceVersion.

This is harmless, but leads to confusing log output. It can be avoided by
keeping a copy of the updated claim and using that instead of the one from the
informer cache.
Patrick Ohly 2022-09-01 13:51:37 +02:00
parent bb040efd84
commit b2c39798f4
2 changed files with 54 additions and 12 deletions
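
The whole change follows one pattern: wrap the claim informer's indexer in a client-go MutationCache, read claims through it, and after every successful write feed the returned object back in with Mutation(), so that reads see the newer copy until the informer catches up. Locally recorded copies expire after 60 seconds, and with includeAdds=false only claims that already exist in the informer cache are overlaid; the constructor compares resourceVersions as integers to decide which copy is newer, hence its name. A condensed sketch of that pattern, not the commit's actual code; the helper names newClaimCache and updateClaim are illustrative only:

package controllersketch

import (
	"context"
	"time"

	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newClaimCache layers a MutationCache over the claim informer's indexer.
// The indexer doubles as backing store and index; locally recorded
// mutations expire after 60s, by which time the informer has normally
// caught up.
func newClaimCache(factory informers.SharedInformerFactory) cache.MutationCache {
	indexer := factory.Resource().V1alpha1().ResourceClaims().Informer().GetIndexer()
	return cache.NewIntegerResourceVersionMutationCache(indexer, indexer, 60*time.Second,
		false /* only cache mutations of claims that exist in the informer cache */)
}

// updateClaim writes a claim and immediately records the returned, newer
// object, so the next GetByKey returns it instead of the stale informer copy.
func updateClaim(ctx context.Context, client kubernetes.Interface, claimCache cache.MutationCache,
	claim *resourcev1alpha1.ResourceClaim) (*resourcev1alpha1.ResourceClaim, error) {
	claim, err := client.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
	if err != nil {
		return nil, err
	}
	claimCache.Mutation(claim)
	return claim, nil
}

The read side, getCachedClaim in the diff below, then only needs GetByKey plus a type assertion to return the freshest copy the controller knows about.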


@@ -138,7 +138,7 @@ type controller struct {
eventRecorder record.EventRecorder
rcLister resourcev1alpha1listers.ResourceClassLister
rcSynced cache.InformerSynced
-claimLister resourcev1alpha1listers.ResourceClaimLister
+claimCache cache.MutationCache
podSchedulingLister resourcev1alpha1listers.PodSchedulingLister
claimSynced cache.InformerSynced
podSchedulingSynced cache.InformerSynced
@@ -177,6 +177,13 @@ func New(
queue := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-queue", name))
+// The mutation cache acts as an additional layer for the informer
+// cache and after an update made by the controller returns a more
+// recent copy until the informer catches up.
+claimInformerCache := claimInformer.Informer().GetIndexer()
+claimCache := cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache, 60*time.Second,
+false /* only cache updated claims that exist in the informer cache */)
ctrl := &controller{
ctx: ctx,
logger: logger,
@@ -186,7 +193,7 @@ func New(
kubeClient: kubeClient,
rcLister: rcInformer.Lister(),
rcSynced: rcInformer.Informer().HasSynced,
-claimLister: claimInformer.Lister(),
+claimCache: claimCache,
claimSynced: claimInformer.Informer().HasSynced,
podSchedulingLister: podSchedulingInformer.Lister(),
podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
@@ -354,12 +361,8 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
switch prefix {
case claimKeyPrefix:
-claim, err := ctrl.claimLister.ResourceClaims(namespace).Get(name)
-if err != nil {
-if k8serrors.IsNotFound(err) {
-klog.FromContext(ctx).V(5).Info("ResourceClaim was deleted, no need to process it")
-return nil, nil
-}
+claim, err := ctrl.getCachedClaim(ctx, object)
+if claim == nil || err != nil {
return nil, err
}
obj, finalErr = claim, ctrl.syncClaim(ctx, claim)
@@ -377,6 +380,22 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
return
}
+func (ctrl *controller) getCachedClaim(ctx context.Context, key string) (*resourcev1alpha1.ResourceClaim, error) {
+claimObj, exists, err := ctrl.claimCache.GetByKey(key)
+if !exists || k8serrors.IsNotFound(err) {
+klog.FromContext(ctx).V(5).Info("ResourceClaim not found, no need to process it")
+return nil, nil
+}
+if err != nil {
+return nil, err
+}
+claim, ok := claimObj.(*resourcev1alpha1.ResourceClaim)
+if !ok {
+return nil, fmt.Errorf("internal error: got %T instead of *resourcev1alpha1.ResourceClaim from claim cache", claimObj)
+}
+return claim, nil
+}
// syncClaim determines which next action may be needed for a ResourceClaim
// and does it.
func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
@@ -414,6 +433,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
if err != nil {
return fmt.Errorf("remove allocation: %v", err)
}
+ctrl.claimCache.Mutation(claim)
} else {
// Ensure that there is no on-going allocation.
if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
@@ -428,12 +448,15 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
if err != nil {
return fmt.Errorf("remove deallocation: %v", err)
}
+ctrl.claimCache.Mutation(claim)
}
claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers)
-if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
+claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+if err != nil {
return fmt.Errorf("remove finalizer: %v", err)
}
+ctrl.claimCache.Mutation(claim)
}
// Nothing further to do. The apiserver should remove it shortly.
@@ -515,6 +538,7 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
if err != nil {
return fmt.Errorf("add finalizer: %v", err)
}
+ctrl.claimCache.Mutation(claim)
}
logger.V(5).Info("Allocating")
@@ -528,16 +552,19 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)
-if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
+claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+if err != nil {
return fmt.Errorf("add allocation: %v", err)
}
+ctrl.claimCache.Mutation(claim)
return nil
}
func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
claimName := resourceclaim.Name(pod, &podClaim)
-claim, err := ctrl.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
-if err != nil {
+key := pod.Namespace + "/" + claimName
+claim, err := ctrl.getCachedClaim(ctx, key)
+if claim == nil || err != nil {
return nil, err
}
if podClaim.Source.ResourceClaimTemplateName != nil {


@@ -31,6 +31,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
)
@@ -378,6 +379,8 @@ func TestController(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
+ctx, cancel := context.WithCancel(ctx)
initialObjects := []runtime.Object{}
for _, class := range test.classes {
initialObjects = append(initialObjects, class)
@@ -396,6 +399,10 @@ func TestController(t *testing.T) {
claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
podInformer := informerFactory.Core().V1().Pods()
podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
+// Order is important: on function exit, we first must
+// cancel, then wait (last-in-first-out).
+defer informerFactory.Shutdown()
+defer cancel()
for _, obj := range initialObjects {
switch obj.(type) {
@@ -416,6 +423,14 @@ func TestController(t *testing.T) {
driver.t = t
ctrl := New(ctx, driverName, driver, kubeClient, informerFactory)
informerFactory.Start(ctx.Done())
+if !cache.WaitForCacheSync(ctx.Done(),
+informerFactory.Resource().V1alpha1().ResourceClasses().Informer().HasSynced,
+informerFactory.Resource().V1alpha1().ResourceClaims().Informer().HasSynced,
+informerFactory.Resource().V1alpha1().PodSchedulings().Informer().HasSynced,
+) {
+t.Fatal("could not sync caches")
+}
_, err := ctrl.(*controller).syncKey(ctx, test.key)
if err != nil && test.expectedError == "" {
t.Fatalf("unexpected error: %v", err)