mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Avoid computing key func multiple times in cacher
This commit is contained in:
parent
f10b0205e7
commit
8040719d7f
@ -125,6 +125,8 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type filterObjectFunc func(string, runtime.Object) bool
|
||||||
|
|
||||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||||
// resource from its internal cache and updating its cache in the background
|
// resource from its internal cache and updating its cache in the background
|
||||||
// based on the underlying storage contents.
|
// based on the underlying storage contents.
|
||||||
@ -161,9 +163,6 @@ type Cacher struct {
|
|||||||
// Versioner is used to handle resource versions.
|
// Versioner is used to handle resource versions.
|
||||||
versioner Versioner
|
versioner Versioner
|
||||||
|
|
||||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
|
||||||
keyFunc func(runtime.Object) (string, error)
|
|
||||||
|
|
||||||
// triggerFunc is used for optimizing amount of watchers that needs to process
|
// triggerFunc is used for optimizing amount of watchers that needs to process
|
||||||
// an incoming event.
|
// an incoming event.
|
||||||
triggerFunc TriggerPublisherFunc
|
triggerFunc TriggerPublisherFunc
|
||||||
@ -201,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
|||||||
watchCache: watchCache,
|
watchCache: watchCache,
|
||||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||||
versioner: config.Versioner,
|
versioner: config.Versioner,
|
||||||
keyFunc: config.KeyFunc,
|
|
||||||
triggerFunc: config.TriggerPublisherFunc,
|
triggerFunc: config.TriggerPublisherFunc,
|
||||||
watcherIdx: 0,
|
watcherIdx: 0,
|
||||||
watchers: indexedWatchers{
|
watchers: indexedWatchers{
|
||||||
@ -328,7 +326,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget)
|
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)
|
||||||
|
|
||||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
@ -382,7 +380,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
if err != nil || listVal.Kind() != reflect.Slice {
|
if err != nil || listVal.Kind() != reflect.Slice {
|
||||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||||
}
|
}
|
||||||
filter := filterFunction(key, c.keyFunc, pred)
|
filter := filterFunction(key, pred)
|
||||||
|
|
||||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -394,7 +392,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
}
|
}
|
||||||
if filter(elem.Object) {
|
if filter(elem.Key, elem.Object) {
|
||||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -524,14 +522,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), p SelectionPredicate) FilterFunc {
|
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
|
||||||
f := SimpleFilter(p)
|
f := SimpleFilter(p)
|
||||||
filterFunc := func(obj runtime.Object) bool {
|
filterFunc := func(objKey string, obj runtime.Object) bool {
|
||||||
objKey, err := keyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("invalid object for filter. Obj: %v. Err: %v", obj, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !hasPathPrefix(objKey, key) {
|
if !hasPathPrefix(objKey, key) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -626,12 +619,12 @@ type cacheWatcher struct {
|
|||||||
sync.Mutex
|
sync.Mutex
|
||||||
input chan watchCacheEvent
|
input chan watchCacheEvent
|
||||||
result chan watch.Event
|
result chan watch.Event
|
||||||
filter FilterFunc
|
filter filterObjectFunc
|
||||||
stopped bool
|
stopped bool
|
||||||
forget func(bool)
|
forget func(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
|
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
|
||||||
watcher := &cacheWatcher{
|
watcher := &cacheWatcher{
|
||||||
input: make(chan watchCacheEvent, chanSize),
|
input: make(chan watchCacheEvent, chanSize),
|
||||||
result: make(chan watch.Event, chanSize),
|
result: make(chan watch.Event, chanSize),
|
||||||
@ -709,10 +702,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
|||||||
|
|
||||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
|
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
|
||||||
oldObjPasses := false
|
oldObjPasses := false
|
||||||
if event.PrevObject != nil {
|
if event.PrevObject != nil {
|
||||||
oldObjPasses = c.filter(event.PrevObject)
|
oldObjPasses = c.filter(event.Key, event.PrevObject)
|
||||||
}
|
}
|
||||||
if !curObjPasses && !oldObjPasses {
|
if !curObjPasses && !oldObjPasses {
|
||||||
// Watcher is not interested in that object.
|
// Watcher is not interested in that object.
|
||||||
|
@ -46,6 +46,7 @@ type watchCacheEvent struct {
|
|||||||
Type watch.EventType
|
Type watch.EventType
|
||||||
Object runtime.Object
|
Object runtime.Object
|
||||||
PrevObject runtime.Object
|
PrevObject runtime.Object
|
||||||
|
Key string
|
||||||
ResourceVersion uint64
|
ResourceVersion uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,6 +216,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||||||
Type: event.Type,
|
Type: event.Type,
|
||||||
Object: event.Object,
|
Object: event.Object,
|
||||||
PrevObject: prevObject,
|
PrevObject: prevObject,
|
||||||
|
Key: key,
|
||||||
ResourceVersion: resourceVersion,
|
ResourceVersion: resourceVersion,
|
||||||
}
|
}
|
||||||
if w.onEvent != nil {
|
if w.onEvent != nil {
|
||||||
@ -376,6 +378,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
|
|||||||
result[i] = watchCacheEvent{
|
result[i] = watchCacheEvent{
|
||||||
Type: watch.Added,
|
Type: watch.Added,
|
||||||
Object: elem.Object,
|
Object: elem.Object,
|
||||||
|
Key: elem.Key,
|
||||||
ResourceVersion: w.resourceVersion,
|
ResourceVersion: w.resourceVersion,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user