mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Merge pull request #75879 from hormes/watchcache_refactor
Refactor watchcache to pass function to create objects instead of the empty object itself
This commit is contained in:
commit
2f62e8ff57
@ -34,19 +34,21 @@ import (
|
|||||||
func StorageWithCacher(capacity int) generic.StorageDecorator {
|
func StorageWithCacher(capacity int) generic.StorageDecorator {
|
||||||
return func(
|
return func(
|
||||||
storageConfig *storagebackend.Config,
|
storageConfig *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
keyFunc func(obj runtime.Object) (string, error),
|
keyFunc func(obj runtime.Object) (string, error),
|
||||||
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
||||||
|
|
||||||
s, d := generic.NewRawStorage(storageConfig)
|
s, d := generic.NewRawStorage(storageConfig)
|
||||||
if capacity <= 0 {
|
if capacity <= 0 {
|
||||||
klog.V(5).Infof("Storage caching is disabled for %T", objectType)
|
klog.V(5).Infof("Storage caching is disabled for %T", newFunc())
|
||||||
return s, d
|
return s, d
|
||||||
}
|
}
|
||||||
klog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
|
if klog.V(5) {
|
||||||
|
klog.Infof("Storage caching is enabled for %T with capacity %v", newFunc(), capacity)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
|
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
|
||||||
// Currently it has two layers of same storage interface -- cacher and low level kv.
|
// Currently it has two layers of same storage interface -- cacher and low level kv.
|
||||||
@ -54,9 +56,9 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
|
|||||||
CacheCapacity: capacity,
|
CacheCapacity: capacity,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: etcdstorage.APIObjectVersioner{},
|
Versioner: etcdstorage.APIObjectVersioner{},
|
||||||
Type: objectType,
|
|
||||||
ResourcePrefix: resourcePrefix,
|
ResourcePrefix: resourcePrefix,
|
||||||
KeyFunc: keyFunc,
|
KeyFunc: keyFunc,
|
||||||
|
NewFunc: newFunc,
|
||||||
NewListFunc: newListFunc,
|
NewListFunc: newListFunc,
|
||||||
GetAttrsFunc: getAttrsFunc,
|
GetAttrsFunc: getAttrsFunc,
|
||||||
TriggerPublisherFunc: triggerFunc,
|
TriggerPublisherFunc: triggerFunc,
|
||||||
|
@ -1288,9 +1288,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
|||||||
e.Storage.Codec = opts.StorageConfig.Codec
|
e.Storage.Codec = opts.StorageConfig.Codec
|
||||||
e.Storage.Storage, e.DestroyFunc = opts.Decorator(
|
e.Storage.Storage, e.DestroyFunc = opts.Decorator(
|
||||||
opts.StorageConfig,
|
opts.StorageConfig,
|
||||||
e.NewFunc(),
|
|
||||||
prefix,
|
prefix,
|
||||||
keyFunc,
|
keyFunc,
|
||||||
|
e.NewFunc,
|
||||||
e.NewListFunc,
|
e.NewListFunc,
|
||||||
attrFunc,
|
attrFunc,
|
||||||
triggerFunc,
|
triggerFunc,
|
||||||
|
@ -1554,10 +1554,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
|||||||
CacheCapacity: 10,
|
CacheCapacity: 10,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: etcdstorage.APIObjectVersioner{},
|
Versioner: etcdstorage.APIObjectVersioner{},
|
||||||
Type: &example.Pod{},
|
|
||||||
ResourcePrefix: podPrefix,
|
ResourcePrefix: podPrefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
|
||||||
GetAttrsFunc: getPodAttrs,
|
GetAttrsFunc: getPodAttrs,
|
||||||
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: sc.Codec,
|
Codec: sc.Codec,
|
||||||
}
|
}
|
||||||
|
@ -28,9 +28,9 @@ import (
|
|||||||
// and an associated DestroyFunc from given parameters.
|
// and an associated DestroyFunc from given parameters.
|
||||||
type StorageDecorator func(
|
type StorageDecorator func(
|
||||||
config *storagebackend.Config,
|
config *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
keyFunc func(obj runtime.Object) (string, error),
|
keyFunc func(obj runtime.Object) (string, error),
|
||||||
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)
|
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)
|
||||||
@ -39,9 +39,9 @@ type StorageDecorator func(
|
|||||||
// without any decoration.
|
// without any decoration.
|
||||||
func UndecoratedStorage(
|
func UndecoratedStorage(
|
||||||
config *storagebackend.Config,
|
config *storagebackend.Config,
|
||||||
objectType runtime.Object,
|
|
||||||
resourcePrefix string,
|
resourcePrefix string,
|
||||||
keyFunc func(obj runtime.Object) (string, error),
|
keyFunc func(obj runtime.Object) (string, error),
|
||||||
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
||||||
|
@ -73,7 +73,6 @@ type Config struct {
|
|||||||
|
|
||||||
// The Cache will be caching objects of a given Type and assumes that they
|
// The Cache will be caching objects of a given Type and assumes that they
|
||||||
// are all stored under ResourcePrefix directory in the underlying database.
|
// are all stored under ResourcePrefix directory in the underlying database.
|
||||||
Type interface{}
|
|
||||||
ResourcePrefix string
|
ResourcePrefix string
|
||||||
|
|
||||||
// KeyFunc is used to get a key in the underlying storage for a given object.
|
// KeyFunc is used to get a key in the underlying storage for a given object.
|
||||||
@ -86,6 +85,9 @@ type Config struct {
|
|||||||
// needs to process an incoming event.
|
// needs to process an incoming event.
|
||||||
TriggerPublisherFunc storage.TriggerPublisherFunc
|
TriggerPublisherFunc storage.TriggerPublisherFunc
|
||||||
|
|
||||||
|
// NewFunc is a function that creates new empty object storing a object of type Type.
|
||||||
|
NewFunc func() runtime.Object
|
||||||
|
|
||||||
// NewList is a function that creates new empty object storing a list of
|
// NewList is a function that creates new empty object storing a list of
|
||||||
// objects of type Type.
|
// objects of type Type.
|
||||||
NewListFunc func() runtime.Object
|
NewListFunc func() runtime.Object
|
||||||
@ -230,21 +232,20 @@ func NewCacherFromConfig(config Config) *Cacher {
|
|||||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
||||||
|
|
||||||
|
obj := config.NewFunc()
|
||||||
// Give this error when it is constructed rather than when you get the
|
// Give this error when it is constructed rather than when you get the
|
||||||
// first watch item, because it's much easier to track down that way.
|
// first watch item, because it's much easier to track down that way.
|
||||||
if obj, ok := config.Type.(runtime.Object); ok {
|
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
|
||||||
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
|
panic("storage codec doesn't seem to match given type: " + err.Error())
|
||||||
panic("storage codec doesn't seem to match given type: " + err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
cacher := &Cacher{
|
cacher := &Cacher{
|
||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
storage: config.Storage,
|
storage: config.Storage,
|
||||||
objectType: reflect.TypeOf(config.Type),
|
objectType: reflect.TypeOf(obj),
|
||||||
watchCache: watchCache,
|
watchCache: watchCache,
|
||||||
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
|
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0),
|
||||||
versioner: config.Versioner,
|
versioner: config.Versioner,
|
||||||
triggerFunc: config.TriggerPublisherFunc,
|
triggerFunc: config.TriggerPublisherFunc,
|
||||||
watcherIdx: 0,
|
watcherIdx: 0,
|
||||||
|
@ -252,10 +252,10 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
|
|||||||
CacheCapacity: cap,
|
CacheCapacity: cap,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: testVersioner{},
|
Versioner: testVersioner{},
|
||||||
Type: &example.Pod{},
|
|
||||||
ResourcePrefix: prefix,
|
ResourcePrefix: prefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||||
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil },
|
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil },
|
||||||
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
}
|
}
|
||||||
|
@ -104,10 +104,10 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
|
|||||||
CacheCapacity: cap,
|
CacheCapacity: cap,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Versioner: v,
|
Versioner: v,
|
||||||
Type: &example.Pod{},
|
|
||||||
ResourcePrefix: prefix,
|
ResourcePrefix: prefix,
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||||
GetAttrsFunc: GetAttrs,
|
GetAttrsFunc: GetAttrs,
|
||||||
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user