From 68fd329e00d1ddf19a7ec0db1607820a1c4c008a Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 4 Jul 2019 14:21:06 +0200 Subject: [PATCH] Simplify trigger functions in cacher --- pkg/registry/core/node/storage/BUILD | 1 + pkg/registry/core/node/storage/storage.go | 7 +- pkg/registry/core/node/strategy.go | 6 +- pkg/registry/core/pod/storage/storage.go | 6 +- pkg/registry/core/pod/strategy.go | 6 +- pkg/registry/core/secret/storage/BUILD | 1 + pkg/registry/core/secret/storage/storage.go | 7 +- pkg/registry/core/secret/strategy.go | 6 +- .../apiserver/pkg/registry/generic/options.go | 2 +- .../generic/registry/storage_factory.go | 22 ++--- .../pkg/registry/generic/registry/store.go | 7 +- .../pkg/registry/generic/storage_decorator.go | 4 +- .../apiserver/pkg/storage/cacher/cacher.go | 93 +++++++++++-------- .../apiserver/pkg/storage/interfaces.go | 17 ++-- .../pkg/storage/selection_predicate.go | 14 --- .../src/k8s.io/apiserver/pkg/storage/util.go | 8 -- 16 files changed, 103 insertions(+), 104 deletions(-) diff --git a/pkg/registry/core/node/storage/BUILD b/pkg/registry/core/node/storage/BUILD index 5aa7b73b4d5..da7362cafb7 100644 --- a/pkg/registry/core/node/storage/BUILD +++ b/pkg/registry/core/node/storage/BUILD @@ -44,6 +44,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", ], ) diff --git a/pkg/registry/core/node/storage/storage.go b/pkg/registry/core/node/storage/storage.go index bf88a8d61d1..a073832c35a 100644 --- a/pkg/registry/core/node/storage/storage.go +++ b/pkg/registry/core/node/storage/storage.go @@ -28,6 +28,7 @@ import ( "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storage" api "k8s.io/kubernetes/pkg/apis/core" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/kubelet/client" @@ -89,7 +90,11 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } - options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: node.GetAttrs, TriggerFunc: node.NodeNameTriggerFunc} + options := &generic.StoreOptions{ + RESTOptions: optsGetter, + AttrFunc: node.GetAttrs, + TriggerFunc: map[string]storage.TriggerPublisherFunc{"metadata.name": node.NodeNameTriggerFunc}, + } if err := store.CompleteWithOptions(options); err != nil { return nil, err } diff --git a/pkg/registry/core/node/strategy.go b/pkg/registry/core/node/strategy.go index 9cfebd4d622..221d762a569 100644 --- a/pkg/registry/core/node/strategy.go +++ b/pkg/registry/core/node/strategy.go @@ -225,10 +225,8 @@ func MatchNode(label labels.Selector, field fields.Selector) pkgstorage.Selectio } } -func NodeNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue { - node := obj.(*api.Node) - result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: node.ObjectMeta.Name} - return []pkgstorage.MatchValue{result} +func NodeNameTriggerFunc(obj runtime.Object) string { + return obj.(*api.Node).ObjectMeta.Name } // ResourceLocation returns a URL and transport which one can use to send traffic for the specified node. diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 311f2a3c724..e268c46268b 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -78,7 +78,11 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGet TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } - options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc} + options := &generic.StoreOptions{ + RESTOptions: optsGetter, + AttrFunc: pod.GetAttrs, + TriggerFunc: map[string]storage.TriggerPublisherFunc{"spec.nodeName": pod.NodeNameTriggerFunc}, + } if err := store.CompleteWithOptions(options); err != nil { panic(err) // TODO: Propagate error up } diff --git a/pkg/registry/core/pod/strategy.go b/pkg/registry/core/pod/strategy.go index 5e3e289a8e7..07db221b2a7 100644 --- a/pkg/registry/core/pod/strategy.go +++ b/pkg/registry/core/pod/strategy.go @@ -193,10 +193,8 @@ func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPre } } -func NodeNameTriggerFunc(obj runtime.Object) []storage.MatchValue { - pod := obj.(*api.Pod) - result := storage.MatchValue{IndexName: "spec.nodeName", Value: pod.Spec.NodeName} - return []storage.MatchValue{result} +func NodeNameTriggerFunc(obj runtime.Object) string { + return obj.(*api.Pod).Spec.NodeName } // PodToSelectableFields returns a field set that represents the object diff --git a/pkg/registry/core/secret/storage/BUILD b/pkg/registry/core/secret/storage/BUILD index 6d670c8135c..295be787637 100644 --- a/pkg/registry/core/secret/storage/BUILD +++ b/pkg/registry/core/secret/storage/BUILD @@ -36,6 +36,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", ], ) diff --git a/pkg/registry/core/secret/storage/storage.go b/pkg/registry/core/secret/storage/storage.go index fb852682fd3..b27b5c012bb 100644 --- a/pkg/registry/core/secret/storage/storage.go +++ b/pkg/registry/core/secret/storage/storage.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/apiserver/pkg/storage" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -46,7 +47,11 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST { TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } - options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: secret.GetAttrs, TriggerFunc: secret.SecretNameTriggerFunc} + options := &generic.StoreOptions{ + RESTOptions: optsGetter, + AttrFunc: secret.GetAttrs, + TriggerFunc: map[string]storage.TriggerPublisherFunc{"metadata.name": secret.SecretNameTriggerFunc}, + } if err := store.CompleteWithOptions(options); err != nil { panic(err) // TODO: Propagate error up } diff --git a/pkg/registry/core/secret/strategy.go b/pkg/registry/core/secret/strategy.go index 52d39cbe97d..d693c72d684 100644 --- a/pkg/registry/core/secret/strategy.go +++ b/pkg/registry/core/secret/strategy.go @@ -116,10 +116,8 @@ func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionP } } -func SecretNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue { - secret := obj.(*api.Secret) - result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: secret.ObjectMeta.Name} - return []pkgstorage.MatchValue{result} +func SecretNameTriggerFunc(obj runtime.Object) string { + return obj.(*api.Secret).ObjectMeta.Name } // SelectableFields returns a field set that can be used for filter selection diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go index af651371f09..fb9adba1ab0 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go @@ -47,6 +47,6 @@ type RESTOptionsGetter interface { // StoreOptions is set of configuration options used to complete generic registries. type StoreOptions struct { RESTOptions RESTOptionsGetter - TriggerFunc storage.TriggerPublisherFunc + TriggerFunc storage.TriggerPublisherFuncs AttrFunc storage.AttrFunc } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 6b34fc1e41a..acefc238689 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -39,7 +39,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { + triggerFuncs storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) { s, d, err := generic.NewRawStorage(storageConfig) if err != nil { @@ -56,16 +56,16 @@ func StorageWithCacher(capacity int) generic.StorageDecorator { // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // Currently it has two layers of same storage interface -- cacher and low level kv. cacherConfig := cacherstorage.Config{ - CacheCapacity: capacity, - Storage: s, - Versioner: etcd3.APIObjectVersioner{}, - ResourcePrefix: resourcePrefix, - KeyFunc: keyFunc, - NewFunc: newFunc, - NewListFunc: newListFunc, - GetAttrsFunc: getAttrsFunc, - TriggerPublisherFunc: triggerFunc, - Codec: storageConfig.Codec, + CacheCapacity: capacity, + Storage: s, + Versioner: etcd3.APIObjectVersioner{}, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + TriggerPublisherFuncs: triggerFuncs, + Codec: storageConfig.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index ee3c4fb8d7f..be03caa41b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1287,11 +1287,6 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName()) } - triggerFunc := options.TriggerFunc - if triggerFunc == nil { - triggerFunc = storage.NoTriggerPublisher - } - if e.DeleteCollectionWorkers == 0 { e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers } @@ -1318,7 +1313,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { e.NewFunc, e.NewListFunc, attrFunc, - triggerFunc, + options.TriggerFunc, ) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go index 6509ef831be..1c553413efa 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go @@ -32,7 +32,7 @@ type StorageDecorator func( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) + trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) // UndecoratedStorage returns the given a new storage from the given config // without any decoration. @@ -43,7 +43,7 @@ func UndecoratedStorage( newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrsFunc storage.AttrFunc, - trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) { + trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) { return NewRawStorage(config) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 2c4c1a35431..2b701084db4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -88,9 +88,9 @@ type Config struct { // GetAttrsFunc is used to get object labels, fields GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error) - // TriggerPublisherFunc is used for optimizing amount of watchers that + // TriggerPublisherFuncs is used for optimizing amount of watchers that // needs to process an incoming event. - TriggerPublisherFunc storage.TriggerPublisherFunc + TriggerPublisherFuncs storage.TriggerPublisherFuncs // NewFunc is a function that creates new empty object storing a object of type Type. NewFunc func() runtime.Object @@ -209,6 +209,11 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher { type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool +type indexedTriggerFunc struct { + indexName string + triggerFunc storage.TriggerPublisherFunc +} + // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background // based on the underlying storage contents. @@ -248,9 +253,9 @@ type Cacher struct { // newFunc is a function that creates new empty object storing a object of type Type. newFunc func() runtime.Object - // triggerFunc is used for optimizing amount of watchers that needs to process + // indexedTrigger is used for optimizing amount of watchers that needs to process // an incoming event. - triggerFunc storage.TriggerPublisherFunc + indexedTrigger *indexedTriggerFunc // watchers is mapping from the value of trigger function that a // watcher is interested into the watchers watcherIdx int @@ -300,15 +305,32 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err) } + var indexedTrigger *indexedTriggerFunc + if config.TriggerPublisherFuncs != nil { + // For now, we don't support multiple trigger functions defined + // for a given resource. + if len(config.TriggerPublisherFuncs) > 1 { + return nil, fmt.Errorf("cacher %s doesn't support more than one TriggerPublisherFunc: ", reflect.TypeOf(obj).String()) + } + for key, value := range config.TriggerPublisherFuncs { + if value != nil { + indexedTrigger = &indexedTriggerFunc{ + indexName: key, + triggerFunc: value, + } + } + } + } + clock := clock.RealClock{} cacher := &Cacher{ - ready: newReady(), - storage: config.Storage, - objectType: reflect.TypeOf(obj), - versioner: config.Versioner, - newFunc: config.NewFunc, - triggerFunc: config.TriggerPublisherFunc, - watcherIdx: 0, + ready: newReady(), + storage: config.Storage, + objectType: reflect.TypeOf(obj), + versioner: config.Versioner, + newFunc: config.NewFunc, + indexedTrigger: indexedTrigger, + watcherIdx: 0, watchers: indexedWatchers{ allWatchers: make(map[int]*cacheWatcher), valueWatchers: make(map[string]watchersMap), @@ -419,23 +441,27 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.ready.wait() triggerValue, triggerSupported := "", false - // TODO: Currently we assume that in a given Cacher object, any that is - // passed here is aware of exactly the same trigger (at most one). - // Thus, either 0 or 1 values will be returned. - if matchValues := pred.MatcherIndex(); len(matchValues) > 0 { - triggerValue, triggerSupported = matchValues[0].Value, true + if c.indexedTrigger != nil { + for _, field := range pred.IndexFields { + if field == c.indexedTrigger.indexName { + if value, ok := pred.Field.RequiresExactMatch(field); ok { + triggerValue, triggerSupported = value, true + } + } + } } - // If there is triggerFunc defined, but triggerSupported is false, + // If there is indexedTrigger defined, but triggerSupported is false, // we can't narrow the amount of events significantly at this point. // - // That said, currently triggerFunc is defined only for Pods and Nodes, - // and there is only constant number of watchers for which triggerSupported - // is false (excluding those issues explicitly by users). + // That said, currently indexedTrigger is defined only for couple resources: + // Pods, Nodes, Secrets and ConfigMaps and there is only a constant + // number of watchers for which triggerSupported is false (excluding those + // issued explicitly by users). // Thus, to reduce the risk of those watchers blocking all watchers of a // given resource in the system, we increase the sizes of buffers for them. chanSize := 10 - if c.triggerFunc != nil && !triggerSupported { + if c.indexedTrigger != nil && !triggerSupported { // TODO: We should tune this value and ideally make it dependent on the // number of objects of a given type and/or their churn. chanSize = 1000 @@ -711,29 +737,20 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { } func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { - // TODO: Currently we assume that in a given Cacher object, its - // is aware of exactly the same trigger (at most one). Thus calling: - // c.triggerFunc() - // can return only 0 or 1 values. - // That means, that triggerValues itself may return up to 2 different values. - if c.triggerFunc == nil { + if c.indexedTrigger == nil { return nil, false } + result := make([]string, 0, 2) - matchValues := c.triggerFunc(event.Object) - if len(matchValues) > 0 { - result = append(result, matchValues[0].Value) - } + result = append(result, c.indexedTrigger.triggerFunc(event.Object)) if event.PrevObject == nil { - return result, len(result) > 0 + return result, true } - prevMatchValues := c.triggerFunc(event.PrevObject) - if len(prevMatchValues) > 0 { - if len(result) == 0 || result[0] != prevMatchValues[0].Value { - result = append(result, prevMatchValues[0].Value) - } + prevTriggerValue := c.indexedTrigger.triggerFunc(event.PrevObject) + if result[0] != prevTriggerValue { + result = append(result, prevTriggerValue) } - return result, len(result) > 0 + return result, true } func (c *Cacher) processEvent(event *watchCacheEvent) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index e649c87f5aa..c850b248439 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -73,16 +73,15 @@ type ResponseMeta struct { ResourceVersion uint64 } -// MatchValue defines a pair (, ). -type MatchValue struct { - IndexName string - Value string -} +// TriggerPublisherFunc is a function that for a given object computes +// for a particular . +// TODO(wojtek-t): Rename to IndexerFunc? +type TriggerPublisherFunc func(obj runtime.Object) string -// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs -// (, ) for all indexes known -// to that function. -type TriggerPublisherFunc func(obj runtime.Object) []MatchValue +// TriggerPublisherFuncs is a mapping from to function that +// for a given object computes . +// TODO(wojtek-t): Rename to IndexerFuncs? +type TriggerPublisherFuncs map[string]TriggerPublisherFunc // Everything accepts all objects. var Everything = SelectionPredicate{ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index b2f8c8e88ef..66d8d1f958a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -124,20 +124,6 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) { return "", false } -// For any index defined by IndexFields, if a matcher can match only (a subset) -// of objects that return for a given index, a pair (, ) -// wil be returned. -// TODO: Consider supporting also labels. -func (s *SelectionPredicate) MatcherIndex() []MatchValue { - var result []MatchValue - for _, field := range s.IndexFields { - if value, ok := s.Field.RequiresExactMatch(field); ok { - result = append(result, MatchValue{IndexName: field, Value: value}) - } - } - return result -} - // Empty returns true if the predicate performs no filtering. func (s *SelectionPredicate) Empty() bool { return s.Label.Empty() && s.Field.Empty() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util.go b/staging/src/k8s.io/apiserver/pkg/storage/util.go index 8c571b1c831..9da8d9713c1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util.go @@ -39,14 +39,6 @@ func EverythingFunc(runtime.Object) bool { return true } -func NoTriggerFunc() []MatchValue { - return nil -} - -func NoTriggerPublisher(runtime.Object) []MatchValue { - return nil -} - func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { meta, err := meta.Accessor(obj) if err != nil {