From 1d9bc58328cb45164bc20167db613fd0eb60fb9f Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 3 Jun 2016 15:23:26 +0200 Subject: [PATCH] Extend Filter interface with Trigger() and use it for pods and nodes --- examples/apiserver/rest/reststorage.go | 2 +- federation/registry/cluster/etcd/etcd.go | 10 +- pkg/registry/certificates/etcd/etcd.go | 11 +- pkg/registry/clusterrole/etcd/etcd.go | 2 + pkg/registry/clusterrolebinding/etcd/etcd.go | 2 + pkg/registry/configmap/etcd/etcd.go | 6 +- pkg/registry/controller/etcd/etcd.go | 10 +- pkg/registry/daemonset/etcd/etcd.go | 10 +- pkg/registry/deployment/etcd/etcd.go | 9 +- pkg/registry/endpoint/etcd/etcd.go | 10 +- pkg/registry/generic/matcher.go | 34 +++- .../generic/registry/storage_factory.go | 16 +- pkg/registry/generic/registry/store.go | 2 +- pkg/registry/generic/registry/store_test.go | 8 + pkg/registry/generic/storage_decorator.go | 6 +- .../horizontalpodautoscaler/etcd/etcd.go | 10 +- pkg/registry/ingress/etcd/etcd.go | 10 +- pkg/registry/job/etcd/etcd.go | 10 +- pkg/registry/limitrange/etcd/etcd.go | 10 +- pkg/registry/namespace/etcd/etcd.go | 9 +- pkg/registry/networkpolicy/etcd/etcd.go | 10 +- pkg/registry/node/etcd/etcd.go | 8 +- pkg/registry/node/strategy.go | 8 + pkg/registry/persistentvolume/etcd/etcd.go | 10 +- .../persistentvolumeclaim/etcd/etcd.go | 10 +- pkg/registry/petset/etcd/etcd.go | 10 +- pkg/registry/pod/etcd/etcd.go | 9 +- pkg/registry/pod/strategy.go | 8 + pkg/registry/poddisruptionbudget/etcd/etcd.go | 10 +- pkg/registry/podsecuritypolicy/etcd/etcd.go | 10 +- pkg/registry/podtemplate/etcd/etcd.go | 10 +- pkg/registry/replicaset/etcd/etcd.go | 10 +- pkg/registry/resourcequota/etcd/etcd.go | 10 +- pkg/registry/role/etcd/etcd.go | 2 + pkg/registry/rolebinding/etcd/etcd.go | 2 + pkg/registry/secret/etcd/etcd.go | 10 +- pkg/registry/service/etcd/etcd.go | 10 +- pkg/registry/serviceaccount/etcd/etcd.go | 10 +- pkg/storage/cacher.go | 166 +++++++++++++++--- pkg/storage/cacher_test.go | 2 +- pkg/storage/etcd/etcd_helper_test.go | 2 +- pkg/storage/etcd/etcd_watcher_test.go | 6 +- pkg/storage/etcd3/store_test.go | 13 +- pkg/storage/etcd3/watcher_test.go | 8 +- pkg/storage/interfaces.go | 25 ++- pkg/storage/util.go | 22 ++- 46 files changed, 511 insertions(+), 77 deletions(-) diff --git a/examples/apiserver/rest/reststorage.go b/examples/apiserver/rest/reststorage.go index 4a4b9ef2b73..20e6e0dc1ea 100644 --- a/examples/apiserver/rest/reststorage.go +++ b/examples/apiserver/rest/reststorage.go @@ -38,7 +38,7 @@ func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *RE // Usually you should reuse your RESTCreateStrategy. strategy := &NotNamespaceScoped{} storageInterface := storageDecorator( - s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc) + s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher) store := ®istry.Store{ NewFunc: func() runtime.Object { return &testgroup.TestType{} }, // NewListFunc returns an object capable of storing results of an etcd list. diff --git a/federation/registry/cluster/etcd/etcd.go b/federation/registry/cluster/etcd/etcd.go index a49457623a4..38f2cc4abf2 100644 --- a/federation/registry/cluster/etcd/etcd.go +++ b/federation/registry/cluster/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -49,7 +50,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &federation.ClusterList{} } storageInterface := opts.Decorator( - opts.Storage, 100, &federation.Cluster{}, prefix, cluster.Strategy, newListFunc) + opts.Storage, + 100, + &federation.Cluster{}, + prefix, + cluster.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &federation.Cluster{} }, diff --git a/pkg/registry/certificates/etcd/etcd.go b/pkg/registry/certificates/etcd/etcd.go index 6ae90404b00..561d81c2c65 100644 --- a/pkg/registry/certificates/etcd/etcd.go +++ b/pkg/registry/certificates/etcd/etcd.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for CertificateSigningRequest against etcd @@ -39,7 +40,15 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) { prefix := "/certificatesigningrequests" newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} } - storageInterface := opts.Decorator(opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), &certificates.CertificateSigningRequest{}, prefix, csrregistry.Strategy, newListFunc) + storageInterface := opts.Decorator( + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), + &certificates.CertificateSigningRequest{}, + prefix, + csrregistry.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &certificates.CertificateSigningRequest{} }, diff --git a/pkg/registry/clusterrole/etcd/etcd.go b/pkg/registry/clusterrole/etcd/etcd.go index d62a15a7b17..df3f2e5b8a1 100644 --- a/pkg/registry/clusterrole/etcd/etcd.go +++ b/pkg/registry/clusterrole/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for ClusterRole against etcd @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix, clusterrole.Strategy, newListFunc, + storage.NoTriggerPublisher, ) store := ®istry.Store{ diff --git a/pkg/registry/clusterrolebinding/etcd/etcd.go b/pkg/registry/clusterrolebinding/etcd/etcd.go index 3bce9a9841f..8d053ea1211 100644 --- a/pkg/registry/clusterrolebinding/etcd/etcd.go +++ b/pkg/registry/clusterrolebinding/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for ClusterRoleBinding against etcd @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix, clusterrolebinding.Strategy, newListFunc, + storage.NoTriggerPublisher, ) store := ®istry.Store{ diff --git a/pkg/registry/configmap/etcd/etcd.go b/pkg/registry/configmap/etcd/etcd.go index 88e5b0eee6a..83c7f7aaaff 100644 --- a/pkg/registry/configmap/etcd/etcd.go +++ b/pkg/registry/configmap/etcd/etcd.go @@ -20,9 +20,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/registry/configmap" "k8s.io/kubernetes/pkg/registry/generic" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/registry/generic/registry" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for ConfigMap against etcd @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.ConfigMapList{} } storageInterface := opts.Decorator( - opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc) + opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc, storage.NoTriggerPublisher) store := ®istry.Store{ NewFunc: func() runtime.Object { diff --git a/pkg/registry/controller/etcd/etcd.go b/pkg/registry/controller/etcd/etcd.go index f38c8b4a1dc..b600118b52f 100644 --- a/pkg/registry/controller/etcd/etcd.go +++ b/pkg/registry/controller/etcd/etcd.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // ControllerStorage includes dummy storage for Replication Controllers and for Scale subresource. @@ -62,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, prefix, controller.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), + &api.ReplicationController{}, + prefix, + controller.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.ReplicationController{} }, diff --git a/pkg/registry/daemonset/etcd/etcd.go b/pkg/registry/daemonset/etcd/etcd.go index c54b394e19c..42fb837d30c 100644 --- a/pkg/registry/daemonset/etcd/etcd.go +++ b/pkg/registry/daemonset/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // rest implements a RESTStorage for DaemonSets against etcd @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, prefix, daemonset.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), + &extensions.DaemonSet{}, + prefix, + daemonset.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.DaemonSet{} }, diff --git a/pkg/registry/deployment/etcd/etcd.go b/pkg/registry/deployment/etcd/etcd.go index ad0284ed818..42de475acb7 100644 --- a/pkg/registry/deployment/etcd/etcd.go +++ b/pkg/registry/deployment/etcd/etcd.go @@ -63,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) { newListFunc := func() runtime.Object { return &extensions.DeploymentList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, prefix, deployment.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), + &extensions.Deployment{}, + prefix, + deployment.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.Deployment{} }, diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index d749de7356c..c90074dcf39 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.EndpointsList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, prefix, endpoint.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), + &api.Endpoints{}, + prefix, + endpoint.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Endpoints{} }, diff --git a/pkg/registry/generic/matcher.go b/pkg/registry/generic/matcher.go index a3ca2bddfad..3c1b7ecaaeb 100644 --- a/pkg/registry/generic/matcher.go +++ b/pkg/registry/generic/matcher.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // AttrFunc returns label and field sets for List or Watch to compare against, or an error. @@ -50,9 +51,10 @@ func MergeFieldsSets(source fields.Set, fragment fields.Set) fields.Set { // SelectionPredicate implements a generic predicate that can be passed to // GenericRegistry's List or Watch methods. Implements the Matcher interface. type SelectionPredicate struct { - Label labels.Selector - Field fields.Selector - GetAttrs AttrFunc + Label labels.Selector + Field fields.Selector + GetAttrs AttrFunc + IndexFields []string } // Matches returns true if the given object's labels and fields (as @@ -79,6 +81,20 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) { return "", false } +// For any index defined by IndexFields, if a matcher can match only (a subset) +// of objects that return for a given index, a pair (, ) +// wil be returned. +// TODO: Consider supporting also labels. +func (s *SelectionPredicate) MatcherIndex() []storage.MatchValue { + var result []storage.MatchValue + for _, field := range s.IndexFields { + if value, ok := s.Field.RequiresExactMatch(field); ok { + result = append(result, storage.MatchValue{IndexName: field, Value: value}) + } + } + return result +} + // Matcher can return true if an object matches the Matcher's selection // criteria. If it is known that the matcher will match only a single object // then MatchesSingle should return the key of that object and true. This is an @@ -93,9 +109,10 @@ type Matcher interface { // include the object's namespace. MatchesSingle() (key string, matchesSingleObject bool) - // TODO: when we start indexing objects, add something like the below: - // MatchesIndices() (indexName []string, indexValue []string) - // where indexName/indexValue are the same length. + // For any known index, if a matcher can match only (a subset) of objects + // that return for a given index, a pair (, ) + // will be returned. + MatcherIndex() []storage.MatchValue } // MatcherFunc makes a matcher from the provided function. For easy definition @@ -117,6 +134,11 @@ func (m matcherFunc) MatchesSingle() (string, bool) { return "", false } +// MatcherIndex always returns empty list. +func (m matcherFunc) MatcherIndex() []storage.MatchValue { + return nil +} + // MatchOnKey returns a matcher that will send only the object matching key // through the matching function f. For testing! // Note: use SelectionPredicate above for real code! diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index ed4066902a4..3b96e77c135 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -30,15 +30,17 @@ func StorageWithCacher( objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, - newListFunc func() runtime.Object) storage.Interface { + newListFunc func() runtime.Object, + triggerFunc storage.TriggerPublisherFunc) storage.Interface { config := storage.CacherConfig{ - CacheCapacity: capacity, - Storage: storageInterface, - Versioner: etcdstorage.APIObjectVersioner{}, - Type: objectType, - ResourcePrefix: resourcePrefix, - NewListFunc: newListFunc, + CacheCapacity: capacity, + Storage: storageInterface, + Versioner: etcdstorage.APIObjectVersioner{}, + Type: objectType, + ResourcePrefix: resourcePrefix, + NewListFunc: newListFunc, + TriggerPublisherFunc: triggerFunc, } if scopeStrategy.NamespaceScoped() { config.KeyFunc = func(obj runtime.Object) (string, error) { diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index c38a696aa18..9234e006ea3 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -828,7 +828,7 @@ func (e *Store) createFilter(m generic.Matcher) storage.Filter { } return matches } - return storage.NewSimpleFilter(filterFunc) + return storage.NewSimpleFilter(filterFunc, m.MatcherIndex) } // calculateTTL is a helper for retrieving the updated TTL for an object or returning an error diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index 81381389b09..3fe6ae207db 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -105,6 +105,10 @@ func (sm setMatcher) MatchesSingle() (string, bool) { return "", false } +func (sm setMatcher) MatcherIndex() []storage.MatchValue { + return nil +} + // everythingMatcher matches everything type everythingMatcher struct{} @@ -116,6 +120,10 @@ func (everythingMatcher) MatchesSingle() (string, bool) { return "", false } +func (everythingMatcher) MatcherIndex() []storage.MatchValue { + return nil +} + func TestStoreList(t *testing.T) { podA := &api.Pod{ ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"}, diff --git a/pkg/registry/generic/storage_decorator.go b/pkg/registry/generic/storage_decorator.go index 79198663a01..965a272e277 100644 --- a/pkg/registry/generic/storage_decorator.go +++ b/pkg/registry/generic/storage_decorator.go @@ -30,7 +30,8 @@ type StorageDecorator func( objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, - newListFunc func() runtime.Object) storage.Interface + newListFunc func() runtime.Object, + trigger storage.TriggerPublisherFunc) storage.Interface // Returns given 'storageInterface' without any decoration. func UndecoratedStorage( @@ -39,6 +40,7 @@ func UndecoratedStorage( objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, - newListFunc func() runtime.Object) storage.Interface { + newListFunc func() runtime.Object, + trigger storage.TriggerPublisherFunc) storage.Interface { return storageInterface } diff --git a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go index 84bdab081a8..d4d224d1722 100644 --- a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go +++ b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/horizontalpodautoscaler" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -37,7 +38,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), &autoscaling.HorizontalPodAutoscaler{}, prefix, horizontalpodautoscaler.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), + &autoscaling.HorizontalPodAutoscaler{}, + prefix, + horizontalpodautoscaler.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} }, diff --git a/pkg/registry/ingress/etcd/etcd.go b/pkg/registry/ingress/etcd/etcd.go index 0afc8885592..b3fab226cc6 100644 --- a/pkg/registry/ingress/etcd/etcd.go +++ b/pkg/registry/ingress/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" ingress "k8s.io/kubernetes/pkg/registry/ingress" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // rest implements a RESTStorage for replication controllers against etcd @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &extensions.IngressList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), &extensions.Ingress{}, prefix, ingress.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), + &extensions.Ingress{}, + prefix, + ingress.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.Ingress{} }, diff --git a/pkg/registry/job/etcd/etcd.go b/pkg/registry/job/etcd/etcd.go index 77efe73b9ce..5d9313d85d6 100644 --- a/pkg/registry/job/etcd/etcd.go +++ b/pkg/registry/job/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/job" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for jobs against etcd @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &batch.JobList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Jobs), &batch.Job{}, prefix, job.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Jobs), + &batch.Job{}, + prefix, + job.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &batch.Job{} }, diff --git a/pkg/registry/limitrange/etcd/etcd.go b/pkg/registry/limitrange/etcd/etcd.go index f8386d6cd67..9765e48cdfe 100644 --- a/pkg/registry/limitrange/etcd/etcd.go +++ b/pkg/registry/limitrange/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/limitrange" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.LimitRangeList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges), &api.LimitRange{}, prefix, limitrange.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges), + &api.LimitRange{}, + prefix, + limitrange.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.LimitRange{} }, diff --git a/pkg/registry/namespace/etcd/etcd.go b/pkg/registry/namespace/etcd/etcd.go index 46cdc1a8aa6..4d722231c89 100644 --- a/pkg/registry/namespace/etcd/etcd.go +++ b/pkg/registry/namespace/etcd/etcd.go @@ -54,7 +54,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) { newListFunc := func() runtime.Object { return &api.NamespaceList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces), &api.Namespace{}, prefix, namespace.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces), + &api.Namespace{}, + prefix, + namespace.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Namespace{} }, diff --git a/pkg/registry/networkpolicy/etcd/etcd.go b/pkg/registry/networkpolicy/etcd/etcd.go index f0caacbcfa5..acc8818c325 100644 --- a/pkg/registry/networkpolicy/etcd/etcd.go +++ b/pkg/registry/networkpolicy/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/networkpolicy" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // rest implements a RESTStorage for network policies against etcd @@ -37,7 +38,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys), &extensionsapi.NetworkPolicy{}, prefix, networkpolicy.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys), + &extensionsapi.NetworkPolicy{}, + prefix, + networkpolicy.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensionsapi.NetworkPolicy{} }, diff --git a/pkg/registry/node/etcd/etcd.go b/pkg/registry/node/etcd/etcd.go index d983b917ede..371ad0dc451 100644 --- a/pkg/registry/node/etcd/etcd.go +++ b/pkg/registry/node/etcd/etcd.go @@ -70,7 +70,13 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter newListFunc := func() runtime.Object { return &api.NodeList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), &api.Node{}, prefix, node.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), + &api.Node{}, + prefix, + node.Strategy, + newListFunc, + node.NodeNameTriggerFunc) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Node{} }, diff --git a/pkg/registry/node/strategy.go b/pkg/registry/node/strategy.go index 1dda98d2b46..83aef5e1671 100644 --- a/pkg/registry/node/strategy.go +++ b/pkg/registry/node/strategy.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/runtime" + pkgstorage "k8s.io/kubernetes/pkg/storage" utilnet "k8s.io/kubernetes/pkg/util/net" nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/validation/field" @@ -157,9 +158,16 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher { } return labels.Set(nodeObj.ObjectMeta.Labels), NodeToSelectableFields(nodeObj), nil }, + IndexFields: []string{"metadata.name"}, } } +func NodeNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue { + node := obj.(*api.Node) + result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: node.ObjectMeta.Name} + return []pkgstorage.MatchValue{result} +} + // ResourceLocation returns an URL and transport which one can use to send traffic for the specified node. func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { schemeReq, name, portReq, valid := utilnet.SplitSchemeNamePort(id) diff --git a/pkg/registry/persistentvolume/etcd/etcd.go b/pkg/registry/persistentvolume/etcd/etcd.go index a55f4c962bb..bdc73611855 100644 --- a/pkg/registry/persistentvolume/etcd/etcd.go +++ b/pkg/registry/persistentvolume/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/persistentvolume" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes), &api.PersistentVolume{}, prefix, persistentvolume.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes), + &api.PersistentVolume{}, + prefix, + persistentvolume.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.PersistentVolume{} }, diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd.go b/pkg/registry/persistentvolumeclaim/etcd/etcd.go index a896fb5ddb8..e78c0c83f26 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/persistentvolumeclaim" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims), &api.PersistentVolumeClaim{}, prefix, persistentvolumeclaim.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims), + &api.PersistentVolumeClaim{}, + prefix, + persistentvolumeclaim.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.PersistentVolumeClaim{} }, diff --git a/pkg/registry/petset/etcd/etcd.go b/pkg/registry/petset/etcd/etcd.go index 8a0c48b963a..bec07543a30 100644 --- a/pkg/registry/petset/etcd/etcd.go +++ b/pkg/registry/petset/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/petset" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // rest implements a RESTStorage for replication controllers against etcd @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &appsapi.PetSetList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PetSet), &appsapi.PetSet{}, prefix, petset.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.PetSet), + &appsapi.PetSet{}, + prefix, + petset.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &appsapi.PetSet{} }, diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 385a399394a..98599594ada 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -61,7 +61,14 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr newListFunc := func() runtime.Object { return &api.PodList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Pods), + &api.Pod{}, + prefix, + pod.Strategy, + newListFunc, + pod.NodeNameTriggerFunc, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Pod{} }, diff --git a/pkg/registry/pod/strategy.go b/pkg/registry/pod/strategy.go index e48775cf6bd..38ebda14571 100644 --- a/pkg/registry/pod/strategy.go +++ b/pkg/registry/pod/strategy.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/validation/field" ) @@ -177,9 +178,16 @@ func MatchPod(label labels.Selector, field fields.Selector) generic.Matcher { } return podLabels, podFields, nil }, + IndexFields: []string{"spec.nodeName"}, } } +func NodeNameTriggerFunc(obj runtime.Object) []storage.MatchValue { + pod := obj.(*api.Pod) + result := storage.MatchValue{IndexName: "spec.nodeName", Value: pod.Spec.NodeName} + return []storage.MatchValue{result} +} + // PodToSelectableFields returns a field set that represents the object // TODO: fields are not labels, and the validation rules for them do not apply. func PodToSelectableFields(pod *api.Pod) fields.Set { diff --git a/pkg/registry/poddisruptionbudget/etcd/etcd.go b/pkg/registry/poddisruptionbudget/etcd/etcd.go index e2a7e6e30ed..3fd434828a8 100644 --- a/pkg/registry/poddisruptionbudget/etcd/etcd.go +++ b/pkg/registry/poddisruptionbudget/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/poddisruptionbudget" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // rest implements a RESTStorage for pod disruption budgets against etcd @@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget), &policyapi.PodDisruptionBudget{}, prefix, poddisruptionbudget.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget), + &policyapi.PodDisruptionBudget{}, + prefix, + poddisruptionbudget.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &policyapi.PodDisruptionBudget{} }, diff --git a/pkg/registry/podsecuritypolicy/etcd/etcd.go b/pkg/registry/podsecuritypolicy/etcd/etcd.go index fd65d64b93d..dbabceb7a22 100644 --- a/pkg/registry/podsecuritypolicy/etcd/etcd.go +++ b/pkg/registry/podsecuritypolicy/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/podsecuritypolicy" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for PodSecurityPolicies against etcd. @@ -36,7 +37,14 @@ const Prefix = "/podsecuritypolicies" func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} } storageInterface := opts.Decorator( - opts.Storage, 100, &extensions.PodSecurityPolicy{}, Prefix, podsecuritypolicy.Strategy, newListFunc) + opts.Storage, + 100, + &extensions.PodSecurityPolicy{}, + Prefix, + podsecuritypolicy.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.PodSecurityPolicy{} }, diff --git a/pkg/registry/podtemplate/etcd/etcd.go b/pkg/registry/podtemplate/etcd/etcd.go index 7da58ecd095..b3e29eee550 100644 --- a/pkg/registry/podtemplate/etcd/etcd.go +++ b/pkg/registry/podtemplate/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/podtemplate" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.PodTemplateList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates), &api.PodTemplate{}, prefix, podtemplate.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates), + &api.PodTemplate{}, + prefix, + podtemplate.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.PodTemplate{} }, diff --git a/pkg/registry/replicaset/etcd/etcd.go b/pkg/registry/replicaset/etcd/etcd.go index a0b20ef5c79..46ba79bab11 100644 --- a/pkg/registry/replicaset/etcd/etcd.go +++ b/pkg/registry/replicaset/etcd/etcd.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/replicaset" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // ReplicaSetStorage includes dummy storage for ReplicaSets and for Scale subresource. @@ -61,7 +62,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets), &extensions.ReplicaSet{}, prefix, replicaset.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets), + &extensions.ReplicaSet{}, + prefix, + replicaset.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.ReplicaSet{} }, diff --git a/pkg/registry/resourcequota/etcd/etcd.go b/pkg/registry/resourcequota/etcd/etcd.go index fbb5695a950..d635749a518 100644 --- a/pkg/registry/resourcequota/etcd/etcd.go +++ b/pkg/registry/resourcequota/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/resourcequota" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas), &api.ResourceQuota{}, prefix, resourcequota.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas), + &api.ResourceQuota{}, + prefix, + resourcequota.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.ResourceQuota{} }, diff --git a/pkg/registry/role/etcd/etcd.go b/pkg/registry/role/etcd/etcd.go index 026fb43f6d1..af9a4279523 100644 --- a/pkg/registry/role/etcd/etcd.go +++ b/pkg/registry/role/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/role" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for Role against etcd @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix, role.Strategy, newListFunc, + storage.NoTriggerPublisher, ) store := ®istry.Store{ diff --git a/pkg/registry/rolebinding/etcd/etcd.go b/pkg/registry/rolebinding/etcd/etcd.go index bc16622f08c..5383bc1136c 100644 --- a/pkg/registry/rolebinding/etcd/etcd.go +++ b/pkg/registry/rolebinding/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/rolebinding" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) // REST implements a RESTStorage for RoleBinding against etcd @@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix, rolebinding.Strategy, newListFunc, + storage.NoTriggerPublisher, ) store := ®istry.Store{ diff --git a/pkg/registry/secret/etcd/etcd.go b/pkg/registry/secret/etcd/etcd.go index 84d50ba6914..d191457e67d 100644 --- a/pkg/registry/secret/etcd/etcd.go +++ b/pkg/registry/secret/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/secret" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.SecretList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Secrets), &api.Secret{}, prefix, secret.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Secrets), + &api.Secret{}, + prefix, + secret.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Secret{} }, diff --git a/pkg/registry/service/etcd/etcd.go b/pkg/registry/service/etcd/etcd.go index ceeb96b79ab..e8b24fb3668 100644 --- a/pkg/registry/service/etcd/etcd.go +++ b/pkg/registry/service/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { newListFunc := func() runtime.Object { return &api.ServiceList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Services), &api.Service{}, prefix, service.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.Services), + &api.Service{}, + prefix, + service.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Service{} }, diff --git a/pkg/registry/serviceaccount/etcd/etcd.go b/pkg/registry/serviceaccount/etcd/etcd.go index 6aa81777569..d2cb94aa6f9 100644 --- a/pkg/registry/serviceaccount/etcd/etcd.go +++ b/pkg/registry/serviceaccount/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/serviceaccount" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" ) type REST struct { @@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST { newListFunc := func() runtime.Object { return &api.ServiceAccountList{} } storageInterface := opts.Decorator( - opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts), &api.ServiceAccount{}, prefix, serviceaccount.Strategy, newListFunc) + opts.Storage, + cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts), + &api.ServiceAccount{}, + prefix, + serviceaccount.Strategy, + newListFunc, + storage.NoTriggerPublisher, + ) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.ServiceAccount{} }, diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index a0f4685eceb..d9c08e6719b 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -59,11 +59,67 @@ type CacherConfig struct { // KeyFunc is used to get a key in the underyling storage for a given object. KeyFunc func(runtime.Object) (string, error) + // TriggerPublisherFunc is used for optimizing amount of watchers that + // needs to process an incoming event. + TriggerPublisherFunc TriggerPublisherFunc + // NewList is a function that creates new empty object storing a list of // objects of type Type. NewListFunc func() runtime.Object } +type watchersMap map[int]*cacheWatcher + +func (wm watchersMap) addWatcher(w *cacheWatcher, number int) { + wm[number] = w +} + +func (wm watchersMap) deleteWatcher(number int) { + delete(wm, number) +} + +func (wm watchersMap) terminateAll() { + for key, watcher := range wm { + delete(wm, key) + watcher.stop() + } +} + +type indexedWatchers struct { + allWatchers watchersMap + valueWatchers map[string]watchersMap +} + +func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) { + if supported { + if _, ok := i.valueWatchers[value]; !ok { + i.valueWatchers[value] = watchersMap{} + } + i.valueWatchers[value].addWatcher(w, number) + } else { + i.allWatchers.addWatcher(w, number) + } +} + +func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) { + if supported { + i.valueWatchers[value].deleteWatcher(number) + if len(i.valueWatchers[value]) == 0 { + delete(i.valueWatchers, value) + } + } else { + i.allWatchers.deleteWatcher(number) + } +} + +func (i *indexedWatchers) terminateAll() { + i.allWatchers.terminateAll() + for index, watchers := range i.valueWatchers { + watchers.terminateAll() + delete(i.valueWatchers, index) + } +} + // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. @@ -87,16 +143,20 @@ type Cacher struct { watchCache *watchCache reflector *cache.Reflector - // Registered watchers. - watcherIdx int - watchers map[int]*cacheWatcher - // Versioner is used to handle resource versions. versioner Versioner // keyFunc is used to get a key in the underyling storage for a given object. keyFunc func(runtime.Object) (string, error) + // triggerFunc is used for optimizing amount of watchers that needs to process + // an incoming event. + triggerFunc TriggerPublisherFunc + // watchers is mapping from the value of trigger function that a + // watcher is interested into the watchers + watcherIdx int + watchers indexedWatchers + // Handling graceful termination. stopLock sync.RWMutex stopped bool @@ -120,13 +180,18 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { } cacher := &Cacher{ - ready: newReady(), - storage: config.Storage, - watchCache: watchCache, - reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), - watchers: make(map[int]*cacheWatcher), - versioner: config.Versioner, - keyFunc: config.KeyFunc, + ready: newReady(), + storage: config.Storage, + watchCache: watchCache, + reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), + versioner: config.Versioner, + keyFunc: config.KeyFunc, + triggerFunc: config.TriggerPublisherFunc, + watcherIdx: 0, + watchers: indexedWatchers{ + allWatchers: make(map[int]*cacheWatcher), + valueWatchers: make(map[string]watchersMap), + }, // We need to (potentially) stop both: // - wait.Until go-routine // - reflector.ListAndWatch @@ -223,10 +288,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, return newErrWatcher(err), nil } + triggerValue, triggerSupported := "", false + // TODO: Currently we assume that in a given Cacher object, any that is + // passed here is aware of exactly the same trigger (at most one). + // Thus, either 0 or 1 values will be returned. + if matchValues := filter.Trigger(); len(matchValues) > 0 { + triggerValue, triggerSupported = matchValues[0].Value, true + } + c.Lock() defer c.Unlock() - watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) - c.watchers[c.watcherIdx] = watcher + forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) + watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forget) + + c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ return watcher, nil } @@ -307,21 +382,68 @@ func (c *Cacher) Codec() runtime.Codec { return c.storage.Codec() } +func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { + // TODO: Currently we assume that in a given Cacher object, its + // is aware of exactly the same trigger (at most one). Thus calling: + // c.triggerFunc() + // can return only 0 or 1 values. + // That means, that triggerValues itself may return up to 2 different values. + if c.triggerFunc == nil { + return nil, false + } + result := make([]string, 0, 2) + matchValues := c.triggerFunc(event.Object) + if len(matchValues) > 0 { + result = append(result, matchValues[0].Value) + } + if event.PrevObject == nil { + return result, len(result) > 0 + } + prevMatchValues := c.triggerFunc(event.PrevObject) + if len(prevMatchValues) > 0 { + if len(result) == 0 || result[0] != prevMatchValues[0].Value { + result = append(result, prevMatchValues[0].Value) + } + } + return result, len(result) > 0 +} + func (c *Cacher) processEvent(event watchCacheEvent) { + triggerValues, supported := c.triggerValues(&event) + c.Lock() defer c.Unlock() - for _, watcher := range c.watchers { + // Iterate over "allWatchers" no matter what the trigger function is. + for _, watcher := range c.watchers.allWatchers { watcher.add(event) } + if supported { + // Iterate over watchers interested in the given values of the trigger. + for _, triggerValue := range triggerValues { + for _, watcher := range c.watchers.valueWatchers[triggerValue] { + watcher.add(event) + } + } + } else { + // supported equal to false generally means that trigger function + // is not defined (or not aware of any indexes). In this case, + // watchers filters should generally also don't generate any + // trigger values, but can cause problems in case of some + // misconfiguration. Thus we paranoidly leave this branch. + + // Iterate over watchers interested in exact values for all values. + for _, watchers := range c.watchers.valueWatchers { + for _, watcher := range watchers { + watcher.add(event) + } + } + } } func (c *Cacher) terminateAllWatchers() { c.Lock() defer c.Unlock() - for key, watcher := range c.watchers { - delete(c.watchers, key) - watcher.stop() - } + c.watchers.terminateAll() } func (c *Cacher) isStopped() bool { @@ -338,15 +460,15 @@ func (c *Cacher) Stop() { c.stopWg.Wait() } -func forgetWatcher(c *Cacher, index int) func(bool) { +func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) { return func(lock bool) { if lock { c.Lock() defer c.Unlock() } - // It's possible that the watcher is already not in the map (e.g. in case of + // It's possible that the watcher is already not in the structure (e.g. in case of // simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything. - delete(c.watchers, index) + c.watchers.deleteWatcher(index, triggerValue, triggerSupported) } } @@ -362,7 +484,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi } return filter.Filter(obj) } - return NewSimpleFilter(filterFunc) + return NewSimpleFilter(filterFunc, filter.Trigger) } // Returns resource version to which the underlying cache is synced. diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 3a461d38368..51751491d52 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -340,7 +340,7 @@ func TestFiltering(t *testing.T) { } return selector.Matches(labels.Set(metadata.GetLabels())) } - filter := storage.NewSimpleFilter(filterFunc) + filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter) if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 0b6444845b7..c178fc860b5 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -158,7 +158,7 @@ func TestListFiltered(t *testing.T) { pod := obj.(*api.Pod) return pod.Name == "bar" } - filter := storage.NewSimpleFilter(filterFunc) + filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc) var got api.PodList err := helper.List(context.TODO(), key, "", filter, &got) diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 5c9ab590d50..5caa7eb7a71 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -56,6 +56,10 @@ func (f *firstLetterIsB) Filter(obj runtime.Object) bool { return obj.(*api.Pod).Name[0] == 'b' } +func (f *firstLetterIsB) Trigger() []storage.MatchValue { + return nil +} + func TestWatchInterpretations(t *testing.T) { codec := testapi.Default.Codec() // Declare some pods to make the test cases compact. @@ -230,7 +234,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { filterFunc := func(obj runtime.Object) bool { return obj.(*api.Pod).Name != "bar" } - filter := storage.NewSimpleFilter(filterFunc) + filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc) w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) eventChan := make(chan watch.Event, 1) diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 36c0d02b2e3..2de0e696dcd 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -227,14 +227,17 @@ func TestGetToList(t *testing.T) { tests := []struct { key string filter func(runtime.Object) bool + trigger func() []storage.MatchValue expectedOut []*api.Pod }{{ // test GetToList on existing key key: key, filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, expectedOut: []*api.Pod{storedObj}, }, { // test GetToList on non-existing key key: "/non-existing", filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, expectedOut: nil, }, { // test GetToList with filter to reject the pod key: "/non-existing", @@ -245,12 +248,13 @@ func TestGetToList(t *testing.T) { } return pod.Name != storedObj.Name }, + trigger: storage.NoTriggerFunc, expectedOut: nil, }} for i, tt := range tests { out := &api.PodList{} - filter := storage.NewSimpleFilter(tt.filter) + filter := storage.NewSimpleFilter(tt.filter, tt.trigger) err := store.GetToList(ctx, tt.key, filter, out) if err != nil { t.Fatalf("GetToList failed: %v", err) @@ -489,14 +493,17 @@ func TestList(t *testing.T) { tests := []struct { prefix string filter func(runtime.Object) bool + trigger func() []storage.MatchValue expectedOut []*api.Pod }{{ // test List on existing key prefix: "/one-level/", filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, expectedOut: []*api.Pod{preset[0].storedObj}, }, { // test List on non-existing key prefix: "/non-existing/", filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, expectedOut: nil, }, { // test List with filter prefix: "/one-level/", @@ -507,16 +514,18 @@ func TestList(t *testing.T) { } return pod.Name != preset[0].storedObj.Name }, + trigger: storage.NoTriggerFunc, expectedOut: nil, }, { // test List with multiple levels of directories and expect flattened result prefix: "/two-level/", filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, }} for i, tt := range tests { out := &api.PodList{} - filter := storage.NewSimpleFilter(tt.filter) + filter := storage.NewSimpleFilter(tt.filter, tt.trigger) err := store.List(ctx, tt.prefix, "0", filter, out) if err != nil { t.Fatalf("List failed: %v", err) diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 166f32042d3..9421e73f3b6 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -58,15 +58,18 @@ func testWatch(t *testing.T, recursive bool) { tests := []struct { key string filter func(runtime.Object) bool + trigger func() []storage.MatchValue watchTests []*testWatchStruct }{{ // create a key key: "/somekey-1", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, }, { // create a key but obj gets filtered key: "/somekey-2", watchTests: []*testWatchStruct{{podFoo, false, ""}}, filter: func(runtime.Object) bool { return false }, + trigger: storage.NoTriggerFunc, }, { // create a key but obj gets filtered. Then update it with unfiltered obj key: "/somekey-3", watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}}, @@ -74,10 +77,12 @@ func testWatch(t *testing.T, recursive bool) { pod := obj.(*api.Pod) return pod.Name == "bar" }, + trigger: storage.NoTriggerFunc, }, { // update key: "/somekey-4", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, filter: storage.EverythingFunc, + trigger: storage.NoTriggerFunc, }, { // delete because of being filtered key: "/somekey-5", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, @@ -85,9 +90,10 @@ func testWatch(t *testing.T, recursive bool) { pod := obj.(*api.Pod) return pod.Name != "bar" }, + trigger: storage.NoTriggerFunc, }} for i, tt := range tests { - filter := storage.NewSimpleFilter(tt.filter) + filter := storage.NewSimpleFilter(tt.filter, tt.trigger) w, err := store.watch(ctx, tt.key, "0", filter, recursive) if err != nil { t.Fatalf("Watch failed: %v", err) diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index f8da6124f3b..ae216b56194 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -51,11 +51,30 @@ type ResponseMeta struct { ResourceVersion uint64 } +// MatchValue defines a pair (, ). +type MatchValue struct { + IndexName string + Value string +} + +// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs +// (, ) for all indexes known +// to that function. +type TriggerPublisherFunc func(obj runtime.Object) []MatchValue + // Filter is interface that is used to pass filtering mechanism. type Filter interface { // Filter is a predicate which takes an API object and returns true // if and only if the object should remain in the set. Filter(obj runtime.Object) bool + // For any triggers known to the Filter, if Filter() can return only + // (a subset of) objects for which indexing function returns , + // (, pair would be returned. + // + // This is optimization to avoid computing Filter() function (which are + // usually relatively expensive) in case we are sure they will return + // false anyway. + Trigger() []MatchValue } // Everything is a Filter which accepts all objects. @@ -65,10 +84,14 @@ var Everything Filter = everything{} type everything struct { } -func (e everything) Filter(_ runtime.Object) bool { +func (e everything) Filter(runtime.Object) bool { return true } +func (e everything) Trigger() []MatchValue { + return nil +} + // Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update // that is guaranteed to succeed. // See the comment for GuaranteedUpdate for more details. diff --git a/pkg/storage/util.go b/pkg/storage/util.go index c4418c235c7..437028ae5a7 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -38,16 +38,24 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc { // SimpleFilter implements Filter interface. type SimpleFilter struct { - filterFunc func(runtime.Object) bool + filterFunc func(runtime.Object) bool + triggerFunc func() []MatchValue } func (s *SimpleFilter) Filter(obj runtime.Object) bool { return s.filterFunc(obj) } -func NewSimpleFilter(filterFunc func(runtime.Object) bool) Filter { +func (s *SimpleFilter) Trigger() []MatchValue { + return s.triggerFunc() +} + +func NewSimpleFilter( + filterFunc func(runtime.Object) bool, + triggerFunc func() []MatchValue) Filter { return &SimpleFilter{ - filterFunc: filterFunc, + filterFunc: filterFunc, + triggerFunc: triggerFunc, } } @@ -55,6 +63,14 @@ func EverythingFunc(runtime.Object) bool { return true } +func NoTriggerFunc() []MatchValue { + return nil +} + +func NoTriggerPublisher(runtime.Object) []MatchValue { + return nil +} + // ParseWatchResourceVersion takes a resource version argument and converts it to // the etcd version we should pass to helper.Watch(). Because resourceVersion is // an opaque value, the default watch behavior for non-zero watch is to watch