From 1c2fa0c7f791cbe28ee54d957d6f8a1120d91018 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Sun, 5 Feb 2023 18:46:19 -0500 Subject: [PATCH] Partition watchers by namespace/name scope --- pkg/registry/core/componentstatus/rest.go | 7 +- .../core/configmap/storage/storage.go | 2 - pkg/registry/core/configmap/strategy.go | 12 +- pkg/registry/core/node/storage/storage.go | 2 - pkg/registry/core/node/strategy.go | 12 +- pkg/registry/core/secret/storage/storage.go | 2 - pkg/registry/core/secret/strategy.go | 12 +- .../pkg/registry/generic/registry/store.go | 20 + .../apiserver/pkg/storage/cacher/cacher.go | 78 ++- .../storage/cacher/cacher_whitebox_test.go | 1 + .../pkg/storage/etcd3/watcher_test.go | 10 + .../pkg/storage/selection_predicate.go | 12 + .../pkg/storage/testing/watcher_tests.go | 541 ++++++++++++++++++ .../pkg/storage/tests/cacher_test.go | 32 ++ 14 files changed, 693 insertions(+), 50 deletions(-) diff --git a/pkg/registry/core/componentstatus/rest.go b/pkg/registry/core/componentstatus/rest.go index 4539217a674..dc940a42283 100644 --- a/pkg/registry/core/componentstatus/rest.go +++ b/pkg/registry/core/componentstatus/rest.go @@ -108,10 +108,9 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptio func componentStatusPredicate(options *metainternalversion.ListOptions) storage.SelectionPredicate { pred := storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: nil, - IndexFields: []string{}, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: nil, } if options != nil { if options.LabelSelector != nil { diff --git a/pkg/registry/core/configmap/storage/storage.go b/pkg/registry/core/configmap/storage/storage.go index 14e64a53936..c3cff18ab5f 100644 --- a/pkg/registry/core/configmap/storage/storage.go +++ b/pkg/registry/core/configmap/storage/storage.go @@ -21,7 +21,6 @@ import ( "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/apiserver/pkg/storage" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -52,7 +51,6 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, error) { options := &generic.StoreOptions{ RESTOptions: optsGetter, AttrFunc: configmap.GetAttrs, - TriggerFunc: map[string]storage.IndexerFunc{"metadata.name": configmap.NameTriggerFunc}, } if err := store.CompleteWithOptions(options); err != nil { return nil, err diff --git a/pkg/registry/core/configmap/strategy.go b/pkg/registry/core/configmap/strategy.go index a78bc5d4cec..c85e115cbf3 100644 --- a/pkg/registry/core/configmap/strategy.go +++ b/pkg/registry/core/configmap/strategy.go @@ -109,18 +109,12 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { // Matcher returns a selection predicate for a given label and field selector. func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { return pkgstorage.SelectionPredicate{ - Label: label, - Field: field, - GetAttrs: GetAttrs, - IndexFields: []string{"metadata.name"}, + Label: label, + Field: field, + GetAttrs: GetAttrs, } } -// NameTriggerFunc returns value metadata.namespace of given object. -func NameTriggerFunc(obj runtime.Object) string { - return obj.(*api.ConfigMap).ObjectMeta.Name -} - // SelectableFields returns a field set that can be used for filter selection func SelectableFields(obj *api.ConfigMap) fields.Set { return generic.ObjectMetaFieldsSet(&obj.ObjectMeta, true) diff --git a/pkg/registry/core/node/storage/storage.go b/pkg/registry/core/node/storage/storage.go index 912dcb84171..647df2e6414 100644 --- a/pkg/registry/core/node/storage/storage.go +++ b/pkg/registry/core/node/storage/storage.go @@ -28,7 +28,6 @@ import ( "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/apiserver/pkg/storage" api "k8s.io/kubernetes/pkg/apis/core" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/kubelet/client" @@ -112,7 +111,6 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client options := &generic.StoreOptions{ RESTOptions: optsGetter, AttrFunc: node.GetAttrs, - TriggerFunc: map[string]storage.IndexerFunc{"metadata.name": node.NameTriggerFunc}, } if err := store.CompleteWithOptions(options); err != nil { return nil, err diff --git a/pkg/registry/core/node/strategy.go b/pkg/registry/core/node/strategy.go index 61a49b507fc..b2492d74281 100644 --- a/pkg/registry/core/node/strategy.go +++ b/pkg/registry/core/node/strategy.go @@ -223,18 +223,12 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { // MatchNode returns a generic matcher for a given label and field selector. func MatchNode(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { return pkgstorage.SelectionPredicate{ - Label: label, - Field: field, - GetAttrs: GetAttrs, - IndexFields: []string{"metadata.name"}, + Label: label, + Field: field, + GetAttrs: GetAttrs, } } -// NameTriggerFunc returns value metadata.namespace of given object. -func NameTriggerFunc(obj runtime.Object) string { - return obj.(*api.Node).ObjectMeta.Name -} - // ResourceLocation returns a URL and transport which one can use to send traffic for the specified node. func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper, ctx context.Context, id string) (*url.URL, http.RoundTripper, error) { schemeReq, name, portReq, valid := utilnet.SplitSchemeNamePort(id) diff --git a/pkg/registry/core/secret/storage/storage.go b/pkg/registry/core/secret/storage/storage.go index c329c414761..3808cd7258f 100644 --- a/pkg/registry/core/secret/storage/storage.go +++ b/pkg/registry/core/secret/storage/storage.go @@ -20,7 +20,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" - "k8s.io/apiserver/pkg/storage" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -51,7 +50,6 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, error) { options := &generic.StoreOptions{ RESTOptions: optsGetter, AttrFunc: secret.GetAttrs, - TriggerFunc: map[string]storage.IndexerFunc{"metadata.name": secret.NameTriggerFunc}, } if err := store.CompleteWithOptions(options); err != nil { return nil, err diff --git a/pkg/registry/core/secret/strategy.go b/pkg/registry/core/secret/strategy.go index 717f731423e..18b6c0116d7 100644 --- a/pkg/registry/core/secret/strategy.go +++ b/pkg/registry/core/secret/strategy.go @@ -113,18 +113,12 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { // Matcher returns a selection predicate for a given label and field selector. func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { return pkgstorage.SelectionPredicate{ - Label: label, - Field: field, - GetAttrs: GetAttrs, - IndexFields: []string{"metadata.name"}, + Label: label, + Field: field, + GetAttrs: GetAttrs, } } -// NameTriggerFunc returns value metadata.namespace of given object. -func NameTriggerFunc(obj runtime.Object) string { - return obj.(*api.Secret).ObjectMeta.Name -} - // SelectableFields returns a field set that can be used for filter selection func SelectableFields(obj *api.Secret) fields.Set { objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&obj.ObjectMeta, true) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 79c42a19663..7fa412ef3e2 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -25,6 +25,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/validation" "k8s.io/apimachinery/pkg/api/validation/path" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -364,6 +365,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, Predicate: p, Recursive: true, } + + // if we're not already namespace-scoped, see if the field selector narrows the scope of the watch + if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 { + if selectorNamespace, ok := p.MatchesSingleNamespace(); ok { + if len(validation.ValidateNamespaceName(selectorNamespace, false)) == 0 { + ctx = genericapirequest.WithNamespace(ctx, selectorNamespace) + } + } + } + if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { storageOpts.Recursive = false @@ -1279,6 +1290,15 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) { storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true} + // if we're not already namespace-scoped, see if the field selector narrows the scope of the watch + if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 { + if selectorNamespace, ok := p.MatchesSingleNamespace(); ok { + if len(validation.ValidateNamespaceName(selectorNamespace, false)) == 0 { + ctx = genericapirequest.WithNamespace(ctx, selectorNamespace) + } + } + } + key := e.KeyRootFunc(ctx) if name, ok := p.MatchesSingle(); ok { if k, err := e.KeyFunc(ctx, name); err == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1956e5fc030..0e03f3d5b77 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" @@ -126,29 +127,37 @@ func (wm watchersMap) terminateAll(done func(*cacheWatcher)) { } type indexedWatchers struct { - allWatchers watchersMap + allWatchers map[namespacedName]watchersMap valueWatchers map[string]watchersMap } -func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) { +func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespacedName, value string, supported bool) { if supported { if _, ok := i.valueWatchers[value]; !ok { i.valueWatchers[value] = watchersMap{} } i.valueWatchers[value].addWatcher(w, number) } else { - i.allWatchers.addWatcher(w, number) + scopedWatchers, ok := i.allWatchers[scope] + if !ok { + scopedWatchers = watchersMap{} + i.allWatchers[scope] = scopedWatchers + } + scopedWatchers.addWatcher(w, number) } } -func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) { +func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) { if supported { i.valueWatchers[value].deleteWatcher(number, done) if len(i.valueWatchers[value]) == 0 { delete(i.valueWatchers, value) } } else { - i.allWatchers.deleteWatcher(number, done) + i.allWatchers[scope].deleteWatcher(number, done) + if len(i.allWatchers[scope]) == 0 { + delete(i.allWatchers, scope) + } } } @@ -160,10 +169,13 @@ func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 { klog.Warningf("Terminating all watchers from cacher %v", groupResource) } - i.allWatchers.terminateAll(done) + for _, watchers := range i.allWatchers { + watchers.terminateAll(done) + } for _, watchers := range i.valueWatchers { watchers.terminateAll(done) } + i.allWatchers = map[namespacedName]watchersMap{} i.valueWatchers = map[string]watchersMap{} } @@ -361,7 +373,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { indexedTrigger: indexedTrigger, watcherIdx: 0, watchers: indexedWatchers{ - allWatchers: make(map[int]*cacheWatcher), + allWatchers: make(map[namespacedName]watchersMap), valueWatchers: make(map[string]watchersMap), }, // TODO: Figure out the correct value for the buffer size. @@ -478,6 +490,11 @@ func (c *Cacher) Delete( return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil) } +type namespacedName struct { + namespace string + name string +} + // Watch implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { pred := opts.Predicate @@ -497,6 +514,19 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, errors.NewServiceUnavailable(err.Error()) } + // determine the namespace and name scope of the watch, first from the request, secondarily from the field selector + scope := namespacedName{} + if requestNamespace, ok := request.NamespaceFrom(ctx); ok && len(requestNamespace) > 0 { + scope.namespace = requestNamespace + } else if selectorNamespace, ok := pred.Field.RequiresExactMatch("metadata.namespace"); ok { + scope.namespace = selectorNamespace + } + if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil && len(requestInfo.Name) > 0 { + scope.name = requestInfo.Name + } else if selectorName, ok := pred.Field.RequiresExactMatch("metadata.name"); ok { + scope.name = selectorName + } + triggerValue, triggerSupported := "", false if c.indexedTrigger != nil { for _, field := range pred.IndexFields { @@ -554,8 +584,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.Lock() defer c.Unlock() // Update watcher.forget function once we can compute it. - watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported) - c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) + watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported) + c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported) // Add it to the queue only when the client support watch bookmarks. if watcher.allowWatchBookmarks { @@ -986,10 +1016,32 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) { return } - // Iterate over "allWatchers" no matter what the trigger function is. - for _, watcher := range c.watchers.allWatchers { + // iterate over watchers for each applicable namespace/name tuple + namespace := event.ObjFields["metadata.namespace"] + name := event.ObjFields["metadata.name"] + if len(namespace) > 0 { + if len(name) > 0 { + // namespaced watchers scoped by name + for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace, name: name}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + // namespaced watchers not scoped by name + for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + if len(name) > 0 { + // cluster-wide watchers scoped by name + for _, watcher := range c.watchers.allWatchers[namespacedName{name: name}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + // cluster-wide watchers unscoped by name + for _, watcher := range c.watchers.allWatchers[namespacedName{}] { c.watchersBuffer = append(c.watchersBuffer, watcher) } + if supported { // Iterate over watchers interested in the given values of the trigger. for _, triggerValue := range triggerValues { @@ -1071,7 +1123,7 @@ func (c *Cacher) Stop() { c.stopWg.Wait() } -func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, triggerSupported bool) func(bool) { +func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, triggerValue string, triggerSupported bool) func(bool) { return func(drainWatcher bool) { c.Lock() defer c.Unlock() @@ -1081,7 +1133,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, t // It's possible that the watcher is already not in the structure (e.g. in case of // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() // on a watcher multiple times. - c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked) + c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index b95cf10513d..e721ef87e37 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -298,6 +298,7 @@ func TestWatchCacheBypass(t *testing.T) { backingStorage.injectError(errDummy) _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "0", + Predicate: storage.Everything, }) if err != nil { t.Errorf("Watch with RV=0 should be served from cache: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index d4239062dee..628c9124d79 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -34,6 +34,16 @@ func TestWatch(t *testing.T) { storagetesting.RunTestWatch(ctx, t, store) } +func TestClusterScopedWatch(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.TestClusterScopedWatch(ctx, t, store) +} + +func TestNamespaceScopedWatch(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.TestNamespaceScopedWatch(ctx, t, store) +} + func TestDeleteTriggerWatch(t *testing.T) { ctx, store, _ := testSetup(t) storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 7370518e394..a0a14366f2a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -112,6 +112,18 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set) return matched } +// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's +// namespace. +func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) { + if len(s.Continue) > 0 { + return "", false + } + if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok { + return namespace, true + } + return "", false +} + // MatchesSingle will return (name, true) if and only if s.Field matches on the object's // name. func (s *SelectionPredicate) MatchesSingle() (string, bool) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 30558e9ff58..c3096493939 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/value" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" @@ -372,8 +373,548 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor }) } +// It tests watches of cluster-scoped resources. +func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { + tests := []struct { + name string + // For watch request, the name of object is specified with field selector + // "metadata.name=objectName". So in this watch tests, we should set the + // requestedName and field selector "metadata.name=requestedName" at the + // same time or set neighter of them. + requestedName string + recursive bool + fieldSelector fields.Selector + indexFields []string + watchTests []*testWatchStruct + }{ + { + name: "cluster-wide watch, request without name, without field selector", + recursive: true, + fieldSelector: fields.Everything(), + watchTests: []*testWatchStruct{ + {basePod("t1-foo1"), true, watch.Added}, + {basePodUpdated("t1-foo1"), true, watch.Modified}, + {basePodAssigned("t1-foo2", "t1-bar1"), true, watch.Added}, + }, + }, + { + name: "cluster-wide watch, request without name, field selector with spec.nodeName", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t2-bar1"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {basePod("t2-foo1"), false, ""}, + {basePodAssigned("t2-foo1", "t2-bar1"), true, watch.Added}, + }, + }, + { + name: "cluster-wide watch, request without name, field selector with spec.nodeName to filter out watch", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t3-bar1"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {basePod("t3-foo1"), true, watch.Added}, + {basePod("t3-foo2"), true, watch.Added}, + {basePodUpdated("t3-foo1"), true, watch.Modified}, + {basePodAssigned("t3-foo1", "t3-bar1"), true, watch.Deleted}, + }, + }, + { + name: "cluster-wide watch, request with name, field selector with metadata.name", + requestedName: "t4-foo1", + fieldSelector: fields.ParseSelectorOrDie("metadata.name=t4-foo1"), + watchTests: []*testWatchStruct{ + {basePod("t4-foo1"), true, watch.Added}, + {basePod("t4-foo2"), false, ""}, + {basePodUpdated("t4-foo1"), true, watch.Modified}, + {basePodUpdated("t4-foo2"), false, ""}, + }, + }, + { + name: "cluster-wide watch, request with name, field selector with metadata.name and spec.nodeName", + requestedName: "t5-foo1", + fieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": "t5-foo1", + "spec.nodeName": "t5-bar1", + }), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {basePod("t5-foo1"), false, ""}, + {basePod("t5-foo2"), false, ""}, + {basePodUpdated("t5-foo1"), false, ""}, + {basePodUpdated("t5-foo2"), false, ""}, + {basePodAssigned("t5-foo1", "t5-bar1"), true, watch.Added}, + }, + }, + { + name: "cluster-wide watch, request with name, field selector with metadata.name, and with spec.nodeName to filter out watch", + requestedName: "t6-foo1", + fieldSelector: fields.AndSelectors( + fields.ParseSelectorOrDie("spec.nodeName!=t6-bar1"), + fields.SelectorFromSet(fields.Set{"metadata.name": "t6-foo1"}), + ), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {basePod("t6-foo1"), true, watch.Added}, + {basePod("t6-foo2"), false, ""}, + {basePodUpdated("t6-foo1"), true, watch.Modified}, + {basePodAssigned("t6-foo1", "t6-bar1"), true, watch.Deleted}, + {basePodAssigned("t6-foo2", "t6-bar1"), false, ""}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requestInfo := &genericapirequest.RequestInfo{} + requestInfo.Name = tt.requestedName + requestInfo.Namespace = "" + ctx = genericapirequest.WithRequestInfo(ctx, requestInfo) + ctx = genericapirequest.WithNamespace(ctx, "") + + watchKey := "/pods" + if tt.requestedName != "" { + watchKey += "/" + tt.requestedName + } + + predicate := createPodPredicate(tt.fieldSelector, false, tt.indexFields) + + list := &example.PodList{} + opts := storage.ListOptions{ + ResourceVersion: "", + Predicate: predicate, + Recursive: true, + } + if err := store.GetList(ctx, "/pods", opts, list); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + opts.ResourceVersion = list.ResourceVersion + opts.Recursive = tt.recursive + + w, err := store.Watch(ctx, watchKey, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + currentObjs := map[string]*example.Pod{} + for _, watchTest := range tt.watchTests { + out := &example.Pod{} + key := "pods/" + watchTest.obj.Name + err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + obj := watchTest.obj.DeepCopy() + return obj, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + expectObj := out + if watchTest.watchType == watch.Deleted { + expectObj = currentObjs[watchTest.obj.Name] + expectObj.ResourceVersion = out.ResourceVersion + delete(currentObjs, watchTest.obj.Name) + } else { + currentObjs[watchTest.obj.Name] = out + } + if watchTest.expectEvent { + testCheckResult(t, watchTest.watchType, w, expectObj) + } + } + w.Stop() + testCheckStop(t, w) + }) + } +} + +// It tests watch of namespace-scoped resources. +func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { + tests := []struct { + name string + // For watch request, the name of object is specified with field selector + // "metadata.name=objectName". So in this watch tests, we should set the + // requestedName and field selector "metadata.name=requestedName" at the + // same time or set neighter of them. + requestedName string + requestedNamespace string + recursive bool + fieldSelector fields.Selector + indexFields []string + watchTests []*testWatchStruct + }{ + { + name: "namespaced watch, request without name, request without namespace, without field selector", + recursive: true, + fieldSelector: fields.Everything(), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t1-foo1", "t1-ns1"), true, watch.Added}, + {baseNamespacedPod("t1-foo2", "t1-ns2"), true, watch.Added}, + {baseNamespacedPodUpdated("t1-foo1", "t1-ns1"), true, watch.Modified}, + {baseNamespacedPodUpdated("t1-foo2", "t1-ns2"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request without name, request without namespace, field selector with metadata.namespace", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t2-ns1"), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t2-foo1", "t2-ns1"), true, watch.Added}, + {baseNamespacedPod("t2-foo1", "t2-ns2"), false, ""}, + {baseNamespacedPodUpdated("t2-foo1", "t2-ns1"), true, watch.Modified}, + {baseNamespacedPodUpdated("t2-foo1", "t2-ns2"), false, ""}, + }, + }, + { + name: "namespaced watch, request without name, request without namespace, field selector with spec.nodename", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t3-bar1"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t3-foo1", "t3-ns1"), false, ""}, + {baseNamespacedPod("t3-foo2", "t3-ns2"), false, ""}, + {baseNamespacedPodAssigned("t3-foo1", "t3-ns1", "t3-bar1"), true, watch.Added}, + {baseNamespacedPodAssigned("t3-foo2", "t3-ns2", "t3-bar1"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request without name, request without namespace, field selector with spec.nodename to filter out watch", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t4-bar1"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t4-foo1", "t4-ns1"), true, watch.Added}, + {baseNamespacedPod("t4-foo2", "t4-ns1"), true, watch.Added}, + {baseNamespacedPodUpdated("t4-foo1", "t4-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t4-foo1", "t4-ns1", "t4-bar1"), true, watch.Deleted}, + }, + }, + { + name: "namespaced watch, request without name, request with namespace, without field selector", + requestedNamespace: "t5-ns1", + recursive: true, + fieldSelector: fields.Everything(), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t5-foo1", "t5-ns1"), true, watch.Added}, + {baseNamespacedPod("t5-foo1", "t5-ns2"), false, ""}, + {baseNamespacedPod("t5-foo2", "t5-ns1"), true, watch.Added}, + {baseNamespacedPodUpdated("t5-foo1", "t5-ns1"), true, watch.Modified}, + {baseNamespacedPodUpdated("t5-foo1", "t5-ns2"), false, ""}, + }, + }, + { + name: "namespaced watch, request without name, request with namespace, field selector with matched metadata.namespace", + requestedNamespace: "t6-ns1", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t6-ns1"), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t6-foo1", "t6-ns1"), true, watch.Added}, + {baseNamespacedPod("t6-foo1", "t6-ns2"), false, ""}, + {baseNamespacedPodUpdated("t6-foo1", "t6-ns1"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request without name, request with namespace, field selector with non-matched metadata.namespace", + requestedNamespace: "t7-ns1", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=t7-ns2"), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t7-foo1", "t7-ns1"), false, ""}, + {baseNamespacedPod("t7-foo1", "t7-ns2"), false, ""}, + {baseNamespacedPodUpdated("t7-foo1", "t7-ns1"), false, ""}, + {baseNamespacedPodUpdated("t7-foo1", "t7-ns2"), false, ""}, + }, + }, + { + name: "namespaced watch, request without name, request with namespace, field selector with spec.nodename", + requestedNamespace: "t8-ns1", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName=t8-bar2"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t8-foo1", "t8-ns1"), false, ""}, + {baseNamespacedPodAssigned("t8-foo1", "t8-ns1", "t8-bar1"), false, ""}, + {baseNamespacedPodAssigned("t8-foo1", "t8-ns2", "t8-bar2"), false, ""}, + {baseNamespacedPodAssigned("t8-foo1", "t8-ns1", "t8-bar2"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request without name, request with namespace, field selector with spec.nodename to filter out watch", + requestedNamespace: "t9-ns2", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("spec.nodeName!=t9-bar1"), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t9-foo1", "t9-ns1"), false, ""}, + {baseNamespacedPod("t9-foo1", "t9-ns2"), true, watch.Added}, + {baseNamespacedPodAssigned("t9-foo1", "t9-ns2", "t9-bar1"), true, watch.Deleted}, + {baseNamespacedPodAssigned("t9-foo1", "t9-ns2", "t9-bar2"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request with name, request without namespace, field selector with metadata.name", + requestedName: "t10-foo1", + recursive: true, + fieldSelector: fields.ParseSelectorOrDie("metadata.name=t10-foo1"), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t10-foo1", "t10-ns1"), true, watch.Added}, + {baseNamespacedPod("t10-foo1", "t10-ns2"), true, watch.Added}, + {baseNamespacedPod("t10-foo2", "t10-ns1"), false, ""}, + {baseNamespacedPodUpdated("t10-foo1", "t10-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t10-foo1", "t10-ns1", "t10-bar1"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request with name, request without namespace, field selector with metadata.name and metadata.namespace", + requestedName: "t11-foo1", + recursive: true, + fieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": "t11-foo1", + "metadata.namespace": "t11-ns1", + }), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t11-foo1", "t11-ns1"), true, watch.Added}, + {baseNamespacedPod("t11-foo2", "t11-ns1"), false, ""}, + {baseNamespacedPod("t11-foo1", "t11-ns2"), false, ""}, + {baseNamespacedPodUpdated("t11-foo1", "t11-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t11-foo1", "t11-ns1", "t11-bar1"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request with name, request without namespace, field selector with metadata.name and spec.nodeName", + requestedName: "t12-foo1", + recursive: true, + fieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": "t12-foo1", + "spec.nodeName": "t12-bar1", + }), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t12-foo1", "t12-ns1"), false, ""}, + {baseNamespacedPodUpdated("t12-foo1", "t12-ns1"), false, ""}, + {baseNamespacedPodAssigned("t12-foo1", "t12-ns1", "t12-bar1"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request with name, request without namespace, field selector with metadata.name, and with spec.nodeName to filter out watch", + requestedName: "t15-foo1", + recursive: true, + fieldSelector: fields.AndSelectors( + fields.ParseSelectorOrDie("spec.nodeName!=t15-bar1"), + fields.SelectorFromSet(fields.Set{"metadata.name": "t15-foo1"}), + ), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t15-foo1", "t15-ns1"), true, watch.Added}, + {baseNamespacedPod("t15-foo2", "t15-ns1"), false, ""}, + {baseNamespacedPodUpdated("t15-foo1", "t15-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t15-foo1", "t15-ns1", "t15-bar1"), true, watch.Deleted}, + {baseNamespacedPodAssigned("t15-foo1", "t15-ns1", "t15-bar2"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request with name, request with namespace, with field selector metadata.name", + requestedName: "t16-foo1", + requestedNamespace: "t16-ns1", + fieldSelector: fields.ParseSelectorOrDie("metadata.name=t16-foo1"), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t16-foo1", "t16-ns1"), true, watch.Added}, + {baseNamespacedPod("t16-foo2", "t16-ns1"), false, ""}, + {baseNamespacedPodUpdated("t16-foo1", "t16-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t16-foo1", "t16-ns1", "t16-bar1"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request with name, request with namespace, with field selector metadata.name and metadata.namespace", + requestedName: "t17-foo2", + requestedNamespace: "t17-ns1", + fieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": "t17-foo2", + "metadata.namespace": "t17-ns1", + }), + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t17-foo1", "t17-ns1"), false, ""}, + {baseNamespacedPod("t17-foo2", "t17-ns1"), true, watch.Added}, + {baseNamespacedPodUpdated("t17-foo1", "t17-ns1"), false, ""}, + {baseNamespacedPodAssigned("t17-foo2", "t17-ns1", "t17-bar1"), true, watch.Modified}, + }, + }, + { + name: "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace and spec.nodename", + requestedName: "t18-foo1", + requestedNamespace: "t18-ns1", + fieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": "t18-foo1", + "metadata.namespace": "t18-ns1", + "spec.nodeName": "t18-bar1", + }), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t18-foo1", "t18-ns1"), false, ""}, + {baseNamespacedPod("t18-foo2", "t18-ns1"), false, ""}, + {baseNamespacedPod("t18-foo1", "t18-ns2"), false, ""}, + {baseNamespacedPodUpdated("t18-foo1", "t18-ns1"), false, ""}, + {baseNamespacedPodAssigned("t18-foo1", "t18-ns1", "t18-bar1"), true, watch.Added}, + }, + }, + { + name: "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace, and with spec.nodename to filter out watch", + requestedName: "t19-foo2", + requestedNamespace: "t19-ns1", + fieldSelector: fields.AndSelectors( + fields.ParseSelectorOrDie("spec.nodeName!=t19-bar1"), + fields.SelectorFromSet(fields.Set{"metadata.name": "t19-foo2", "metadata.namespace": "t19-ns1"}), + ), + indexFields: []string{"spec.nodeName"}, + watchTests: []*testWatchStruct{ + {baseNamespacedPod("t19-foo1", "t19-ns1"), false, ""}, + {baseNamespacedPod("t19-foo2", "t19-ns2"), false, ""}, + {baseNamespacedPod("t19-foo2", "t19-ns1"), true, watch.Added}, + {baseNamespacedPodUpdated("t19-foo2", "t19-ns1"), true, watch.Modified}, + {baseNamespacedPodAssigned("t19-foo2", "t19-ns1", "t19-bar1"), true, watch.Deleted}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requestInfo := &genericapirequest.RequestInfo{} + requestInfo.Name = tt.requestedName + requestInfo.Namespace = tt.requestedNamespace + ctx = genericapirequest.WithRequestInfo(ctx, requestInfo) + ctx = genericapirequest.WithNamespace(ctx, tt.requestedNamespace) + + watchKey := "/pods" + if tt.requestedNamespace != "" { + watchKey += "/" + tt.requestedNamespace + if tt.requestedName != "" { + watchKey += "/" + tt.requestedName + } + } + + predicate := createPodPredicate(tt.fieldSelector, true, tt.indexFields) + + list := &example.PodList{} + opts := storage.ListOptions{ + ResourceVersion: "", + Predicate: predicate, + Recursive: true, + } + if err := store.GetList(ctx, "/pods", opts, list); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + opts.ResourceVersion = list.ResourceVersion + opts.Recursive = tt.recursive + + w, err := store.Watch(ctx, watchKey, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + currentObjs := map[string]*example.Pod{} + for _, watchTest := range tt.watchTests { + out := &example.Pod{} + key := "pods/" + watchTest.obj.Namespace + "/" + watchTest.obj.Name + err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + obj := watchTest.obj.DeepCopy() + return obj, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + expectObj := out + podIdentifier := watchTest.obj.Namespace + "/" + watchTest.obj.Name + if watchTest.watchType == watch.Deleted { + expectObj = currentObjs[podIdentifier] + expectObj.ResourceVersion = out.ResourceVersion + delete(currentObjs, podIdentifier) + } else { + currentObjs[podIdentifier] = out + } + if watchTest.expectEvent { + testCheckResult(t, watchTest.watchType, w, expectObj) + } + } + w.Stop() + testCheckStop(t, w) + }) + } +} + type testWatchStruct struct { obj *example.Pod expectEvent bool watchType watch.EventType } + +func createPodPredicate(field fields.Selector, namespaceScoped bool, indexField []string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Label: labels.Everything(), + Field: field, + GetAttrs: determinePodGetAttrFunc(namespaceScoped, indexField), + IndexFields: indexField, + } +} + +func determinePodGetAttrFunc(namespaceScoped bool, indexField []string) storage.AttrFunc { + if indexField != nil { + if namespaceScoped { + return namespacedScopedNodeNameAttrFunc + } + return clusterScopedNodeNameAttrFunc + } + if namespaceScoped { + return storage.DefaultNamespaceScopedAttr + } + return storage.DefaultClusterScopedAttr +} + +func namespacedScopedNodeNameAttrFunc(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{ + "spec.nodeName": pod.Spec.NodeName, + "metadata.name": pod.ObjectMeta.Name, + "metadata.namespace": pod.ObjectMeta.Namespace, + }, nil +} + +func clusterScopedNodeNameAttrFunc(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{ + "spec.nodeName": pod.Spec.NodeName, + "metadata.name": pod.ObjectMeta.Name, + }, nil +} + +func basePod(podName string) *example.Pod { + return baseNamespacedPod(podName, "") +} + +func basePodUpdated(podName string) *example.Pod { + return baseNamespacedPodUpdated(podName, "") +} + +func basePodAssigned(podName, nodeName string) *example.Pod { + return baseNamespacedPodAssigned(podName, "", nodeName) +} + +func baseNamespacedPod(podName, namespace string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace}, + } +} + +func baseNamespacedPodUpdated(podName, namespace string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace}, + Status: example.PodStatus{Phase: "Running"}, + } +} + +func baseNamespacedPodAssigned(podName, namespace, nodeName string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace}, + Spec: example.PodSpec{NodeName: nodeName}, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 0e4b39e04e0..d4c0a716efb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -179,6 +179,18 @@ func TestList(t *testing.T) { storagetesting.RunTestList(ctx, t, cacher, true) } +func TestClusterScopedWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs) + t.Cleanup(terminate) + storagetesting.TestClusterScopedWatch(ctx, t, cacher) +} + +func TestNamespaceScopedWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs) + t.Cleanup(terminate) + storagetesting.TestNamespaceScopedWatch(ctx, t, cacher) +} + func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { _, _, line, _ := goruntime.Caller(1) select { @@ -759,6 +771,7 @@ type tearDownFunc func() type setupOptions struct { resourcePrefix string keyFunc func(runtime.Object) (string, error) + indexerFuncs map[string]storage.IndexerFunc clock clock.Clock } @@ -772,6 +785,24 @@ func withDefaults(options *setupOptions) { options.clock = clock.RealClock{} } +func withClusterScopedKeyFunc(options *setupOptions) { + options.keyFunc = func(obj runtime.Object) (string, error) { + return storage.NoNamespaceKeyFunc(options.resourcePrefix, obj) + } +} + +func withSpecNodeNameIndexerFuncs(options *setupOptions) { + options.indexerFuncs = map[string]storage.IndexerFunc{ + "spec.nodeName": func(obj runtime.Object) string { + pod, ok := obj.(*example.Pod) + if !ok { + return "" + } + return pod.Spec.NodeName + }, + } +} + func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) @@ -795,6 +826,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora GetAttrsFunc: GetPodAttrs, NewFunc: newPod, NewListFunc: newPodList, + IndexerFuncs: setupOpts.indexerFuncs, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Clock: setupOpts.clock, }