From 8040719d7fccd3a5143c19dc570fc09f0b61c037 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Oct 2016 11:53:41 +0200 Subject: [PATCH] Avoid computing key func multiple times in cacher --- pkg/storage/cacher.go | 29 +++++++++++------------------ pkg/storage/watch_cache.go | 3 +++ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 98626f0dc4e..6b50c4390ca 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -125,6 +125,8 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { } } +type filterObjectFunc func(string, runtime.Object) bool + // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. @@ -161,9 +163,6 @@ type Cacher struct { // Versioner is used to handle resource versions. versioner Versioner - // keyFunc is used to get a key in the underyling storage for a given object. - keyFunc func(runtime.Object) (string, error) - // triggerFunc is used for optimizing amount of watchers that needs to process // an incoming event. triggerFunc TriggerPublisherFunc @@ -201,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), versioner: config.Versioner, - keyFunc: config.KeyFunc, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, watchers: indexedWatchers{ @@ -328,7 +326,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget) + watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -382,7 +380,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if err != nil || listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterFunction(key, c.keyFunc, pred) + filter := filterFunction(key, pred) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) if err != nil { @@ -394,7 +392,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(elem.Object) { + if filter(elem.Key, elem.Object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } @@ -524,14 +522,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b } } -func filterFunction(key string, keyFunc func(runtime.Object) (string, error), p SelectionPredicate) FilterFunc { +func filterFunction(key string, p SelectionPredicate) filterObjectFunc { f := SimpleFilter(p) - filterFunc := func(obj runtime.Object) bool { - objKey, err := keyFunc(obj) - if err != nil { - glog.Errorf("invalid object for filter. Obj: %v. Err: %v", obj, err) - return false - } + filterFunc := func(objKey string, obj runtime.Object) bool { if !hasPathPrefix(objKey, key) { return false } @@ -626,12 +619,12 @@ type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event - filter FilterFunc + filter filterObjectFunc stopped bool forget func(bool) } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -709,10 +702,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) { // NOTE: sendWatchCacheEvent is assumed to not modify !!! func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { - curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.PrevObject) + oldObjPasses = c.filter(event.Key, event.PrevObject) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 76f56d50160..683526bbf75 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -46,6 +46,7 @@ type watchCacheEvent struct { Type watch.EventType Object runtime.Object PrevObject runtime.Object + Key string ResourceVersion uint64 } @@ -215,6 +216,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd Type: event.Type, Object: event.Object, PrevObject: prevObject, + Key: key, ResourceVersion: resourceVersion, } if w.onEvent != nil { @@ -376,6 +378,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa result[i] = watchCacheEvent{ Type: watch.Added, Object: elem.Object, + Key: elem.Key, ResourceVersion: w.resourceVersion, } }