diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 69ec1175f03..a517db97443 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -25,7 +25,9 @@ import ( "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/api/meta" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/util/queue" ) // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. @@ -119,9 +121,33 @@ type AssumeCache struct { // Will be used for all operations. logger klog.Logger - // Synchronizes updates to store + // Synchronizes updates to all fields below. rwMutex sync.RWMutex + // All registered event handlers. + eventHandlers []cache.ResourceEventHandler + + // The eventQueue contains functions which deliver an event to one + // event handler. + // + // These functions must be invoked while *not locking* rwMutex because + // the event handlers are allowed to access the assume cache. Holding + // rwMutex then would cause a deadlock. + // + // New functions get added as part of processing a cache update while + // the rwMutex is locked. Each function which adds something to the queue + // also drains the queue before returning, therefore it is guaranteed + // that all event handlers get notified immediately (useful for unit + // testing). + // + // A channel cannot be used here because it cannot have an unbounded + // capacity. This could lead to a deadlock (writer holds rwMutex, + // gets blocked because capacity is exhausted, reader is in a handler + // which tries to lock the rwMutex). Writing into such a channel + // while not holding the rwMutex doesn't work because in-order delivery + // of events would no longer be guaranteed. 
+ eventQueue queue.FIFO[func()] + // describes the object stored description string @@ -199,9 +225,11 @@ func (c *AssumeCache) add(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} if objInfo, _ := c.getObjInfo(name); objInfo != nil { newVersion, err := c.getObjVersion(name, obj) if err != nil { @@ -221,6 +249,7 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) return } + oldObj = objInfo.latestObj } objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} @@ -228,6 +257,18 @@ func (c *AssumeCache) add(obj interface{}) { c.logger.Info("Error occurred while updating stored object", "err", err) } else { c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) + for _, handler := range c.eventHandlers { + handler := handler + if oldObj == nil { + c.eventQueue.Push(func() { + handler.OnAdd(obj, false) + }) + } else { + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + } } } @@ -246,14 +287,29 @@ func (c *AssumeCache) delete(obj interface{}) { return } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() + var oldObj interface{} + if len(c.eventHandlers) > 0 { + if objInfo, _ := c.getObjInfo(name); objInfo != nil { + oldObj = objInfo.latestObj + } + } + objInfo := &objInfo{name: name} err = c.store.Delete(objInfo) if err != nil { c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } + + for _, handler := range c.eventHandlers { + handler := handler + c.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } } func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) { @@ -315,6 +371,10 @@ func (c *AssumeCache) List(indexObj interface{}) []interface{} { c.rwMutex.RLock() defer c.rwMutex.RUnlock() + return c.listLocked(indexObj) +} + +func (c *AssumeCache) listLocked(indexObj interface{}) []interface{} { allObjs := []interface{}{} var objs []interface{} if c.indexName != "" { @@ -358,6 +418,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { return &ObjectNameError{err} } + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -380,6 +441,14 @@ func (c *AssumeCache) Assume(obj interface{}) error { return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) } + for _, handler := range c.eventHandlers { + handler := handler + oldObj := objInfo.latestObj + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + // Only update the cached object objInfo.latestObj = obj c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) @@ -388,6 +457,7 @@ func (c *AssumeCache) Assume(obj interface{}) error { // Restore the informer cache's version of the object. 
func (c *AssumeCache) Restore(objName string) { + defer c.emitEvents() c.rwMutex.Lock() defer c.rwMutex.Unlock() @@ -396,7 +466,53 @@ func (c *AssumeCache) Restore(objName string) { // This could be expected if object got deleted c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { - objInfo.latestObj = objInfo.apiObj + if objInfo.latestObj != objInfo.apiObj { + for _, handler := range c.eventHandlers { + handler := handler + oldObj, obj := objInfo.latestObj, objInfo.apiObj + c.eventQueue.Push(func() { + handler.OnUpdate(oldObj, obj) + }) + } + + objInfo.latestObj = objInfo.apiObj + } c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } + +// AddEventHandler adds an event handler to the cache. Events to a +// single handler are delivered sequentially, but there is no +// coordination between different handlers. A handler may use the +// cache. +func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) { + defer c.emitEvents() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + c.eventHandlers = append(c.eventHandlers, handler) + allObjs := c.listLocked(nil) + for _, obj := range allObjs { + c.eventQueue.Push(func() { + handler.OnAdd(obj, true) + }) + } +} + +// emitEvents delivers all pending events that are in the queue, in the order +// in which they were stored there (FIFO). +func (c *AssumeCache) emitEvents() { + for { + c.rwMutex.Lock() + deliver, ok := c.eventQueue.Pop() + c.rwMutex.Unlock() + + if !ok { + return + } + func() { + defer utilruntime.HandleCrash() + deliver() + }() + } +} diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 6c11ac275fa..8e4336730aa 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -19,6 +19,8 @@ package assumecache import ( "fmt" "slices" + "sort" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -118,6 +120,79 @@ func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs [ } } +type mockEventHandler struct { + mutex sync.Mutex + events []event + cache *AssumeCache + block <-chan struct{} +} + +type event struct { + What string + OldObj, Obj interface{} + InitialList bool +} + +func (m *mockEventHandler) OnAdd(obj interface{}, initialList bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "add", + Obj: obj, + InitialList: initialList, + }) + + if m.cache != nil { + // Must not deadlock! 
+ m.cache.List(nil) + } + if m.block != nil { + <-m.block + } +} + +func (m *mockEventHandler) OnUpdate(oldObj, obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "update", + OldObj: oldObj, + Obj: obj, + }) +} + +func (m *mockEventHandler) OnDelete(obj interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.events = append(m.events, event{ + What: "delete", + Obj: obj, + }) +} + +func (m *mockEventHandler) verifyAndFlush(tCtx ktesting.TContext, expectedEvents []event) { + m.mutex.Lock() + defer m.mutex.Unlock() + + tCtx.Helper() + if diff := cmp.Diff(expectedEvents, m.events); diff != "" { + tCtx.Fatalf("unexpected events (- expected, + actual):\n%s", diff) + } + m.events = nil +} + +func (m *mockEventHandler) sortEvents(cmp func(objI, objJ interface{}) bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + sort.Slice(m.events, func(i, j int) bool { + return cmp(m.events[i].Obj, m.events[j].Obj) + }) +} + func TestAssume(t *testing.T) { scenarios := map[string]struct { oldObj metav1.Object @@ -162,6 +237,8 @@ func TestAssume(t *testing.T) { for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // Add old object to cache. informer.add(scenario.oldObj) @@ -173,18 +250,25 @@ func TestAssume(t *testing.T) { t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) } - // Check that Get returns correct object. + // Check that Get returns correct object and + // that events were delivered correctly. + expectEvents := []event{{What: "add", Obj: scenario.oldObj}} expectedObj := scenario.newObj if scenario.expectErr != nil { expectedObj = scenario.oldObj + } else { + expectEvents = append(expectEvents, event{What: "update", OldObj: scenario.oldObj, Obj: scenario.newObj}) } verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + events.verifyAndFlush(tCtx, expectEvents) }) } } func TestRestore(t *testing.T) { tCtx, cache, informer := newTest(t) + var events mockEventHandler + cache.AddEventHandler(&events) // This test assumes an object with the same version as the API object. // The assume cache supports that, but doing so in real code suffers from @@ -194,25 +278,40 @@ func TestRestore(t *testing.T) { newObj := makeObj("pvc1", "5", "") // Restore object that doesn't exist - cache.Restore("nothing") + ktesting.Step(tCtx, "empty cache", func(tCtx ktesting.TContext) { + cache.Restore("nothing") + events.verifyAndFlush(tCtx, nil) + }) // Add old object to cache. - informer.add(oldObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.add(oldObj) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}}) + }) // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, nil) + }) // Assume new object. 
- if err := cache.Assume(newObj); err != nil { - t.Fatalf("Assume() returned error %v", err) - } - verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) + ktesting.Step(tCtx, "Assume", func(tCtx ktesting.TContext) { + if err := cache.Assume(newObj); err != nil { + tCtx.Fatalf("Assume() returned error %v", err) + } + verify(tCtx, cache, oldObj.GetName(), newObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) // Restore object. - cache.Restore(oldObj.GetName()) - verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) + ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) { + cache.Restore(oldObj.GetName()) + verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: newObj, Obj: oldObj}}) + }) } func TestEvents(t *testing.T) { @@ -226,27 +325,161 @@ func TestEvents(t *testing.T) { informer.add(oldObj) verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + // Receive initial list. + var events mockEventHandler + cache.AddEventHandler(&events) + events.verifyAndFlush(ktesting.WithStep(tCtx, "initial list"), []event{{What: "add", Obj: oldObj, InitialList: true}}) + // Update object. - informer.update(newObj) - verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) { + informer.update(newObj) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + }) // Some error cases (don't occur in practice). - informer.add(1) - verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) - informer.add(nil) - verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) - informer.update(oldObj) - verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) - informer.update(nil) - verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) - informer.delete(nil) - verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + ktesting.Step(tCtx, "nop add", func(tCtx ktesting.TContext) { + informer.add(1) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) { + informer.add(nil) + verify(tCtx, cache, key, newObj, newObj) + }) + ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) { + informer.update(oldObj) + events.verifyAndFlush(tCtx, nil) + verify(tCtx, cache, key, newObj, newObj) + }) + ktesting.Step(tCtx, "nil update", func(tCtx ktesting.TContext) { + informer.update(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) + ktesting.Step(tCtx, "nop delete", func(tCtx ktesting.TContext) { + informer.delete(nil) + verify(tCtx, cache, key, newObj, newObj) + events.verifyAndFlush(tCtx, nil) + }) // Delete object. 
- informer.delete(oldObj) - _, err := cache.Get(key) - if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { - t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + ktesting.Step(tCtx, "delete", func(tCtx ktesting.TContext) { + informer.delete(oldObj) + events.verifyAndFlush(tCtx, []event{{What: "delete", Obj: newObj}}) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + tCtx.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } + }) +} + +func TestEventHandlers(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + objs := make([]metav1.Object, 0, 20) + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Order of delivered events is random, we must ensure + // increasing order by name ourselves. + var expectedEvents []event + for _, obj := range objs { + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + InitialList: true, + }, + ) + } + for i := range handlers { + cache.AddEventHandler(&handlers[i]) + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) + } + + for i := 5; i < 7; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + informer.add(objs[i]) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "add", Obj: objs[i]}}) + } + } + + for i, oldObj := range objs { + newObj := makeObj(fmt.Sprintf("test-pvc%v", i), "2", "") + objs[i] = newObj + informer.update(newObj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}}) + } + } + + for _, obj := range objs { + informer.delete(obj) + for e := range handlers { + handlers[e].verifyAndFlush(tCtx, []event{{What: "delete", Obj: obj}}) + } + } +} + +func TestEventHandlerConcurrency(t *testing.T) { + tCtx, cache, informer := newTest(t) + handlers := make([]mockEventHandler, 5) + + objs := make([]metav1.Object, 0, 20) + for i := 0; i < 5; i++ { + objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", "")) + } + + // Accessing cache during OnAdd must not deadlock! + handlers[0].cache = cache + + // Each add blocks until this gets cancelled. + tCancelCtx := ktesting.WithCancel(tCtx) + var wg sync.WaitGroup + + for i := range handlers { + handlers[i].block = tCancelCtx.Done() + cache.AddEventHandler(&handlers[i]) + } + + // Execution of the add calls is random, therefore + // we have to sort again. + var expectedEvents []event + for _, obj := range objs { + wg.Add(1) + go func() { + defer wg.Done() + informer.add(obj) + }() + expectedEvents = append(expectedEvents, + event{ + What: "add", + Obj: obj, + }, + ) + } + + tCancelCtx.Cancel("proceed") + wg.Wait() + + for i := range handlers { + handlers[i].sortEvents(func(objI, objJ interface{}) bool { + return objI.(*metav1.ObjectMeta).Name < + objJ.(*metav1.ObjectMeta).Name + }) + handlers[i].verifyAndFlush(tCtx, expectedEvents) } }
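
For reviewers, a minimal usage sketch (not part of the patch) of how a caller might register a handler through the new AddEventHandler API. The helper name registerLogger and its parameters are hypothetical; cache.ResourceEventHandlerFuncs is the standard client-go adapter, whose OnAdd ignores the isInInitialList flag:

    package example

    import (
        "k8s.io/client-go/tools/cache"
        "k8s.io/klog/v2"

        "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
    )

    // registerLogger attaches a handler that logs every event emitted by the
    // assume cache. Objects already stored when the handler is added are
    // delivered as synthetic "add" events (initialList == true), mirroring
    // SharedInformer semantics; the handler may safely call back into the
    // cache because events are delivered without holding rwMutex.
    func registerLogger(logger klog.Logger, ac *assumecache.AssumeCache) {
        ac.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                logger.V(5).Info("assume cache add", "obj", obj)
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                logger.V(5).Info("assume cache update", "oldObj", oldObj, "newObj", newObj)
            },
            DeleteFunc: func(obj interface{}) {
                logger.V(5).Info("assume cache delete", "obj", obj)
            },
        })
    }

Because AddEventHandler drains the event queue before returning, the synthetic initial-list adds are delivered synchronously, which is what the unit tests above rely on.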