scheduler: AddEventHandler for assume cache

This enables using the assume cache for cluster events.
Patrick Ohly 2024-04-01 14:55:10 +02:00
parent 171620765e
commit dea16757ef
2 changed files with 379 additions and 30 deletions
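
For context, a rough sketch of what this commit enables: a caller can register a standard client-go event handler on an assume cache and get notified about adds, updates, and deletes, including the in-flight state set via Assume. Only the AddEventHandler method below is part of this commit; the surrounding setup and the callback bodies are illustrative assumptions (`ac` is assumed to be an *assumecache.AssumeCache that is already fed by an informer).

```go
// Hypothetical caller (not part of the diff).
ac.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// e.g. react to a new object appearing in the cache
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		// e.g. requeue work affected by the change
	},
	DeleteFunc: func(obj interface{}) {
		// e.g. clean up state derived from the object
	},
})
```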


@ -25,7 +25,9 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler/util/queue"
) )
// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
@ -119,9 +121,33 @@ type AssumeCache struct {
// Will be used for all operations.
logger klog.Logger
// Synchronizes updates to all fields below.
rwMutex sync.RWMutex
// All registered event handlers.
eventHandlers []cache.ResourceEventHandler
// The eventQueue contains functions which deliver an event to one
// event handler.
//
// These functions must be invoked while *not locking* rwMutex because
// the event handlers are allowed to access the assume cache. Holding
// rwMutex then would cause a deadlock.
//
// New functions get added as part of processing a cache update while
// the rwMutex is locked. Each function which adds something to the queue
// also drains the queue before returning, therefore it is guaranteed
// that all event handlers get notified immediately (useful for unit
// testing).
//
// A channel cannot be used here because it cannot have an unbounded
// capacity. This could lead to a deadlock (writer holds rwMutex,
// gets blocked because capacity is exhausted, reader is in a handler
// which tries to lock the rwMutex). Writing into such a channel
// while not holding the rwMutex doesn't work because in-order delivery
// of events would no longer be guaranteed.
eventQueue queue.FIFO[func()]
// describes the object stored
description string
@ -199,9 +225,11 @@ func (c *AssumeCache) add(obj interface{}) {
return
}
defer c.emitEvents()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
var oldObj interface{}
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
@ -221,6 +249,7 @@ func (c *AssumeCache) add(obj interface{}) {
c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
return return
} }
oldObj = objInfo.latestObj
} }
objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
@ -228,6 +257,18 @@ func (c *AssumeCache) add(obj interface{}) {
c.logger.Info("Error occurred while updating stored object", "err", err) c.logger.Info("Error occurred while updating stored object", "err", err)
} else { } else {
c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
for _, handler := range c.eventHandlers {
handler := handler
if oldObj == nil {
c.eventQueue.Push(func() {
handler.OnAdd(obj, false)
})
} else {
c.eventQueue.Push(func() {
handler.OnUpdate(oldObj, obj)
})
}
}
}
}
@ -246,14 +287,29 @@ func (c *AssumeCache) delete(obj interface{}) {
return
}
defer c.emitEvents()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
var oldObj interface{}
if len(c.eventHandlers) > 0 {
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
oldObj = objInfo.latestObj
}
}
objInfo := &objInfo{name: name}
err = c.store.Delete(objInfo)
if err != nil {
c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
}
for _, handler := range c.eventHandlers {
handler := handler
c.eventQueue.Push(func() {
handler.OnDelete(oldObj)
})
}
}
func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
@ -315,6 +371,10 @@ func (c *AssumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
return c.listLocked(indexObj)
}
func (c *AssumeCache) listLocked(indexObj interface{}) []interface{} {
allObjs := []interface{}{}
var objs []interface{}
if c.indexName != "" {
@ -358,6 +418,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {
return &ObjectNameError{err}
}
defer c.emitEvents()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
@ -380,6 +441,14 @@ func (c *AssumeCache) Assume(obj interface{}) error {
return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
}
for _, handler := range c.eventHandlers {
handler := handler
oldObj := objInfo.latestObj
c.eventQueue.Push(func() {
handler.OnUpdate(oldObj, obj)
})
}
// Only update the cached object
objInfo.latestObj = obj
c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
@ -388,6 +457,7 @@ func (c *AssumeCache) Assume(obj interface{}) error {
// Restore the informer cache's version of the object.
func (c *AssumeCache) Restore(objName string) {
defer c.emitEvents()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
@ -396,7 +466,53 @@ func (c *AssumeCache) Restore(objName string) {
// This could be expected if object got deleted
c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
} else {
if objInfo.latestObj != objInfo.apiObj {
for _, handler := range c.eventHandlers {
handler := handler
oldObj, obj := objInfo.latestObj, objInfo.apiObj
c.eventQueue.Push(func() {
handler.OnUpdate(oldObj, obj)
})
}
objInfo.latestObj = objInfo.apiObj
}
c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
} }
} }
// AddEventHandler adds an event handler to the cache. Events to a
// single handler are delivered sequentially, but there is no
// coordination between different handlers. A handler may use the
// cache.
func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) {
defer c.emitEvents()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
c.eventHandlers = append(c.eventHandlers, handler)
allObjs := c.listLocked(nil)
for _, obj := range allObjs {
c.eventQueue.Push(func() {
handler.OnAdd(obj, true)
})
}
}
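
Two properties of AddEventHandler are worth calling out and are exercised by the tests below: a handler registered after objects already exist receives OnAdd with isInInitialList=true for each of them, and a handler may call back into the cache without deadlocking because events are delivered after rwMutex has been released. A hedged illustration (the handler type is made up, only AssumeCache.List and AddEventHandler are from this diff):

```go
// Illustrative handler, not part of the commit.
type countingHandler struct {
	ac    *assumecache.AssumeCache
	total int
}

func (h *countingHandler) OnAdd(obj interface{}, isInInitialList bool) {
	// Reading the cache from inside a callback is allowed and must not deadlock.
	h.total = len(h.ac.List(nil))
	_ = isInInitialList // true for objects replayed at registration time
}
func (h *countingHandler) OnUpdate(oldObj, newObj interface{}) {}
func (h *countingHandler) OnDelete(obj interface{})            {}
```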
// emitEvents delivers all pending events that are in the queue, in the order
// in which they were stored there (FIFO).
func (c *AssumeCache) emitEvents() {
for {
c.rwMutex.Lock()
deliver, ok := c.eventQueue.Pop()
c.rwMutex.Unlock()
if !ok {
return
}
func() {
defer utilruntime.HandleCrash()
deliver()
}()
}
}
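
The mutating methods in this diff (add, delete, Assume, Restore, AddEventHandler) all follow the same pattern: defer c.emitEvents() before taking the lock, queue notifications while locked, and rely on deferred calls running in LIFO order so that emitEvents only runs after rwMutex has been unlocked. Distilled into a sketch with a hypothetical method name:

```go
// Hypothetical method, shown only to illustrate the pattern used above.
func (c *AssumeCache) mutateAndNotify(obj interface{}) {
	defer c.emitEvents() // deferred first, so it runs last: after Unlock below
	c.rwMutex.Lock()
	defer c.rwMutex.Unlock()

	// ... mutate c.store here ...

	for _, handler := range c.eventHandlers {
		handler := handler // capture per iteration
		c.eventQueue.Push(func() { handler.OnUpdate(nil, obj) })
	}
}
```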


@ -19,6 +19,8 @@ package assumecache
import (
"fmt"
"slices"
"sort"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
@ -118,6 +120,79 @@ func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs [
}
}
type mockEventHandler struct {
mutex sync.Mutex
events []event
cache *AssumeCache
block <-chan struct{}
}
type event struct {
What string
OldObj, Obj interface{}
InitialList bool
}
func (m *mockEventHandler) OnAdd(obj interface{}, initialList bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.events = append(m.events, event{
What: "add",
Obj: obj,
InitialList: initialList,
})
if m.cache != nil {
// Must not deadlock!
m.cache.List(nil)
}
if m.block != nil {
<-m.block
}
}
func (m *mockEventHandler) OnUpdate(oldObj, obj interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.events = append(m.events, event{
What: "update",
OldObj: oldObj,
Obj: obj,
})
}
func (m *mockEventHandler) OnDelete(obj interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.events = append(m.events, event{
What: "delete",
Obj: obj,
})
}
func (m *mockEventHandler) verifyAndFlush(tCtx ktesting.TContext, expectedEvents []event) {
m.mutex.Lock()
defer m.mutex.Unlock()
tCtx.Helper()
if diff := cmp.Diff(expectedEvents, m.events); diff != "" {
tCtx.Fatalf("unexpected events (- expected, + actual):\n%s", diff)
}
m.events = nil
}
func (m *mockEventHandler) sortEvents(cmp func(objI, objJ interface{}) bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
sort.Slice(m.events, func(i, j int) bool {
return cmp(m.events[i].Obj, m.events[j].Obj)
})
}
func TestAssume(t *testing.T) {
scenarios := map[string]struct {
oldObj metav1.Object
@ -162,6 +237,8 @@ func TestAssume(t *testing.T) {
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
tCtx, cache, informer := newTest(t)
var events mockEventHandler
cache.AddEventHandler(&events)
// Add old object to cache.
informer.add(scenario.oldObj)
@ -173,18 +250,25 @@ func TestAssume(t *testing.T) {
t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff)
} }
// Check that Get returns correct object. // Check that Get returns correct object and
// that events were delivered correctly.
expectEvents := []event{{What: "add", Obj: scenario.oldObj}}
expectedObj := scenario.newObj
if scenario.expectErr != nil {
expectedObj = scenario.oldObj
} else {
expectEvents = append(expectEvents, event{What: "update", OldObj: scenario.oldObj, Obj: scenario.newObj})
}
verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj)
events.verifyAndFlush(tCtx, expectEvents)
})
}
}
func TestRestore(t *testing.T) {
tCtx, cache, informer := newTest(t)
var events mockEventHandler
cache.AddEventHandler(&events)
// This test assumes an object with the same version as the API object.
// The assume cache supports that, but doing so in real code suffers from
@ -194,25 +278,40 @@ func TestRestore(t *testing.T) {
newObj := makeObj("pvc1", "5", "") newObj := makeObj("pvc1", "5", "")
// Restore object that doesn't exist // Restore object that doesn't exist
cache.Restore("nothing") ktesting.Step(tCtx, "empty cache", func(tCtx ktesting.TContext) {
cache.Restore("nothing")
events.verifyAndFlush(tCtx, nil)
})
// Add old object to cache. // Add old object to cache.
informer.add(oldObj) ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) {
verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) informer.add(oldObj)
verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
events.verifyAndFlush(tCtx, []event{{What: "add", Obj: oldObj}})
})
// Restore object. // Restore object.
cache.Restore(oldObj.GetName()) ktesting.Step(tCtx, "initial Restore", func(tCtx ktesting.TContext) {
verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) cache.Restore(oldObj.GetName())
verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
events.verifyAndFlush(tCtx, nil)
})
// Assume new object. // Assume new object.
if err := cache.Assume(newObj); err != nil { ktesting.Step(tCtx, "Assume", func(tCtx ktesting.TContext) {
t.Fatalf("Assume() returned error %v", err) if err := cache.Assume(newObj); err != nil {
} tCtx.Fatalf("Assume() returned error %v", err)
verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) }
verify(tCtx, cache, oldObj.GetName(), newObj, oldObj)
events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
})
// Restore object. // Restore object.
cache.Restore(oldObj.GetName()) ktesting.Step(tCtx, "second Restore", func(tCtx ktesting.TContext) {
verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) cache.Restore(oldObj.GetName())
verify(tCtx, cache, oldObj.GetName(), oldObj, oldObj)
events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: newObj, Obj: oldObj}})
})
} }
func TestEvents(t *testing.T) {
@ -226,27 +325,161 @@ func TestEvents(t *testing.T) {
informer.add(oldObj)
verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj)
// Receive initial list.
var events mockEventHandler
cache.AddEventHandler(&events)
events.verifyAndFlush(ktesting.WithStep(tCtx, "initial list"), []event{{What: "add", Obj: oldObj, InitialList: true}})
// Update object.
ktesting.Step(tCtx, "initial update", func(tCtx ktesting.TContext) {
informer.update(newObj)
verify(tCtx, cache, key, newObj, newObj)
events.verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
})
// Some error cases (don't occur in practice).
ktesting.Step(tCtx, "nop add", func(tCtx ktesting.TContext) {
informer.add(1)
verify(tCtx, cache, key, newObj, newObj)
events.verifyAndFlush(tCtx, nil)
})
ktesting.Step(tCtx, "nil add", func(tCtx ktesting.TContext) {
informer.add(nil)
verify(tCtx, cache, key, newObj, newObj)
})
ktesting.Step(tCtx, "nop update", func(tCtx ktesting.TContext) {
informer.update(oldObj)
events.verifyAndFlush(tCtx, nil)
verify(tCtx, cache, key, newObj, newObj)
})
ktesting.Step(tCtx, "nil update", func(tCtx ktesting.TContext) {
informer.update(nil)
verify(tCtx, cache, key, newObj, newObj)
events.verifyAndFlush(tCtx, nil)
})
ktesting.Step(tCtx, "nop delete", func(tCtx ktesting.TContext) {
informer.delete(nil)
verify(tCtx, cache, key, newObj, newObj)
events.verifyAndFlush(tCtx, nil)
})
// Delete object.
ktesting.Step(tCtx, "delete", func(tCtx ktesting.TContext) {
informer.delete(oldObj)
events.verifyAndFlush(tCtx, []event{{What: "delete", Obj: newObj}})
_, err := cache.Get(key)
if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" {
tCtx.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff)
}
})
}
func TestEventHandlers(t *testing.T) {
tCtx, cache, informer := newTest(t)
handlers := make([]mockEventHandler, 5)
objs := make([]metav1.Object, 0, 20)
for i := 0; i < 5; i++ {
objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
informer.add(objs[i])
}
// Accessing cache during OnAdd must not deadlock!
handlers[0].cache = cache
// Order of delivered events is random, we must ensure
// increasing order by name ourselves.
var expectedEvents []event
for _, obj := range objs {
expectedEvents = append(expectedEvents,
event{
What: "add",
Obj: obj,
InitialList: true,
},
)
}
for i := range handlers {
cache.AddEventHandler(&handlers[i])
handlers[i].sortEvents(func(objI, objJ interface{}) bool {
return objI.(*metav1.ObjectMeta).Name <
objJ.(*metav1.ObjectMeta).Name
})
handlers[i].verifyAndFlush(tCtx, expectedEvents)
}
for i := 5; i < 7; i++ {
objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
informer.add(objs[i])
for e := range handlers {
handlers[e].verifyAndFlush(tCtx, []event{{What: "add", Obj: objs[i]}})
}
}
for i, oldObj := range objs {
newObj := makeObj(fmt.Sprintf("test-pvc%v", i), "2", "")
objs[i] = newObj
informer.update(newObj)
for e := range handlers {
handlers[e].verifyAndFlush(tCtx, []event{{What: "update", OldObj: oldObj, Obj: newObj}})
}
}
for _, obj := range objs {
informer.delete(obj)
for e := range handlers {
handlers[e].verifyAndFlush(tCtx, []event{{What: "delete", Obj: obj}})
}
}
}
func TestEventHandlerConcurrency(t *testing.T) {
tCtx, cache, informer := newTest(t)
handlers := make([]mockEventHandler, 5)
objs := make([]metav1.Object, 0, 20)
for i := 0; i < 5; i++ {
objs = append(objs, makeObj(fmt.Sprintf("test-pvc%v", i), "1", ""))
}
// Accessing cache during OnAdd must not deadlock!
handlers[0].cache = cache
// Each add blocks until this gets cancelled.
tCancelCtx := ktesting.WithCancel(tCtx)
var wg sync.WaitGroup
for i := range handlers {
handlers[i].block = tCancelCtx.Done()
cache.AddEventHandler(&handlers[i])
}
// Execution of the add calls is random, therefore
// we have to sort again.
var expectedEvents []event
for _, obj := range objs {
wg.Add(1)
go func() {
defer wg.Done()
informer.add(obj)
}()
expectedEvents = append(expectedEvents,
event{
What: "add",
Obj: obj,
},
)
}
tCancelCtx.Cancel("proceed")
wg.Wait()
for i := range handlers {
handlers[i].sortEvents(func(objI, objJ interface{}) bool {
return objI.(*metav1.ObjectMeta).Name <
objJ.(*metav1.ObjectMeta).Name
})
handlers[i].verifyAndFlush(tCtx, expectedEvents)
}
}