Merge pull request #79793 from wojtek-t/index_in_cacher

Simplify trigger functions in cacher
This commit is contained in:
Kubernetes Prow Robot 2019-07-17 19:34:37 -07:00 committed by GitHub
commit a930b5ca7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 103 additions and 104 deletions

View File

@ -44,6 +44,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
],
)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
api "k8s.io/kubernetes/pkg/apis/core"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/kubelet/client"
@ -89,7 +90,11 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: node.GetAttrs, TriggerFunc: node.NodeNameTriggerFunc}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: node.GetAttrs,
TriggerFunc: map[string]storage.TriggerPublisherFunc{"metadata.name": node.NodeNameTriggerFunc},
}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}

View File

@ -225,10 +225,8 @@ func MatchNode(label labels.Selector, field fields.Selector) pkgstorage.Selectio
}
}
func NodeNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue {
node := obj.(*api.Node)
result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: node.ObjectMeta.Name}
return []pkgstorage.MatchValue{result}
func NodeNameTriggerFunc(obj runtime.Object) string {
return obj.(*api.Node).ObjectMeta.Name
}
// ResourceLocation returns a URL and transport which one can use to send traffic for the specified node.

View File

@ -78,7 +78,11 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGet
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: pod.GetAttrs,
TriggerFunc: map[string]storage.TriggerPublisherFunc{"spec.nodeName": pod.NodeNameTriggerFunc},
}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}

View File

@ -193,10 +193,8 @@ func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPre
}
}
func NodeNameTriggerFunc(obj runtime.Object) []storage.MatchValue {
pod := obj.(*api.Pod)
result := storage.MatchValue{IndexName: "spec.nodeName", Value: pod.Spec.NodeName}
return []storage.MatchValue{result}
func NodeNameTriggerFunc(obj runtime.Object) string {
return obj.(*api.Pod).Spec.NodeName
}
// PodToSelectableFields returns a field set that represents the object

View File

@ -36,6 +36,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
],
)

View File

@ -20,6 +20,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
@ -46,7 +47,11 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: secret.GetAttrs, TriggerFunc: secret.SecretNameTriggerFunc}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: secret.GetAttrs,
TriggerFunc: map[string]storage.TriggerPublisherFunc{"metadata.name": secret.SecretNameTriggerFunc},
}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}

View File

@ -116,10 +116,8 @@ func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionP
}
}
func SecretNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue {
secret := obj.(*api.Secret)
result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: secret.ObjectMeta.Name}
return []pkgstorage.MatchValue{result}
func SecretNameTriggerFunc(obj runtime.Object) string {
return obj.(*api.Secret).ObjectMeta.Name
}
// SelectableFields returns a field set that can be used for filter selection

View File

@ -47,6 +47,6 @@ type RESTOptionsGetter interface {
// StoreOptions is set of configuration options used to complete generic registries.
type StoreOptions struct {
RESTOptions RESTOptionsGetter
TriggerFunc storage.TriggerPublisherFunc
TriggerFunc storage.TriggerPublisherFuncs
AttrFunc storage.AttrFunc
}

View File

@ -39,7 +39,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
triggerFuncs storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig)
if err != nil {
@ -56,16 +56,16 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := cacherstorage.Config{
CacheCapacity: capacity,
Storage: s,
Versioner: etcd3.APIObjectVersioner{},
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
CacheCapacity: capacity,
Storage: s,
Versioner: etcd3.APIObjectVersioner{},
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
TriggerPublisherFuncs: triggerFuncs,
Codec: storageConfig.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
if err != nil {

View File

@ -1287,11 +1287,6 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
}
triggerFunc := options.TriggerFunc
if triggerFunc == nil {
triggerFunc = storage.NoTriggerPublisher
}
if e.DeleteCollectionWorkers == 0 {
e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
}
@ -1318,7 +1313,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
e.NewFunc,
e.NewListFunc,
attrFunc,
triggerFunc,
options.TriggerFunc,
)
if err != nil {
return err

View File

@ -32,7 +32,7 @@ type StorageDecorator func(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error)
trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error)
// UndecoratedStorage returns the given a new storage from the given config
// without any decoration.
@ -43,7 +43,7 @@ func UndecoratedStorage(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
trigger storage.TriggerPublisherFuncs) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
}

View File

