diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index 715f3f9b1c6..e701a55d9c2 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -272,7 +272,6 @@ type dynamicResources struct {
 	enabled                    bool
 	fh                         framework.Handle
 	clientset                  kubernetes.Interface
-	claimLister                resourcev1alpha2listers.ResourceClaimLister
 	classLister                resourcev1alpha2listers.ResourceClassLister
 	podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
 	claimParametersLister      resourcev1alpha2listers.ResourceClaimParametersLister
@@ -347,7 +346,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		enabled:                    true,
 		fh:                         fh,
 		clientset:                  fh.ClientSet(),
-		claimLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
 		classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
 		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
 		claimParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
@@ -791,11 +789,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso
 		if claimName == nil {
 			continue
 		}
-		claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
+		obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName)
 		if err != nil {
 			return err
 		}
+		claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
+		if !ok {
+			return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName)
+		}
+
 		if claim.DeletionTimestamp != nil {
 			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
 		}
 
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
index 90f08a1aa7c..562a8de7eda 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
@@ -1149,12 +1149,13 @@ func TestPlugin(t *testing.T) {
 }
 
 type testContext struct {
-	ctx             context.Context
-	client          *fake.Clientset
-	informerFactory informers.SharedInformerFactory
-	p               *dynamicResources
-	nodeInfos       []*framework.NodeInfo
-	state           *framework.CycleState
+	ctx              context.Context
+	client           *fake.Clientset
+	informerFactory  informers.SharedInformerFactory
+	claimAssumeCache *assumecache.AssumeCache
+	p                *dynamicResources
+	nodeInfos        []*framework.NodeInfo
+	state            *framework.CycleState
 }
 
 func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) {
@@ -1320,11 +1321,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
 	tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
 
 	tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
-	assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
+	tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
 	opts := []runtime.Option{
 		runtime.WithClientSet(tc.client),
 		runtime.WithInformerFactory(tc.informerFactory),
-		runtime.WithResourceClaimCache(assumeCache),
+		runtime.WithResourceClaimCache(tc.claimAssumeCache),
 	}
 	fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
 	if err != nil {
@@ -1491,6 +1492,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 		},
 		"backoff-wrong-old-object": {
 			pod:         podWithClaimName,
+			claims:      []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
 			oldObj:      "not-a-claim",
 			newObj:      pendingImmediateClaim,
 			expectedErr: true,
@@ -1519,15 +1521,10 @@
 		},
 		"structured-claim-deallocate": {
 			pod:    podWithClaimName,
-			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
-			oldObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
-				return claim
-			}(),
+			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim},
+			oldObj: otherStructuredAllocatedClaim,
 			newObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
+				claim := otherStructuredAllocatedClaim.DeepCopy()
 				claim.Status.Allocation = nil
 				return claim
 			}(),
@@ -1539,18 +1536,48 @@
 
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			logger, _ := ktesting.NewTestContext(t)
+			logger, tCtx := ktesting.NewTestContext(t)
 			testCtx := setup(t, nil, tc.claims, nil, nil, nil)
+			oldObj := tc.oldObj
+			newObj := tc.newObj
 			if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
-				// Update the informer because the lister gets called and must have the claim.
-				store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
+				// Add or update through the client and wait until the event is processed.
+				claimKey := claim.Namespace + "/" + claim.Name
 				if tc.oldObj == nil {
-					require.NoError(t, store.Add(claim))
+					// Some test claims already have a UID. Clear it for create.
+					createClaim := claim.DeepCopy()
+					createClaim.UID = ""
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{})
+					require.NoError(t, err, "create claim")
+					claim = storedClaim
 				} else {
-					require.NoError(t, store.Update(claim))
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve old claim")
+					updateClaim := claim.DeepCopy()
+					// The test claim lacks the dynamically generated UID and ResourceVersion, so copy them.
+					updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID
+					updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion
+
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{})
+					require.NoError(t, err, "update claim")
+					claim = storedClaim
 				}
+
+				// Eventually the assume cache will have it, too.
+				require.EventuallyWithT(t, func(t *assert.CollectT) {
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve claim")
+					if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion {
+						t.Errorf("cached claim not updated yet")
+					}
+				}, time.Minute, time.Second, "claim assume cache must have new or updated claim")
+
+				// The stored claim now has the actual UID and
+				// ResourceVersion, which are relevant for
+				// isSchedulableAfterClaimChange.
+				newObj = claim
 			}
