diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/BUILD b/staging/src/k8s.io/apiserver/pkg/registry/generic/BUILD
index d89d10356f0..5bccea16fa2 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/BUILD
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/BUILD
@@ -23,6 +23,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
     ],
 )
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go
index 907928a356e..577192b626e 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go
@@ -22,6 +22,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/storagebackend"
+	"k8s.io/client-go/tools/cache"
 )

 // RESTOptions is set of configuration options to generic registries.
@@ -49,4 +50,5 @@ type StoreOptions struct {
 	RESTOptions RESTOptionsGetter
 	TriggerFunc storage.IndexerFuncs
 	AttrFunc    storage.AttrFunc
+	Indexers    *cache.Indexers
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/BUILD b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/BUILD
index 719a16e3304..38f9d53b179 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/BUILD
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/BUILD
@@ -85,6 +85,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
index 7fb785a5b75..9e5f84328e1 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apiserver/pkg/storage/etcd3"
 	"k8s.io/apiserver/pkg/storage/storagebackend"
 	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
+	"k8s.io/client-go/tools/cache"
 )

 // Creates a cacher based given storageConfig.
@@ -39,7 +40,8 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
 		newFunc func() runtime.Object,
 		newListFunc func() runtime.Object,
 		getAttrsFunc storage.AttrFunc,
-		triggerFuncs storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error) {
+		triggerFuncs storage.IndexerFuncs,
+		indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {

 		s, d, err := generic.NewRawStorage(storageConfig)
 		if err != nil {
@@ -65,6 +67,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
 			NewListFunc:  newListFunc,
 			GetAttrsFunc: getAttrsFunc,
 			IndexerFuncs: triggerFuncs,
+			Indexers:     indexers,
 			Codec:        storageConfig.Codec,
 		}
 		cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
index dc287d1ed5b..5e56cf071c5 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
@@ -1322,6 +1322,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
 			e.NewListFunc,
 			attrFunc,
 			options.TriggerFunc,
+			options.Indexers,
 		)
 		if err != nil {
 			return err
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
index 993f21200f0..223b630fc53 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/storagebackend"
 	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
+	"k8s.io/client-go/tools/cache"
 )

 // StorageDecorator is a function signature for producing a storage.Interface
@@ -32,7 +33,8 @@ type StorageDecorator func(
 	newFunc func() runtime.Object,
 	newListFunc func() runtime.Object,
 	getAttrsFunc storage.AttrFunc,
-	trigger storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error)
+	trigger storage.IndexerFuncs,
+	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error)

 // UndecoratedStorage returns the given a new storage from the given config
 // without any decoration.
@@ -43,7 +45,8 @@ func UndecoratedStorage(
 	newFunc func() runtime.Object,
 	newListFunc func() runtime.Object,
 	getAttrsFunc storage.AttrFunc,
-	trigger storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error) {
+	trigger storage.IndexerFuncs,
+	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
 	return NewRawStorage(config)
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index aadd33d392b..fbf644991ee 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -100,6 +100,10 @@ type Config struct {
 	// needs to process an incoming event.
 	IndexerFuncs storage.IndexerFuncs

+	// Indexers is used to accelerate the list operation, falls back to regular list
+	// operation if no indexer found.
+	Indexers *cache.Indexers
+
 	// NewFunc is a function that creates new empty object storing a object of type Type.
 	NewFunc func() runtime.Object

@@ -367,7 +371,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}

 	watchCache := newWatchCache(
-		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
+		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
 	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@@ -701,7 +705,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
 	}

 	filter := filterWithAttrsFunction(key, pred)
-	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
+	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
 	if err != nil {
 		return err
 	}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index ebd3314650f..1a98abd14ea 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -82,6 +82,35 @@ func storeElementKey(obj interface{}) (string, error) {
 	return elem.Key, nil
 }

+func storeElementObject(obj interface{}) (runtime.Object, error) {
+	elem, ok := obj.(*storeElement)
+	if !ok {
+		return nil, fmt.Errorf("not a storeElement: %v", obj)
+	}
+	return elem.Object, nil
+}
+
+func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
+	return func(obj interface{}) (strings []string, e error) {
+		seo, err := storeElementObject(obj)
+		if err != nil {
+			return nil, err
+		}
+		return objIndexFunc(seo)
+	}
+}
+
+func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
+	if indexers == nil {
+		return cache.Indexers{}
+	}
+	ret := cache.Indexers{}
+	for indexName, indexFunc := range *indexers {
+		ret[indexName] = storeElementIndexFunc(indexFunc)
+	}
+	return ret
+}
+
 // watchCache implements a Store interface.
 // However, it depends on the elements implementing runtime.Object interface.
 //
@@ -116,7 +145,7 @@ type watchCache struct {
 	// history" i.e. from the moment just after the newest cached watched event.
 	// It is necessary to effectively allow clients to start watching at now.
 	// NOTE: We assume that is thread-safe.
-	store cache.Store
+	store cache.Indexer

 	// ResourceVersion up to which the watchCache is propagated.
 	resourceVersion uint64
@@ -143,7 +172,8 @@ func newWatchCache(
 	keyFunc func(runtime.Object) (string, error),
 	eventHandler func(*watchCacheEvent),
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
-	versioner storage.Versioner) *watchCache {
+	versioner storage.Versioner,
+	indexers *cache.Indexers) *watchCache {
 	wc := &watchCache{
 		capacity:            capacity,
 		keyFunc:             keyFunc,
@@ -151,7 +181,7 @@ func newWatchCache(
 		cache:               make([]*watchCacheEvent, capacity),
 		startIndex:          0,
 		endIndex:            0,
-		store:               cache.NewStore(storeElementKey),
+		store:               cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
 		resourceVersion:     0,
 		listResourceVersion: 0,
 		eventHandler:        eventHandler,
@@ -319,12 +349,19 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
 }

 // WaitUntilFreshAndList returns list of pointers to objects.
-func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
+func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
 	err := w.waitUntilFreshAndBlock(resourceVersion, trace)
 	defer w.RUnlock()
 	if err != nil {
 		return nil, 0, err
 	}
+
+	// TODO: use the smallest key size index.
+	for _, matchValue := range matchValues {
+		if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
+			return result, w.resourceVersion, nil
+		}
+	}
 	return w.store.List(), w.resourceVersion, nil
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index 4f40134a1b4..75f635e1e2c 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -39,17 +39,19 @@ import (
 )

 func makeTestPod(name string, resourceVersion uint64) *v1.Pod {
+	return makeTestPodDetails(name, resourceVersion, "some-node", map[string]string{"k8s-app": "my-app"})
+}
+
+func makeTestPodDetails(name string, resourceVersion uint64, nodeName string, labels map[string]string) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace:       "ns",
 			Name:            name,
 			ResourceVersion: strconv.FormatUint(resourceVersion, 10),
-			Labels: map[string]string{
-				"k8s-app": "my-app",
-			},
+			Labels:          labels,
 		},
 		Spec: v1.PodSpec{
-			NodeName: "some-node",
+			NodeName: nodeName,
 		},
 	}
 }
@@ -64,7 +66,7 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
 }

 // newTestWatchCache just adds a fake clock.
-func newTestWatchCache(capacity int) *watchCache {
+func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
 	keyFunc := func(obj runtime.Object) (string, error) {
 		return storage.NamespaceKeyFunc("prefix", obj)
 	}
@@ -77,13 +79,13 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
 	}
 	versioner := etcd3.APIObjectVersioner{}
 	mockHandler := func(*watchCacheEvent) {}
-	wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
+	wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers)
 	wc.clock = clock.NewFakeClock(time.Now())
 	return wc
 }

 func TestWatchCacheBasic(t *testing.T) {
-	store := newTestWatchCache(2)
+	store := newTestWatchCache(2, &cache.Indexers{})

 	// Test Add/Update/Delete.
 	pod1 := makeTestPod("pod", 1)
@@ -160,7 +162,7 @@
 }

 func TestEvents(t *testing.T) {
-	store := newTestWatchCache(5)
+	store := newTestWatchCache(5, &cache.Indexers{})

 	store.Add(makeTestPod("pod", 3))

@@ -280,7 +282,7 @@
 }

 func TestMarker(t *testing.T) {
-	store := newTestWatchCache(3)
+	store := newTestWatchCache(3, &cache.Indexers{})

 	// First thing that is called when propagated from storage is Replace.
 	store.Replace([]interface{}{
@@ -315,15 +317,51 @@ func TestMarker(t *testing.T) {
 }

 func TestWaitUntilFreshAndList(t *testing.T) {
-	store := newTestWatchCache(3)
+	store := newTestWatchCache(3, &cache.Indexers{
+		"label": func(obj interface{}) ([]string, error) {
+			pod, ok := obj.(*v1.Pod)
+			if !ok {
+				return nil, fmt.Errorf("not a pod %#v", obj)
+			}
+			if value, ok := pod.Labels["label"]; ok {
+				return []string{value}, nil
+			}
+			return nil, nil
+		},
+		"spec.nodeName": func(obj interface{}) ([]string, error) {
+			pod, ok := obj.(*v1.Pod)
+			if !ok {
+				return nil, fmt.Errorf("not a pod %#v", obj)
+			}
+			return []string{pod.Spec.NodeName}, nil
+		},
+	})

 	// In background, update the store.
 	go func() {
-		store.Add(makeTestPod("foo", 2))
-		store.Add(makeTestPod("bar", 5))
+		store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
+		store.Add(makeTestPodDetails("pod2", 3, "node1", map[string]string{"label": "value1"}))
+		store.Add(makeTestPodDetails("pod3", 5, "node2", map[string]string{"label": "value2"}))
 	}()

-	list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil)
+	// list by empty MatchValues.
+	list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil, nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if resourceVersion != 5 {
+		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
+	}
+	if len(list) != 3 {
+		t.Errorf("unexpected list returned: %#v", list)
+	}
+
+	// list by label index.
+	matchValues := []storage.MatchValue{
+		{IndexName: "label", Value: "value1"},
+		{IndexName: "spec.nodeName", Value: "node2"},
+	}
+	list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
@@ -333,10 +371,38 @@
 	if resourceVersion != 5 {
 		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
 	}
 	if len(list) != 2 {
 		t.Errorf("unexpected list returned: %#v", list)
 	}
+
+	// list with spec.nodeName index.
+	matchValues = []storage.MatchValue{
+		{IndexName: "not-exist-label", Value: "whatever"},
+		{IndexName: "spec.nodeName", Value: "node2"},
+	}
+	list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if resourceVersion != 5 {
+		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
+	}
+	if len(list) != 1 {
+		t.Errorf("unexpected list returned: %#v", list)
+	}
+
+	// list with index not exists.
+	matchValues = []storage.MatchValue{
+		{IndexName: "not-exist-label", Value: "whatever"},
+	}
+	list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
+	if resourceVersion != 5 {
+		t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
+	}
+	if len(list) != 3 {
+		t.Errorf("unexpected list returned: %#v", list)
+	}
 }

 func TestWaitUntilFreshAndGet(t *testing.T) {
-	store := newTestWatchCache(3)
+	store := newTestWatchCache(3, &cache.Indexers{})

 	// In background, update the store.
 	go func() {
@@ -361,7 +427,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
 }

 func TestWaitUntilFreshAndListTimeout(t *testing.T) {
-	store := newTestWatchCache(3)
+	store := newTestWatchCache(3, &cache.Indexers{})
 	fc := store.clock.(*clock.FakeClock)

 	// In background, step clock after the below call starts the timer.
@@ -378,7 +444,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
 		store.Add(makeTestPod("bar", 5))
 	}()

-	_, _, err := store.WaitUntilFreshAndList(5, nil)
+	_, _, err := store.WaitUntilFreshAndList(5, nil, nil)
 	if !errors.IsTimeout(err) {
 		t.Errorf("expected timeout error but got: %v", err)
 	}
@@ -400,10 +466,10 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
 }

 func TestReflectorForWatchCache(t *testing.T) {
-	store := newTestWatchCache(5)
+	store := newTestWatchCache(5, &cache.Indexers{})

 	{
-		_, version, err := store.WaitUntilFreshAndList(0, nil)
+		_, version, err := store.WaitUntilFreshAndList(0, nil, nil)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -426,7 +492,7 @@ func TestReflectorForWatchCache(t *testing.T) {
 	r.ListAndWatch(wait.NeverStop)

 	{
-		_, version, err := store.WaitUntilFreshAndList(10, nil)
+		_, version, err := store.WaitUntilFreshAndList(10, nil, nil)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go
index 7e8ae5241ee..31d4b2a2859 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go
@@ -87,6 +87,12 @@ var Everything = SelectionPredicate{
 	Field: fields.Everything(),
 }

+// MatchValue defines a pair (<index name>, <value for that index>).
+type MatchValue struct {
+	IndexName string
+	Value     string
+}
+
 // Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
 // that is guaranteed to succeed.
 // See the comment for GuaranteedUpdate for more details.
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go
index 66d8d1f958a..a5fc91d8a73 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go
@@ -74,6 +74,7 @@ type SelectionPredicate struct {
 	Label       labels.Selector
 	Field       fields.Selector
 	GetAttrs    AttrFunc
+	IndexLabels []string
 	IndexFields []string
 	Limit       int64
 	Continue    string
@@ -128,3 +129,21 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
 func (s *SelectionPredicate) Empty() bool {
 	return s.Label.Empty() && s.Field.Empty()
 }
+
+// For any index defined by IndexLabels or IndexFields, if a matcher can match only (a subset)
+// of objects that return <value> for a given index, a pair (<index name>, <value>)
+// will be returned.
+func (s *SelectionPredicate) MatcherIndex() []MatchValue {
+	var result []MatchValue
+	for _, field := range s.IndexFields {
+		if value, ok := s.Field.RequiresExactMatch(field); ok {
+			result = append(result, MatchValue{IndexName: field, Value: value})
+		}
+	}
+	for _, label := range s.IndexLabels {
+		if value, ok := s.Label.RequiresExactMatch(label); ok {
+			result = append(result, MatchValue{IndexName: label, Value: value})
+		}
+	}
+	return result
+}
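To illustrate how the pieces above fit together, here is a minimal sketch (not part of the patch; package layout, variable names, and the "spec.nodeName" index are assumptions mirroring the test): a registry hands a client-go cache.Indexers map to the watch cache through the new generic.StoreOptions.Indexers field, and a SelectionPredicate that declares the same name in IndexFields and requires an exact match on it has MatcherIndex() produce the storage.MatchValue pair that Cacher.List forwards to watchCache.WaitUntilFreshAndList for the store.ByIndex fast path.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Indexers as a registry would pass them via generic.StoreOptions.Indexers.
	// The index function sees the API object itself; the watch cache unwraps its
	// storeElement wrapper via storeElementIndexFunc before calling it.
	indexers := &cache.Indexers{
		"spec.nodeName": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				return nil, fmt.Errorf("not a pod: %#v", obj)
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}
	_ = indexers

	// A predicate that requires an exact match on the indexed field.
	pred := storage.SelectionPredicate{
		Label:       labels.Everything(),
		Field:       fields.OneTermEqualSelector("spec.nodeName", "node1"),
		IndexFields: []string{"spec.nodeName"},
	}

	// Prints [{spec.nodeName node1}]: the (index name, value) pair that
	// Cacher.List passes to WaitUntilFreshAndList for the ByIndex lookup.
	fmt.Println(pred.MatcherIndex())
}

If none of the returned MatchValue index names exist in the cache's indexers, WaitUntilFreshAndList falls back to a full store.List(), which is the behavior the "not-exist-label" cases in watch_cache_test.go verify.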