From 1b63639d31f40c5d0953bb67b590bd84eae256a7 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sat, 15 Jun 2024 18:15:13 +0200 Subject: [PATCH] DRA scheduler: use assume cache to list claims This finishes the transition to the assume cache as source of truth for the current set of claims. The tests have to be adapted. It's not enough anymore to directly put objects into the informer store because that doesn't change the assume cache content. Instead, normal Create/Update calls and waiting for the cache update are needed. --- .../dynamicresources/dynamicresources.go | 9 ++- .../dynamicresources/dynamicresources_test.go | 71 +++++++++++------ .../util/assumecache/assume_cache.go | 79 +++++++++++-------- .../util/assumecache/assume_cache_test.go | 13 +-- 4 files changed, 108 insertions(+), 64 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 715f3f9b1c6..e701a55d9c2 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -272,7 +272,6 @@ type dynamicResources struct { enabled bool fh framework.Handle clientset kubernetes.Interface - claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister @@ -347,7 +346,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe enabled: true, fh: fh, clientset: fh.ClientSet(), - claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), @@ -791,11 +789,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso if claimName == nil { continue } - claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) + obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName) if err != nil { return err } + claim, ok := obj.(*resourcev1alpha2.ResourceClaim) + if !ok { + return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName) + } + if claim.DeletionTimestamp != nil { return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 90f08a1aa7c..562a8de7eda 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -1149,12 +1149,13 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - informerFactory informers.SharedInformerFactory - p *dynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + claimAssumeCache *assumecache.AssumeCache + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc 
*testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -1320,11 +1321,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters")) tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) - assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) + tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil) opts := []runtime.Option{ runtime.WithClientSet(tc.client), runtime.WithInformerFactory(tc.informerFactory), - runtime.WithResourceClaimCache(assumeCache), + runtime.WithResourceClaimCache(tc.claimAssumeCache), } fh, err := runtime.NewFramework(tCtx, nil, nil, opts...) if err != nil { @@ -1491,6 +1492,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "backoff-wrong-old-object": { pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, oldObj: "not-a-claim", newObj: pendingImmediateClaim, expectedErr: true, @@ -1519,15 +1521,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { }, "structured-claim-deallocate": { pod: podWithClaimName, - claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim}, - oldObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" - return claim - }(), + claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim}, + oldObj: otherStructuredAllocatedClaim, newObj: func() *resourcev1alpha2.ResourceClaim { - claim := structuredAllocatedClaim.DeepCopy() - claim.Name += "-other" + claim := otherStructuredAllocatedClaim.DeepCopy() claim.Status.Allocation = nil return claim }(), @@ -1539,18 +1536,48 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + logger, tCtx := ktesting.NewTestContext(t) testCtx := setup(t, nil, tc.claims, nil, nil, nil) + oldObj := tc.oldObj + newObj := tc.newObj if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok { - // Update the informer because the lister gets called and must have the claim. - store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore() + // Add or update through the client and wait until the event is processed. + claimKey := claim.Namespace + "/" + claim.Name if tc.oldObj == nil { - require.NoError(t, store.Add(claim)) + // Some test claims already have it. Clear for create. + createClaim := claim.DeepCopy() + createClaim.UID = "" + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{}) + require.NoError(t, err, "create claim") + claim = storedClaim } else { - require.NoError(t, store.Update(claim)) + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve old claim") + updateClaim := claim.DeepCopy() + // The test claim doesn't have those (generated dynamically), so copy them. 
+ updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID + updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion + + storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{}) + require.NoError(t, err, "update claim") + claim = storedClaim } + + // Eventually the assume cache will have it, too. + require.EventuallyWithT(t, func(t *assert.CollectT) { + cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + require.NoError(t, err, "retrieve claim") + if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion { + t.Errorf("cached claim not updated yet") + } + }, time.Minute, time.Second, "claim assume cache must have new or updated claim") + + // This has the actual UID and ResourceVersion, + // which is relevant for + // isSchedulableAfterClaimChange. + newObj = claim } - actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj) + actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj) if tc.expectedErr { require.Error(t, err) return diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index a517db97443..c7392129cbd 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -125,7 +125,8 @@ type AssumeCache struct { rwMutex sync.RWMutex // All registered event handlers. - eventHandlers []cache.ResourceEventHandler + eventHandlers []cache.ResourceEventHandler + handlerRegistration cache.ResourceEventHandlerRegistration // The eventQueue contains functions which deliver an event to one // event handler. @@ -203,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam // Unit tests don't use informers if informer != nil { // Cannot fail in practice?! No-one bothers checking the error. - _, _ = informer.AddEventHandler( + c.handlerRegistration, _ = informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.add, UpdateFunc: c.update, @@ -257,18 +258,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.Info("Error occurred while updating stored object", "err", err) } else { c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) - for _, handler := range c.eventHandlers { - handler := handler - if oldObj == nil { - c.eventQueue.Push(func() { - handler.OnAdd(obj, false) - }) - } else { - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } - } + c.pushEvent(oldObj, obj) } } @@ -304,11 +294,32 @@ func (c *AssumeCache) delete(obj interface{}) { c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } + c.pushEvent(oldObj, nil) +} + +// pushEvent gets called while the mutex is locked for writing. +// It ensures that all currently registered event handlers get +// notified about a change when the caller starts delivering +// those with emitEvents. +// +// For a delete event, newObj is nil. For an add, oldObj is nil. +// An update has both as non-nil. 
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) { for _, handler := range c.eventHandlers { handler := handler - c.eventQueue.Push(func() { - handler.OnDelete(oldObj) - }) + if oldObj == nil { + c.eventQueue.Push(func() { + handler.OnAdd(newObj, false) + }) + } else if newObj == nil { + c.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } else { + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, newObj) + }) + } } } @@ -441,13 +452,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) } - for _, handler := range c.eventHandlers { - handler := handler - oldObj := objInfo.latestObj - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } + c.pushEvent(objInfo.latestObj, obj) // Only update the cached object objInfo.latestObj = obj @@ -467,14 +472,7 @@ func (c *AssumeCache) Restore(objName string) { c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { if objInfo.latestObj != objInfo.apiObj { - for _, handler := range c.eventHandlers { - handler := handler - oldObj, obj := objInfo.latestObj, objInfo.apiObj - c.eventQueue.Push(func() { - handler.OnUpdate(oldObj, obj) - }) - } - + c.pushEvent(objInfo.latestObj, objInfo.apiObj) objInfo.latestObj = objInfo.apiObj } c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) @@ -485,7 +483,9 @@ func (c *AssumeCache) Restore(objName string) { // single handler are delivered sequentially, but there is no // coordination between different handlers. A handler may use the // cache. -func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { +// +// The return value can be used to wait for cache synchronization. +func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration { defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -497,6 +497,13 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { handler.OnAdd(obj, true) }) } + + if c.handlerRegistration == nil { + // No informer, so immediately synced. + return syncedHandlerRegistration{} + } + + return c.handlerRegistration } // emitEvents delivers all pending events that are in the queue, in the order @@ -516,3 +523,9 @@ func (c *AssumeCache) emitEvents() { }() } } + +// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration +// which always returns true. +type syncedHandlerRegistration struct{} + +func (syncedHandlerRegistration) HasSynced() bool { return true } diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 8e4336730aa..a18d51e0a87 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -290,7 +290,7 @@ func TestRestore(t *testing.T) { events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}}) }) - // Restore object. + // Restore the same object. ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) { cache.Restore(oldObj.GetName()) verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) @@ -306,7 +306,7 @@ func TestRestore(t *testing.T) { events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) }) - // Restore object. + // Restore the same object. 
ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { cache.Restore(oldObj.GetName()) verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) @@ -346,6 +346,7 @@ func TestEvents(t *testing.T) { ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { informer.add(nil) verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) }) ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { informer.update(oldObj) @@ -378,7 +379,7 @@ func TestEventHandlers(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) informer.add(objs[i]) @@ -437,7 +438,7 @@ func TestEventHandlerConcurrency(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) } @@ -487,7 +488,7 @@ func TestListNoIndexer(t *testing.T) { tCtx, cache, informer := newTest(t) // Add a bunch of objects. - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") objs = append(objs, obj) @@ -526,7 +527,7 @@ func TestListWithIndexer(t *testing.T) { // Add a bunch of objects. ns := "ns1" - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) objs = append(objs, obj)