mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Store labels and fields with object
This commit is contained in:
parent
69324f90e6
commit
87a65b6c93
@ -130,7 +130,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
||||
}
|
||||
}
|
||||
|
||||
type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
|
||||
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool
|
||||
|
||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||
// resource from its internal cache and updating its cache in the background
|
||||
@ -337,7 +337,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
|
||||
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
|
||||
|
||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
||||
c.watcherIdx++
|
||||
@ -439,7 +439,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterFunction(key, pred)
|
||||
filter := filterWithAttrsFunction(key, pred)
|
||||
|
||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||
if err != nil {
|
||||
@ -452,7 +452,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
||||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Object) {
|
||||
if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
@ -508,7 +508,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterFunction(key, pred)
|
||||
filter := filterWithAttrsFunction(key, pred)
|
||||
|
||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||
if err != nil {
|
||||
@ -526,7 +526,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
||||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Object) {
|
||||
if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
@ -680,22 +680,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
||||
}
|
||||
}
|
||||
|
||||
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
|
||||
filterFunc := func(objKey string, obj runtime.Object) bool {
|
||||
if !hasPathPrefix(objKey, key) {
|
||||
return false
|
||||
}
|
||||
matches, err := p.Matches(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
|
||||
return false
|
||||
}
|
||||
return matches
|
||||
}
|
||||
return filterFunc
|
||||
}
|
||||
|
||||
func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc {
|
||||
func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
|
||||
filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
|
||||
if !hasPathPrefix(objKey, key) {
|
||||
return false
|
||||
@ -788,13 +773,13 @@ type cacheWatcher struct {
|
||||
input chan *watchCacheEvent
|
||||
result chan watch.Event
|
||||
done chan struct{}
|
||||
filter watchFilterFunc
|
||||
filter filterWithAttrsFunc
|
||||
stopped bool
|
||||
forget func(bool)
|
||||
versioner Versioner
|
||||
}
|
||||
|
||||
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
|
||||
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan *watchCacheEvent, chanSize),
|
||||
result: make(chan watch.Event, chanSize),
|
||||
|
@ -61,12 +61,16 @@ type watchCacheEvent struct {
|
||||
}
|
||||
|
||||
// Computing a key of an object is generally non-trivial (it performs
|
||||
// e.g. validation underneath). To avoid computing it multiple times
|
||||
// (to serve the event in different List/Watch requests), in the
|
||||
// underlying store we are keeping pair (key, object).
|
||||
// e.g. validation underneath). Similarly computing object fields and
|
||||
// labels. To avoid computing them multiple times (to serve the event
|
||||
// in different List/Watch requests), in the underlying store we are
|
||||
// keeping structs (key, object, labels, fields, uninitialized).
|
||||
type storeElement struct {
|
||||
Key string
|
||||
Object runtime.Object
|
||||
Key string
|
||||
Object runtime.Object
|
||||
Labels labels.Set
|
||||
Fields fields.Set
|
||||
Uninitialized bool
|
||||
}
|
||||
|
||||
func storeElementKey(obj interface{}) (string, error) {
|
||||
@ -220,6 +224,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
return fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
elem := &storeElement{Key: key, Object: event.Object}
|
||||
elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
watchCacheEvent := &watchCacheEvent{
|
||||
Type: event.Type,
|
||||
Object: elem.Object,
|
||||
ObjLabels: elem.Labels,
|
||||
ObjFields: elem.Fields,
|
||||
ObjUninitialized: elem.Uninitialized,
|
||||
Key: key,
|
||||
ResourceVersion: resourceVersion,
|
||||
}
|
||||
|
||||
// TODO: We should consider moving this lock below after the watchCacheEvent
|
||||
// is created. In such situation, the only problematic scenario is Replace(
|
||||
@ -231,34 +249,14 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var prevObject runtime.Object
|
||||
var prevObjLabels labels.Set
|
||||
var prevObjFields fields.Set
|
||||
var prevObjUninitialized bool
|
||||
if exists {
|
||||
prevObject = previous.(*storeElement).Object
|
||||
prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
watchCacheEvent := &watchCacheEvent{
|
||||
Type: event.Type,
|
||||
Object: event.Object,
|
||||
ObjLabels: objLabels,
|
||||
ObjFields: objFields,
|
||||
ObjUninitialized: objUninitialized,
|
||||
PrevObject: prevObject,
|
||||
PrevObjLabels: prevObjLabels,
|
||||
PrevObjFields: prevObjFields,
|
||||
PrevObjUninitialized: prevObjUninitialized,
|
||||
Key: key,
|
||||
ResourceVersion: resourceVersion,
|
||||
previousElem := previous.(*storeElement)
|
||||
watchCacheEvent.PrevObject = previousElem.Object
|
||||
watchCacheEvent.PrevObjLabels = previousElem.Labels
|
||||
watchCacheEvent.PrevObjFields = previousElem.Fields
|
||||
watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized
|
||||
}
|
||||
|
||||
if w.onEvent != nil {
|
||||
w.onEvent(watchCacheEvent)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user