@ -88,9 +88,9 @@ type Config struct {
// GetAttrsFunc is used to get object labels, fields
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// TriggerPublisherFuncs is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc storage.TriggerPublisherFunc
TriggerPublisherFuncs storage.TriggerPublisherFuncs
// NewFunc is a function that creates new empty object storing a object of type Type.
NewFunc func() runtime.Object
@ -209,6 +209,11 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
type indexedTriggerFunc struct {
indexName string
triggerFunc storage.TriggerPublisherFunc
}
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
@ -248,9 +253,9 @@ type Cacher struct {
// newFunc is a function that creates new empty object storing a object of type Type.
newFunc func() runtime.Object
// triggerFunc is used for optimizing amount of watchers that needs to process
// indexedTrigger is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc storage.TriggerPublisherFunc
indexedTrigger *indexedTriggerFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
@ -300,15 +305,32 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err)
}
var indexedTrigger *indexedTriggerFunc
if config.TriggerPublisherFuncs != nil {
// For now, we don't support multiple trigger functions defined
// for a given resource.
if len(config.TriggerPublisherFuncs) > 1 {
return nil, fmt.Errorf("cacher %s doesn't support more than one TriggerPublisherFunc: ", reflect.TypeOf(obj).String())
}
for key, value := range config.TriggerPublisherFuncs {
if value != nil {
indexedTrigger = &indexedTriggerFunc{
indexName: key,
triggerFunc: value,
}
}
}
}
clock := clock.RealClock{}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
versioner: config.Versioner,
newFunc: config.NewFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
versioner: config.Versioner,
newFunc: config.NewFunc,
indexedTrigger: indexedTrigger,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
@ -419,23 +441,27 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.ready.wait()
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
// passed here is aware of exactly the same trigger (at most one).
// Thus, either 0 or 1 values will be returned.
if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
triggerValue, triggerSupported = matchValues[0].Value, true
if c.indexedTrigger != nil {
for _, field := range pred.IndexFields {
if field == c.indexedTrigger.indexName {
if value, ok := pred.Field.RequiresExactMatch(field); ok {
triggerValue, triggerSupported = value, true
}
}
}
}
// If there is triggerFunc defined, but triggerSupported is false,
// If there is indexedTrigger defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently triggerFunc is defined only for Pods and Nodes,
// and there is only constant number of watchers for which triggerSupported
// is false (excluding those issues explicitly by users).
// That said, currently indexedTrigger is defined only for couple resources:
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant
// number of watchers for which triggerSupported is false (excluding those
// issued explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.triggerFunc != nil && !triggerSupported {
if c.indexedTrigger != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
@ -711,29 +737,20 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling:
// c.triggerFunc(<some object>)
// can return only 0 or 1 values.
// That means, that triggerValues itself may return up to 2 different values.
if c.triggerFunc == nil {
if c.indexedTrigger == nil {
return nil, false
}
result := make([]string, 0, 2)
matchValues := c.triggerFunc(event.Object)
if len(matchValues) > 0 {
result = append(result, matchValues[0].Value)
}
result = append(result, c.indexedTrigger.triggerFunc(event.Object))
if event.PrevObject == nil {
return result, len(result) > 0
return result, true
}
prevMatchValues := c.triggerFunc(event.PrevObject)
if len(prevMatchValues) > 0 {
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
result = append(result, prevMatchValues[0].Value)
}
prevTriggerValue := c.indexedTrigger.triggerFunc(event.PrevObject)
if result[0] != prevTriggerValue {
result = append(result, prevTriggerValue)
}
return result, len(result) > 0
return result, true
}
func (c *Cacher) processEvent(event *watchCacheEvent) {

View File

@ -73,16 +73,15 @@ type ResponseMeta struct {
ResourceVersion uint64
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// TriggerPublisherFunc is a function that for a given object computes
// <value of an index> for a particular <index>.
// TODO(wojtek-t): Rename to IndexerFunc?
type TriggerPublisherFunc func(obj runtime.Object) string
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// TriggerPublisherFuncs is a mapping from <index name> to function that
// for a given object computes <value for that index>.
// TODO(wojtek-t): Rename to IndexerFuncs?
type TriggerPublisherFuncs map[string]TriggerPublisherFunc
// Everything accepts all objects.
var Everything = SelectionPredicate{

View File

@ -124,20 +124,6 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
return "", false
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
}
}
return result
}
// Empty returns true if the predicate performs no filtering.
func (s *SelectionPredicate) Empty() bool {
return s.Label.Empty() && s.Field.Empty()

View File

@ -39,14 +39,6 @@ func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {