From 9895f337ee21a248d05ac99b18c91aace909e8c9 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Oct 2016 10:42:00 +0200 Subject: [PATCH 1/3] Avoid unnecessary copies in cacher --- pkg/storage/cacher.go | 7 ++++--- pkg/storage/watch_cache.go | 13 +++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 41fd2ee447b..06a21591e98 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -707,7 +707,8 @@ func (c *cacheWatcher) add(event *watchCacheEvent) { } } -func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { +// NOTE: sendWatchCacheEvent is assumed to not modify !!! +func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) oldObjPasses := false if event.PrevObject != nil { @@ -752,7 +753,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin const initProcessThreshold = 500 * time.Millisecond startTime := time.Now() for _, event := range initEvents { - c.sendWatchCacheEvent(event) + c.sendWatchCacheEvent(&event) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { @@ -772,7 +773,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin } // only send events newer than resourceVersion if event.ResourceVersion > resourceVersion { - c.sendWatchCacheEvent(event) + c.sendWatchCacheEvent(&event) } } } diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 84dc451ea74..c6641e56178 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -181,23 +181,28 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if exists { prevObject = previous.(runtime.Object) } - watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} + watchCacheEvent := watchCacheEvent{ + Type: event.Type, + Object: event.Object, + PrevObject: prevObject, + ResourceVersion: resourceVersion, + } if w.onEvent != nil { w.onEvent(watchCacheEvent) } - w.updateCache(resourceVersion, watchCacheEvent) + w.updateCache(resourceVersion, &watchCacheEvent) w.resourceVersion = resourceVersion w.cond.Broadcast() return updateFunc(event.Object) } // Assumes that lock is already held for write. -func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) { +func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) { if w.endIndex == w.startIndex+w.capacity { // Cache is full - remove the oldest element. w.startIndex++ } - w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} + w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, *event} w.endIndex++ } From f10b0205e760116630ce3db93c2ef7ec6061dc4f Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Oct 2016 11:07:56 +0200 Subject: [PATCH 2/3] Store keys in watchCache store --- pkg/storage/cacher.go | 10 ++-- pkg/storage/watch_cache.go | 90 ++++++++++++++++++++++++++++----- pkg/storage/watch_cache_test.go | 13 +++-- 3 files changed, 91 insertions(+), 22 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 06a21591e98..98626f0dc4e 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -183,7 +183,7 @@ type Cacher struct { // internal cache and updating its cache in the background based on the given // configuration. func NewCacherFromConfig(config CacherConfig) *Cacher { - watchCache := newWatchCache(config.CacheCapacity) + watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) // Give this error when it is constructed rather than when you get the @@ -390,12 +390,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs))) for _, obj := range objs { - object, ok := obj.(runtime.Object) + elem, ok := obj.(*storeElement) if !ok { - return fmt.Errorf("non runtime.Object returned from storage: %v", obj) + return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(object) { - listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) + if filter(elem.Object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index c6641e56178..76f56d50160 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -49,6 +49,23 @@ type watchCacheEvent struct { ResourceVersion uint64 } +// Computing a key of an object is generally non-trivial (it performs +// e.g. validation underneath). To avoid computing it multiple times +// (to serve the event in different List/Watch requests), in the +// underlying store we are keeping pair (key, object). +type storeElement struct { + Key string + Object runtime.Object +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*storeElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + // watchCacheElement is a single "watch event" stored in a cache. // It contains the resource version of the object and the object // itself. @@ -72,6 +89,9 @@ type watchCache struct { // Maximum size of history window. capacity int + // keyFunc is used to get a key in the underlying storage for a given object. + keyFunc func(runtime.Object) (string, error) + // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined // by endIndex (if cache is full it will be startIndex + capacity). @@ -100,13 +120,14 @@ type watchCache struct { clock clock.Clock } -func newWatchCache(capacity int) *watchCache { +func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache { wc := &watchCache{ capacity: capacity, + keyFunc: keyFunc, cache: make([]watchCacheElement, capacity), startIndex: 0, endIndex: 0, - store: cache.NewStore(cache.MetaNamespaceKeyFunc), + store: cache.NewStore(storeElementKey), resourceVersion: 0, clock: clock.RealClock{}, } @@ -114,6 +135,7 @@ func newWatchCache(capacity int) *watchCache { return wc } +// Add takes runtime.Object as an argument. func (w *watchCache) Add(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -121,10 +143,11 @@ func (w *watchCache) Add(obj interface{}) error { } event := watch.Event{Type: watch.Added, Object: object} - f := func(obj runtime.Object) error { return w.store.Add(obj) } + f := func(elem *storeElement) error { return w.store.Add(elem) } return w.processEvent(event, resourceVersion, f) } +// Update takes runtime.Object as an argument. func (w *watchCache) Update(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -132,10 +155,11 @@ func (w *watchCache) Update(obj interface{}) error { } event := watch.Event{Type: watch.Modified, Object: object} - f := func(obj runtime.Object) error { return w.store.Update(obj) } + f := func(elem *storeElement) error { return w.store.Update(elem) } return w.processEvent(event, resourceVersion, f) } +// Delete takes runtime.Object as an argument. func (w *watchCache) Delete(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -143,7 +167,7 @@ func (w *watchCache) Delete(obj interface{}) error { } event := watch.Event{Type: watch.Deleted, Object: object} - f := func(obj runtime.Object) error { return w.store.Delete(obj) } + f := func(elem *storeElement) error { return w.store.Delete(elem) } return w.processEvent(event, resourceVersion, f) } @@ -170,16 +194,22 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { return strconv.ParseUint(resourceVersion, 10, 64) } -func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { +func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { + key, err := w.keyFunc(event.Object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + elem := &storeElement{Key: key, Object: event.Object} + w.Lock() defer w.Unlock() - previous, exists, err := w.store.Get(event.Object) + previous, exists, err := w.store.Get(elem) if err != nil { return err } var prevObject runtime.Object if exists { - prevObject = previous.(runtime.Object) + prevObject = previous.(*storeElement).Object } watchCacheEvent := watchCacheEvent{ Type: event.Type, @@ -193,7 +223,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd w.updateCache(resourceVersion, &watchCacheEvent) w.resourceVersion = resourceVersion w.cond.Broadcast() - return updateFunc(event.Object) + return updateFunc(elem) } // Assumes that lock is already held for write. @@ -206,12 +236,14 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) w.endIndex++ } +// List returns list of pointers to objects. func (w *watchCache) List() []interface{} { w.RLock() defer w.RUnlock() return w.store.List() } +// WaitUntilFreshAndList returns list of pointers to objects. func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) { startTime := w.clock.Now() go func() { @@ -249,30 +281,56 @@ func (w *watchCache) ListKeys() []string { return w.store.ListKeys() } +// Get takes runtime.Object as a parameter. However, it returns +// pointer to . func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) { + object, ok := obj.(runtime.Object) + if !ok { + return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) + } + key, err := w.keyFunc(object) + if err != nil { + return nil, false, fmt.Errorf("couldn't compute key: %v", err) + } + w.RLock() defer w.RUnlock() - return w.store.Get(obj) + return w.store.Get(&storeElement{Key: key, Object: object}) } +// GetByKey returns pointer to . func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.GetByKey(key) } +// Replace takes slice of runtime.Object as a paramater. func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { version, err := parseResourceVersion(resourceVersion) if err != nil { return err } + toReplace := make([]interface{}, 0, len(objs)) + for _, obj := range objs { + object, ok := obj.(runtime.Object) + if !ok { + return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj) + } + key, err := w.keyFunc(object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + toReplace = append(toReplace, &storeElement{Key: key, Object: object}) + } + w.Lock() defer w.Unlock() w.startIndex = 0 w.endIndex = 0 - if err := w.store.Replace(objs, resourceVersion); err != nil { + if err := w.store.Replace(toReplace, resourceVersion); err != nil { return err } w.resourceVersion = version @@ -311,7 +369,15 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa allItems := w.store.List() result := make([]watchCacheEvent, len(allItems)) for i, item := range allItems { - result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)} + elem, ok := item.(*storeElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", elem) + } + result[i] = watchCacheEvent{ + Type: watch.Added, + Object: elem.Object, + ResourceVersion: w.resourceVersion, + } } return result, nil } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index 7843a9d398e..57628c362b1 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -44,7 +44,10 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod { // newTestWatchCache just adds a fake clock. func newTestWatchCache(capacity int) *watchCache { - wc := newWatchCache(capacity) + keyFunc := func(obj runtime.Object) (string, error) { + return NamespaceKeyFunc("prefix", obj) + } + wc := newWatchCache(capacity, keyFunc) wc.clock = clock.NewFakeClock(time.Now()) return wc } @@ -60,7 +63,7 @@ func TestWatchCacheBasic(t *testing.T) { if item, ok, _ := store.Get(pod1); !ok { t.Errorf("didn't find pod") } else { - if !api.Semantic.DeepEqual(pod1, item) { + if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) { t.Errorf("expected %v, got %v", pod1, item) } } @@ -71,7 +74,7 @@ func TestWatchCacheBasic(t *testing.T) { if item, ok, _ := store.Get(pod2); !ok { t.Errorf("didn't find pod") } else { - if !api.Semantic.DeepEqual(pod2, item) { + if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) { t.Errorf("expected %v, got %v", pod1, item) } } @@ -90,7 +93,7 @@ func TestWatchCacheBasic(t *testing.T) { { podNames := sets.String{} for _, item := range store.List() { - podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name) } if !podNames.HasAll("pod1", "pod2", "pod3") { t.Errorf("missing pods, found %v", podNames) @@ -108,7 +111,7 @@ func TestWatchCacheBasic(t *testing.T) { { podNames := sets.String{} for _, item := range store.List() { - podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name) } if !podNames.HasAll("pod4", "pod5") { t.Errorf("missing pods, found %v", podNames) From 8040719d7fccd3a5143c19dc570fc09f0b61c037 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Oct 2016 11:53:41 +0200 Subject: [PATCH 3/3] Avoid computing key func multiple times in cacher --- pkg/storage/cacher.go | 29 +++++++++++------------------ pkg/storage/watch_cache.go | 3 +++ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 98626f0dc4e..6b50c4390ca 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -125,6 +125,8 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { } } +type filterObjectFunc func(string, runtime.Object) bool + // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. @@ -161,9 +163,6 @@ type Cacher struct { // Versioner is used to handle resource versions. versioner Versioner - // keyFunc is used to get a key in the underyling storage for a given object. - keyFunc func(runtime.Object) (string, error) - // triggerFunc is used for optimizing amount of watchers that needs to process // an incoming event. triggerFunc TriggerPublisherFunc @@ -201,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), versioner: config.Versioner, - keyFunc: config.KeyFunc, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, watchers: indexedWatchers{ @@ -328,7 +326,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget) + watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -382,7 +380,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if err != nil || listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterFunction(key, c.keyFunc, pred) + filter := filterFunction(key, pred) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) if err != nil { @@ -394,7 +392,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(elem.Object) { + if filter(elem.Key, elem.Object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } @@ -524,14 +522,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b } } -func filterFunction(key string, keyFunc func(runtime.Object) (string, error), p SelectionPredicate) FilterFunc { +func filterFunction(key string, p SelectionPredicate) filterObjectFunc { f := SimpleFilter(p) - filterFunc := func(obj runtime.Object) bool { - objKey, err := keyFunc(obj) - if err != nil { - glog.Errorf("invalid object for filter. Obj: %v. Err: %v", obj, err) - return false - } + filterFunc := func(objKey string, obj runtime.Object) bool { if !hasPathPrefix(objKey, key) { return false } @@ -626,12 +619,12 @@ type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event - filter FilterFunc + filter filterObjectFunc stopped bool forget func(bool) } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -709,10 +702,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) { // NOTE: sendWatchCacheEvent is assumed to not modify !!! func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { - curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.PrevObject) + oldObjPasses = c.filter(event.Key, event.PrevObject) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 76f56d50160..683526bbf75 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -46,6 +46,7 @@ type watchCacheEvent struct { Type watch.EventType Object runtime.Object PrevObject runtime.Object + Key string ResourceVersion uint64 } @@ -215,6 +216,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd Type: event.Type, Object: event.Object, PrevObject: prevObject, + Key: key, ResourceVersion: resourceVersion, } if w.onEvent != nil { @@ -376,6 +378,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa result[i] = watchCacheEvent{ Type: watch.Added, Object: elem.Object, + Key: elem.Key, ResourceVersion: w.resourceVersion, } }