DRA scheduler: use assume cache to list claims

This finishes the transition to the assume cache as the source of truth for
the current set of claims.

The tests have to be adapted: it is no longer enough to put objects directly
into the informer store, because that does not change the assume cache
content. Instead, the tests issue normal Create/Update calls through the
client and then wait for the corresponding cache update.
Author: Patrick Ohly
Date:   2024-06-15 18:15:13 +02:00
Parent: 9a6f3b9388
Commit: 1b63639d31

4 changed files with 108 additions and 64 deletions
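
To make the new test pattern concrete before diving into the diff, here is a
minimal sketch. The helper name waitForNewClaim and its exact shape are
illustrative only; the testContext fields (ctx, client, claimAssumeCache) and
the EventuallyWithT wait mirror the changes below, and the sketch assumes the
informer factory from setup() is running.

// Illustrative helper (not part of the commit): create a claim through the
// fake clientset and block until the assume cache, which is fed by the shared
// informer, has observed that exact ResourceVersion.
func waitForNewClaim(t *testing.T, tc *testContext, claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
	created, err := tc.client.ResourceV1alpha2().ResourceClaims(claim.Namespace).Create(tc.ctx, claim, metav1.CreateOptions{})
	require.NoError(t, err, "create claim")
	require.EventuallyWithT(t, func(t *assert.CollectT) {
		obj, err := tc.claimAssumeCache.Get(created.Namespace + "/" + created.Name)
		require.NoError(t, err, "claim not in assume cache yet")
		assert.Equal(t, created.ResourceVersion, obj.(*resourcev1alpha2.ResourceClaim).ResourceVersion)
	}, time.Minute, time.Second, "assume cache must catch up")
	return created
}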

@@ -272,7 +272,6 @@ type dynamicResources struct {
 	enabled                    bool
 	fh                         framework.Handle
 	clientset                  kubernetes.Interface
-	claimLister                resourcev1alpha2listers.ResourceClaimLister
 	classLister                resourcev1alpha2listers.ResourceClassLister
 	podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
 	claimParametersLister      resourcev1alpha2listers.ResourceClaimParametersLister
@@ -347,7 +346,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		enabled:                    true,
 		fh:                         fh,
 		clientset:                  fh.ClientSet(),
-		claimLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
 		classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
 		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
 		claimParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
@@ -791,11 +789,16 @@ func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso
 		if claimName == nil {
 			continue
 		}
-		claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
+		obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName)
 		if err != nil {
 			return err
 		}
+		claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
+		if !ok {
+			return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName)
+		}
 		if claim.DeletionTimestamp != nil {
 			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
 		}
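
The practical effect of the hunk above: the plugin now looks claims up in the
assume cache under the usual "namespace/name" key and may see an object that
was assumed during a scheduling cycle but not yet confirmed by the API server.
A hedged sketch of that lookup pattern (variable names are illustrative):

// Sketch: Get returns interface{}, so the caller type-asserts, and the result
// may be an assumed (in-flight) claim rather than the last informer object.
obj, err := claimAssumeCache.Get(namespace + "/" + name)
if err != nil {
	return err // for example, a "not found" error
}
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
	return fmt.Errorf("unexpected object type %T", obj)
}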

@@ -1152,6 +1152,7 @@ type testContext struct {
 	ctx              context.Context
 	client           *fake.Clientset
 	informerFactory  informers.SharedInformerFactory
+	claimAssumeCache *assumecache.AssumeCache
 	p                *dynamicResources
 	nodeInfos        []*framework.NodeInfo
 	state            *framework.CycleState
@@ -1320,11 +1321,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
 	tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
 	tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
-	assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
+	tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
 	opts := []runtime.Option{
 		runtime.WithClientSet(tc.client),
 		runtime.WithInformerFactory(tc.informerFactory),
-		runtime.WithResourceClaimCache(assumeCache),
+		runtime.WithResourceClaimCache(tc.claimAssumeCache),
 	}
 	fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
 	if err != nil {
@@ -1491,6 +1492,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 		},
 		"backoff-wrong-old-object": {
 			pod:         podWithClaimName,
+			claims:      []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
 			oldObj:      "not-a-claim",
 			newObj:      pendingImmediateClaim,
 			expectedErr: true,
@@ -1519,15 +1521,10 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 		},
 		"structured-claim-deallocate": {
 			pod:    podWithClaimName,
-			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
-			oldObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
-				return claim
-			}(),
+			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, otherStructuredAllocatedClaim},
+			oldObj: otherStructuredAllocatedClaim,
 			newObj: func() *resourcev1alpha2.ResourceClaim {
-				claim := structuredAllocatedClaim.DeepCopy()
-				claim.Name += "-other"
+				claim := otherStructuredAllocatedClaim.DeepCopy()
 				claim.Status.Allocation = nil
 				return claim
 			}(),
@@ -1539,18 +1536,48 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			logger, _ := ktesting.NewTestContext(t)
+			logger, tCtx := ktesting.NewTestContext(t)
 			testCtx := setup(t, nil, tc.claims, nil, nil, nil)
+			oldObj := tc.oldObj
+			newObj := tc.newObj
 			if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
-				// Update the informer because the lister gets called and must have the claim.
-				store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
+				// Add or update through the client and wait until the event is processed.
+				claimKey := claim.Namespace + "/" + claim.Name
 				if tc.oldObj == nil {
-					require.NoError(t, store.Add(claim))
+					// Some test claims already have it. Clear for create.
+					createClaim := claim.DeepCopy()
+					createClaim.UID = ""
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(createClaim.Namespace).Create(tCtx, createClaim, metav1.CreateOptions{})
+					require.NoError(t, err, "create claim")
+					claim = storedClaim
 				} else {
-					require.NoError(t, store.Update(claim))
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve old claim")
+					updateClaim := claim.DeepCopy()
+					// The test claim doesn't have those (generated dynamically), so copy them.
+					updateClaim.UID = cachedClaim.(*resourcev1alpha2.ResourceClaim).UID
+					updateClaim.ResourceVersion = cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion
+					storedClaim, err := testCtx.client.ResourceV1alpha2().ResourceClaims(updateClaim.Namespace).Update(tCtx, updateClaim, metav1.UpdateOptions{})
+					require.NoError(t, err, "update claim")
+					claim = storedClaim
 				}
+				// Eventually the assume cache will have it, too.
+				require.EventuallyWithT(t, func(t *assert.CollectT) {
+					cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
+					require.NoError(t, err, "retrieve claim")
+					if cachedClaim.(*resourcev1alpha2.ResourceClaim).ResourceVersion != claim.ResourceVersion {
+						t.Errorf("cached claim not updated yet")
+					}
+				}, time.Minute, time.Second, "claim assume cache must have new or updated claim")
+				// This has the actual UID and ResourceVersion,
+				// which is relevant for
+				// isSchedulableAfterClaimChange.
+				newObj = claim
 			}
-			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
+			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, oldObj, newObj)
 			if tc.expectedErr {
 				require.Error(t, err)
 				return

@@ -126,6 +126,7 @@ type AssumeCache struct {
 	// All registered event handlers.
 	eventHandlers       []cache.ResourceEventHandler
+	handlerRegistration cache.ResourceEventHandlerRegistration
 
 	// The eventQueue contains functions which deliver an event to one
 	// event handler.
@@ -203,7 +204,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
 	// Unit tests don't use informers
 	if informer != nil {
 		// Cannot fail in practice?! No-one bothers checking the error.
-		_, _ = informer.AddEventHandler(
+		c.handlerRegistration, _ = informer.AddEventHandler(
 			cache.ResourceEventHandlerFuncs{
 				AddFunc:    c.add,
 				UpdateFunc: c.update,
@@ -257,18 +258,7 @@ func (c *AssumeCache) add(obj interface{}) {
 		c.logger.Info("Error occurred while updating stored object", "err", err)
 	} else {
 		c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
-		for _, handler := range c.eventHandlers {
-			handler := handler
-			if oldObj == nil {
-				c.eventQueue.Push(func() {
-					handler.OnAdd(obj, false)
-				})
-			} else {
-				c.eventQueue.Push(func() {
-					handler.OnUpdate(oldObj, obj)
-				})
-			}
-		}
+		c.pushEvent(oldObj, obj)
 	}
 }
@@ -304,11 +294,32 @@ func (c *AssumeCache) delete(obj interface{}) {
 		c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
 	}
 
+	c.pushEvent(oldObj, nil)
+}
+
+// pushEvent gets called while the mutex is locked for writing.
+// It ensures that all currently registered event handlers get
+// notified about a change when the caller starts delivering
+// those with emitEvents.
+//
+// For a delete event, newObj is nil. For an add, oldObj is nil.
+// An update has both as non-nil.
+func (c *AssumeCache) pushEvent(oldObj, newObj interface{}) {
 	for _, handler := range c.eventHandlers {
 		handler := handler
+		if oldObj == nil {
+			c.eventQueue.Push(func() {
+				handler.OnAdd(newObj, false)
+			})
+		} else if newObj == nil {
 			c.eventQueue.Push(func() {
 				handler.OnDelete(oldObj)
 			})
+		} else {
+			c.eventQueue.Push(func() {
+				handler.OnUpdate(oldObj, newObj)
+			})
+		}
 	}
 }
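
For orientation, a sketch of a consumer of these events. The handler wiring is
assumed (not part of the commit), but cache.ResourceEventHandlerFuncs is the
standard client-go type; pushEvent only queues one callback per registered
handler, and the callbacks run later, outside the lock, when emitEvents drains
the queue.

// Sketch (illustrative): which callback fires depends on which of
// oldObj/newObj pushEvent was given.
registration := claimAssumeCache.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// queued by pushEvent(nil, newObj), e.g. a new informer object
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		// queued by pushEvent(oldObj, newObj), e.g. Assume() or an informer update
	},
	DeleteFunc: func(obj interface{}) {
		// queued by pushEvent(oldObj, nil) when the object is removed
	},
})
_ = registration // see the synchronization sketch further below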
@@ -441,13 +452,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {
 		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
 	}
 
-	for _, handler := range c.eventHandlers {
-		handler := handler
-		oldObj := objInfo.latestObj
-		c.eventQueue.Push(func() {
-			handler.OnUpdate(oldObj, obj)
-		})
-	}
+	c.pushEvent(objInfo.latestObj, obj)
 
 	// Only update the cached object
 	objInfo.latestObj = obj
@@ -467,14 +472,7 @@ func (c *AssumeCache) Restore(objName string) {
 		c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
 	} else {
 		if objInfo.latestObj != objInfo.apiObj {
-			for _, handler := range c.eventHandlers {
-				handler := handler
-				oldObj, obj := objInfo.latestObj, objInfo.apiObj
-				c.eventQueue.Push(func() {
-					handler.OnUpdate(oldObj, obj)
-				})
-			}
+			c.pushEvent(objInfo.latestObj, objInfo.apiObj)
 			objInfo.latestObj = objInfo.apiObj
 		}
 		c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
@@ -485,7 +483,9 @@ func (c *AssumeCache) Restore(objName string) {
 // single handler are delivered sequentially, but there is no
 // coordination between different handlers. A handler may use the
 // cache.
-func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) {
+//
+// The return value can be used to wait for cache synchronization.
+func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration {
 	defer c.emitEvents()
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()
@@ -497,6 +497,13 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) {
 			handler.OnAdd(obj, true)
 		})
 	}
+
+	if c.handlerRegistration == nil {
+		// No informer, so immediately synced.
+		return syncedHandlerRegistration{}
+	}
+
+	return c.handlerRegistration
 }
 
 // emitEvents delivers all pending events that are in the queue, in the order
@@ -516,3 +523,9 @@ func (c *AssumeCache) emitEvents() {
 		}()
 	}
 }
+
+// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration
+// which always returns true.
+type syncedHandlerRegistration struct{}
+
+func (syncedHandlerRegistration) HasSynced() bool { return true }
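
Finally, a sketch of how a caller might use the new return value. The
surrounding wiring is assumed, but cache.WaitForCacheSync and the HasSynced
method are the standard client-go primitives.

// Sketch: wait until the assume cache's underlying informer has synced before
// relying on its content. Without an informer (unit tests), the returned
// syncedHandlerRegistration reports HasSynced() == true immediately.
registration := claimAssumeCache.AddEventHandler(handler)
if !cache.WaitForCacheSync(ctx.Done(), registration.HasSynced) {
	return fmt.Errorf("timed out waiting for the resource claim informer to sync")
}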

@@ -290,7 +290,7 @@ func TestRestore(t *testing.T) {
 		events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}})
 	})
 
-	// Restore object.
+	// Restore the same object.
 	ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) {
 		cache.Restore(oldObj.GetName())
 		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
@@ -306,7 +306,7 @@ func TestRestore(t *testing.T) {
 		events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
 	})
 
-	// Restore object.
+	// Restore the same object.
 	ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) {
 		cache.Restore(oldObj.GetName())
 		verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
@@ -346,6 +346,7 @@ func TestEvents(t *testing.T) {
 	ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) {
 		informer.add(nil)
 		verify(tCtx, cache, key, newObj, newObj)
+		events.verifyAndFlush(tCtx, nil)
 	})
 	ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) {
 		informer.update(oldObj)
@@ -378,7 +379,7 @@ func TestEventHandlers(t *testing.T) {
 	tCtx, cache, informer := newTest(t)
 	handlers := make([]mockEventHandler, 5)
 
-	objs := make([]metav1.Object, 0, 20)
+	var objs []metav1.Object
 	for i := 0; i < 5; i++ {
 		objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
 		informer.add(objs[i])
@@ -437,7 +438,7 @@ func TestEventHandlerConcurrency(t *testing.T) {
 	tCtx, cache, informer := newTest(t)
 	handlers := make([]mockEventHandler, 5)
 
-	objs := make([]metav1.Object, 0, 20)
+	var objs []metav1.Object
 	for i := 0; i < 5; i++ {
 		objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
 	}
@@ -487,7 +488,7 @@ func TestListNoIndexer(t *testing.T) {
 	tCtx, cache, informer := newTest(t)
 
 	// Add a bunch of objects.
-	objs := make([]interface{}, 0, 10)
+	var objs []interface{}
 	for i := 0; i < 10; i++ {
 		obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")
 		objs = append(objs, obj)
@@ -526,7 +527,7 @@ func TestListWithIndexer(t *testing.T) {
 	// Add a bunch of objects.
 	ns := "ns1"
-	objs := make([]interface{}, 0, 10)
+	var objs []interface{}
 	for i := 0; i < 10; i++ {
 		obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns)
 		objs = append(objs, obj)