-			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
+			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
 			if tc.expectedErr {
 				require.Error(t, err)
 				return
diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go
index a517db97443..c7392129cbd 100644
--- a/pkg/scheduler/util/assumecache/assume_cache.go
+++ b/pkg/scheduler/util/assumecache/assume_cache.go
@@ -125,7 +125,8 @@ type AssumeCache struct {
 	rwMutex sync.RWMutex
 
 	// All registered event handlers.
-	eventHandlers []cache.ResourceEventHandler
+	eventHandlers       []cache.ResourceEventHandler
+	handlerRegistration cache.ResourceEventHandlerRegistration
 
 	// The eventQueue contains functions which deliver an event to one
 	// event handler.
@@ -203,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
 	// Unit tests don't use informers
 	if informer != nil {
 		// Cannot fail in practice?! No-one bothers checking the error.
-		_, _ = informer.AddEventHandler(
+		c.handlerRegistration, _ = informer.AddEventHandler(
 			cache.ResourceEventHandlerFuncs{
 				AddFunc:    c.add,
 				UpdateFunc: c.update,
@@ -257,18 +258,7 @@ func (c *AssumeCache) add(obj interface{}) {
 		c.logger.Info("Error occurred while updating stored object", "err", err)
 	} else {
 		c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
-		for _, handler := range c.eventHandlers {
-			handler := handler
-			if oldObj == nil {
-				c.eventQueue.Push(func() {
-					handler.OnAdd(obj, false)
-				})
-			} else {
-				c.eventQueue.Push(func() {
-					handler.OnUpdate(oldObj, obj)
-				})
-			}
-		}
+		c.pushEvent(oldObj, obj)
 	}
 }
 
@@ -304,11 +294,32 @@ func (c *AssumeCache) delete(obj interface{}) {
 		c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
 	}
 
+	c.pushEvent(oldObj, nil)
+}
+
+// pushEvent gets called while the mutex is locked for writing.
+// It ensures that all currently registered event handlers get
+// notified about a change once the caller starts delivering
+// the queued events with emitEvents.
+//
+// For a delete event, newObj is nil. For an add, oldObj is nil.
+// An update has both as non-nil.
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) {
 	for _, handler := range c.eventHandlers {
 		handler := handler
-		c.eventQueue.Push(func() {
-			handler.OnDelete(oldObj)
-		})
+		if oldObj == nil {
+			c.eventQueue.Push(func() {
+				handler.OnAdd(newObj, false)
+			})
+		} else if newObj == nil {
+			c.eventQueue.Push(func() {
+				handler.OnDelete(oldObj)
+			})
+		} else {
+			c.eventQueue.Push(func() {
+				handler.OnUpdate(oldObj, newObj)
+			})
+		}
 	}
 }
 
@@ -441,13 +452,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {
 		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
 	}
 
-	for _, handler := range c.eventHandlers {
-		handler := handler
-		oldObj := objInfo.latestObj
-		c.eventQueue.Push(func() {
-			handler.OnUpdate(oldObj, obj)
-		})
-	}
+	c.pushEvent(objInfo.latestObj, obj)
 
 	// Only update the cached object
 	objInfo.latestObj = obj
@@ -467,14 +472,7 @@ func (c *AssumeCache) Restore(objName string) {
 		c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
 	} else {
 		if objInfo.latestObj != objInfo.apiObj {
-			for _, handler := range c.eventHandlers {
-				handler := handler
-				oldObj, obj := objInfo.latestObj, objInfo.apiObj
-				c.eventQueue.Push(func() {
-					handler.OnUpdate(oldObj, obj)
-				})
-			}
-
+			c.pushEvent(objInfo.latestObj, objInfo.apiObj)
 			objInfo.latestObj = objInfo.apiObj
 		}
 		c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
@@ -485,7 +483,9 @@
 // single handler are delivered sequentially, but there is no
 // coordination between different handlers. A handler may use the
 // cache.
-func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) {
+//
+// The return value can be used to wait for cache synchronization.
+func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration {
 	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()
@@ -497,6 +497,13 @@
 			handler.OnAdd(obj, true)
 		})
 	}
+
+	if c.handlerRegistration == nil {
+		// No informer, so immediately synced.
+		return syncedHandlerRegistration{}
+	}
+
+	return c.handlerRegistration
 }
 
 // emitEvents delivers all pending events that are in the queue, in the order
@@ -516,3 +523,9 @@
 		}()
 	}
 }
+
+// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration
+// whose HasSynced always returns true.
+type syncedHandlerRegistration struct{}
+
+func (syncedHandlerRegistration) HasSynced() bool { return true }
diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go
index 8e4336730aa..a18d51e0a87 100644
--- a/pkg/scheduler/util/assumecache/assume_cache_test.go
+++ b/pkg/scheduler/util/assumecache/assume_cache_test.go
@@ -290,7 +290,7 @@ func TestRestore(t *testing.T) {
 		events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}})
 	})
 
-	// Restore object.
+	// Restore the same object.
 	ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) {
 		cache.Restore(oldObj.GetName())
 		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
@@ -306,7 +306,7 @@
 		events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
 	})
 
-	// Restore object.
+	// Restore the same object.
ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { cache.Restore(oldObj.GetName()) verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) @@ -346,6 +346,7 @@ func TestEvents(t *testing.T) { ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { informer.add(nil) verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) }) ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { informer.update(oldObj) @@ -378,7 +379,7 @@ func TestEventHandlers(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) informer.add(objs[i]) @@ -437,7 +438,7 @@ func TestEventHandlerConcurrency(t *testing.T) { tCtx, cache, informer := newTest(t) handlers := make([]mockEventHandler, 5) - objs := make([]metav1.Object, 0, 20) + var objs []metav1.Object for i := 0; i < 5; i++ { objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) } @@ -487,7 +488,7 @@ func TestListNoIndexer(t *testing.T) { tCtx, cache, informer := newTest(t) // Add a bunch of objects. - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") objs = append(objs, obj) @@ -526,7 +527,7 @@ func TestListWithIndexer(t *testing.T) { // Add a bunch of objects. ns := "ns1" - objs := make([]interface{}, 0, 10) + var objs []interface{} for i := 0; i < 10; i++ { obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) objs = append(objs, obj)