diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index b452b46df68..f1ea90e91b0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -130,7 +130,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { } } -type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool +type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background @@ -337,7 +337,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner) + watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -439,7 +439,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri if err != nil || listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterFunction(key, pred) + filter := filterWithAttrsFunction(key, pred) obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) if err != nil { @@ -452,7 +452,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(elem.Key, elem.Object) { + if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } @@ -508,7 +508,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if err != nil || listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterFunction(key, pred) + filter := filterWithAttrsFunction(key, pred) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) if err != nil { @@ -526,7 +526,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(elem.Key, elem.Object) { + if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } @@ -680,22 +680,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b } } -func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool { - filterFunc := func(objKey string, obj runtime.Object) bool { - if !hasPathPrefix(objKey, key) { - return false - } - matches, err := p.Matches(obj) - if err != nil { - glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err) - return false - } - return matches - } - return filterFunc -} - -func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc { +func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc { filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool { if !hasPathPrefix(objKey, key) { return false @@ -788,13 +773,13 @@ type cacheWatcher struct { input chan *watchCacheEvent result chan watch.Event done chan struct{} - filter watchFilterFunc + filter filterWithAttrsFunc stopped bool forget func(bool) versioner Versioner } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go index 274617385d4..a21a5f83074 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go @@ -61,12 +61,16 @@ type watchCacheEvent struct { } // Computing a key of an object is generally non-trivial (it performs -// e.g. validation underneath). To avoid computing it multiple times -// (to serve the event in different List/Watch requests), in the -// underlying store we are keeping pair (key, object). +// e.g. validation underneath). Similarly computing object fields and +// labels. To avoid computing them multiple times (to serve the event +// in different List/Watch requests), in the underlying store we are +// keeping structs (key, object, labels, fields, uninitialized). type storeElement struct { - Key string - Object runtime.Object + Key string + Object runtime.Object + Labels labels.Set + Fields fields.Set + Uninitialized bool } func storeElementKey(obj interface{}) (string, error) { @@ -220,6 +224,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd return fmt.Errorf("couldn't compute key: %v", err) } elem := &storeElement{Key: key, Object: event.Object} + elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object) + if err != nil { + return err + } + + watchCacheEvent := &watchCacheEvent{ + Type: event.Type, + Object: elem.Object, + ObjLabels: elem.Labels, + ObjFields: elem.Fields, + ObjUninitialized: elem.Uninitialized, + Key: key, + ResourceVersion: resourceVersion, + } // TODO: We should consider moving this lock below after the watchCacheEvent // is created. In such situation, the only problematic scenario is Replace( @@ -231,34 +249,14 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if err != nil { return err } - objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object) - if err != nil { - return err - } - var prevObject runtime.Object - var prevObjLabels labels.Set - var prevObjFields fields.Set - var prevObjUninitialized bool if exists { - prevObject = previous.(*storeElement).Object - prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject) - if err != nil { - return err - } - } - watchCacheEvent := &watchCacheEvent{ - Type: event.Type, - Object: event.Object, - ObjLabels: objLabels, - ObjFields: objFields, - ObjUninitialized: objUninitialized, - PrevObject: prevObject, - PrevObjLabels: prevObjLabels, - PrevObjFields: prevObjFields, - PrevObjUninitialized: prevObjUninitialized, - Key: key, - ResourceVersion: resourceVersion, + previousElem := previous.(*storeElement) + watchCacheEvent.PrevObject = previousElem.Object + watchCacheEvent.PrevObjLabels = previousElem.Labels + watchCacheEvent.PrevObjFields = previousElem.Fields + watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized } + if w.onEvent != nil { w.onEvent(watchCacheEvent) }