mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Merge pull request #45403 from sttts/sttts-tri-state-watch-capacity
Automatic merge from submit-queue apiserver: injectable default watch cache size This makes it possible to override the default watch capacity in the REST options getter. Before this PR the default is written into the storage struct explicitly, and if it is the default, the REST options getter didn't know. With this the PR the default is applied late and can be injected from the outside.
This commit is contained in:
commit
49e5435529
@ -11,10 +11,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["cachesize.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
],
|
||||
deps = ["//vendor/github.com/golang/glog:go_default_library"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
|
@ -23,8 +23,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
)
|
||||
|
||||
type Resource string
|
||||
@ -111,11 +109,13 @@ func SetWatchCacheSizes(cacheSizes []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func GetWatchCacheSizeByResource(resource string) int { // TODO this should use schema.GroupResource for lookups
|
||||
// GetWatchCacheSizeByResource returns the configured watch cache size for the given resource.
|
||||
// A nil value means to use a default size, zero means to disable caching.
|
||||
func GetWatchCacheSizeByResource(resource string) (ret *int) { // TODO this should use schema.GroupResource for lookups
|
||||
if value, found := watchCacheSizes[Resource(resource)]; found {
|
||||
return value
|
||||
return &value
|
||||
}
|
||||
return registry.DefaultWatchCacheSize
|
||||
return nil
|
||||
}
|
||||
|
||||
func maxInt(a, b int) int {
|
||||
|
@ -40,7 +40,7 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
|
||||
}
|
||||
|
||||
// We explicitly do NOT do any decoration here yet. // TODO determine why we do not want to cache here
|
||||
opts.Decorator = generic.UndecoratedStorage // TODO use watchCacheSize=-1 to signal UndecoratedStorage
|
||||
opts.Decorator = generic.UndecoratedStorage
|
||||
|
||||
store := &genericregistry.Store{
|
||||
Copier: api.Scheme,
|
||||
|
@ -25,41 +25,49 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
)
|
||||
|
||||
var _ generic.StorageDecorator = StorageWithCacher
|
||||
|
||||
// Creates a cacher based given storageConfig.
|
||||
func StorageWithCacher(
|
||||
copier runtime.ObjectCopier,
|
||||
storageConfig *storagebackend.Config,
|
||||
capacity int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
newListFunc func() runtime.Object,
|
||||
getAttrsFunc storage.AttrFunc,
|
||||
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
||||
func StorageWithCacher(defaultCapacity int) generic.StorageDecorator {
|
||||
return func(
|
||||
copier runtime.ObjectCopier,
|
||||
storageConfig *storagebackend.Config,
|
||||
requestedSize *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
newListFunc func() runtime.Object,
|
||||
getAttrsFunc storage.AttrFunc,
|
||||
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
||||
|
||||
s, d := generic.NewRawStorage(storageConfig)
|
||||
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
|
||||
// Currently it has two layers of same storage interface -- cacher and low level kv.
|
||||
cacherConfig := storage.CacherConfig{
|
||||
CacheCapacity: capacity,
|
||||
Storage: s,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Copier: copier,
|
||||
Type: objectType,
|
||||
ResourcePrefix: resourcePrefix,
|
||||
KeyFunc: keyFunc,
|
||||
NewListFunc: newListFunc,
|
||||
GetAttrsFunc: getAttrsFunc,
|
||||
TriggerPublisherFunc: triggerFunc,
|
||||
Codec: storageConfig.Codec,
|
||||
}
|
||||
cacher := storage.NewCacherFromConfig(cacherConfig)
|
||||
destroyFunc := func() {
|
||||
cacher.Stop()
|
||||
d()
|
||||
}
|
||||
capacity := defaultCapacity
|
||||
if requestedSize != nil && *requestedSize == 0 {
|
||||
panic("StorageWithCacher must not be called with zero cache size")
|
||||
}
|
||||
if requestedSize != nil {
|
||||
capacity = *requestedSize
|
||||
}
|
||||
|
||||
return cacher, destroyFunc
|
||||
s, d := generic.NewRawStorage(storageConfig)
|
||||
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
|
||||
// Currently it has two layers of same storage interface -- cacher and low level kv.
|
||||
cacherConfig := storage.CacherConfig{
|
||||
CacheCapacity: capacity,
|
||||
Storage: s,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Copier: copier,
|
||||
Type: objectType,
|
||||
ResourcePrefix: resourcePrefix,
|
||||
KeyFunc: keyFunc,
|
||||
NewListFunc: newListFunc,
|
||||
GetAttrsFunc: getAttrsFunc,
|
||||
TriggerPublisherFunc: triggerFunc,
|
||||
Codec: storageConfig.Codec,
|
||||
}
|
||||
cacher := storage.NewCacherFromConfig(cacherConfig)
|
||||
destroyFunc := func() {
|
||||
cacher.Stop()
|
||||
d()
|
||||
}
|
||||
|
||||
return cacher, destroyFunc
|
||||
}
|
||||
}
|
||||
|
@ -44,9 +44,6 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// defaultWatchCacheSize is the default size of a watch catch per resource in number of entries.
|
||||
const DefaultWatchCacheSize = 100
|
||||
|
||||
// ObjectFunc is a function to act on a given object. An error may be returned
|
||||
// if the hook cannot be completed. An ObjectFunc may transform the provided
|
||||
// object.
|
||||
@ -164,9 +161,9 @@ type Store struct {
|
||||
// Called to cleanup clients used by the underlying Storage; optional.
|
||||
DestroyFunc func()
|
||||
// Maximum size of the watch history cached in memory, in number of entries.
|
||||
// A zero value here means that a default is used. This value is ignored if
|
||||
// Storage is non-nil.
|
||||
WatchCacheSize int
|
||||
// This value is ignored if Storage is non-nil. Nil is replaced with a default value.
|
||||
// A zero integer will disable caching.
|
||||
WatchCacheSize *int
|
||||
}
|
||||
|
||||
// Note: the rest.StandardStorage interface aggregates the common REST verbs
|
||||
@ -1205,14 +1202,10 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
||||
}
|
||||
|
||||
if e.Storage == nil {
|
||||
capacity := DefaultWatchCacheSize
|
||||
if e.WatchCacheSize != 0 {
|
||||
capacity = e.WatchCacheSize
|
||||
}
|
||||
e.Storage, e.DestroyFunc = opts.Decorator(
|
||||
e.Copier,
|
||||
opts.StorageConfig,
|
||||
capacity,
|
||||
e.WatchCacheSize,
|
||||
e.NewFunc(),
|
||||
prefix,
|
||||
keyFunc,
|
||||
|
@ -26,10 +26,11 @@ import (
|
||||
|
||||
// StorageDecorator is a function signature for producing a storage.Interface
|
||||
// and an associated DestroyFunc from given parameters.
|
||||
// A zero capacity means to disable caching, nil means to use a default.
|
||||
type StorageDecorator func(
|
||||
copier runtime.ObjectCopier,
|
||||
config *storagebackend.Config,
|
||||
capacity int,
|
||||
capacity *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
@ -42,7 +43,7 @@ type StorageDecorator func(
|
||||
func UndecoratedStorage(
|
||||
copier runtime.ObjectCopier,
|
||||
config *storagebackend.Config,
|
||||
capacity int,
|
||||
capacity *int,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
|
@ -39,7 +39,11 @@ type EtcdOptions struct {
|
||||
DefaultStorageMediaType string
|
||||
DeleteCollectionWorkers int
|
||||
EnableGarbageCollection bool
|
||||
EnableWatchCache bool
|
||||
|
||||
// Set EnableWatchCache to false to disable all watch caches
|
||||
EnableWatchCache bool
|
||||
// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
|
||||
DefaultWatchCacheSize int
|
||||
}
|
||||
|
||||
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
|
||||
@ -49,6 +53,7 @@ func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
|
||||
DeleteCollectionWorkers: 1,
|
||||
EnableGarbageCollection: true,
|
||||
EnableWatchCache: true,
|
||||
DefaultWatchCacheSize: 100,
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +134,7 @@ func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource)
|
||||
ResourcePrefix: f.Options.StorageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource,
|
||||
}
|
||||
if f.Options.EnableWatchCache {
|
||||
ret.Decorator = genericregistry.StorageWithCacher
|
||||
ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
@ -153,7 +158,7 @@ func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
|
||||
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
|
||||
}
|
||||
if f.Options.EnableWatchCache {
|
||||
ret.Decorator = genericregistry.StorageWithCacher
|
||||
ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
|
@ -38,7 +38,6 @@ func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
|
||||
NewListFunc: func() runtime.Object { return &apiregistration.APIServiceList{} },
|
||||
PredicateFunc: apiservice.MatchAPIService,
|
||||
QualifiedResource: apiregistration.Resource("apiservices"),
|
||||
WatchCacheSize: 100,
|
||||
|
||||
CreateStrategy: strategy,
|
||||
UpdateStrategy: strategy,
|
||||
|
@ -353,6 +353,7 @@ type CustomResourceRESTOptionsGetter struct {
|
||||
StorageConfig storagebackend.Config
|
||||
StoragePrefix string
|
||||
EnableWatchCache bool
|
||||
DefaultWatchCacheSize int
|
||||
EnableGarbageCollection bool
|
||||
DeleteCollectionWorkers int
|
||||
}
|
||||
@ -366,7 +367,7 @@ func (t CustomResourceRESTOptionsGetter) GetRESTOptions(resource schema.GroupRes
|
||||
ResourcePrefix: t.StoragePrefix + "/" + resource.Group + "/" + resource.Resource,
|
||||
}
|
||||
if t.EnableWatchCache {
|
||||
ret.Decorator = genericregistry.StorageWithCacher
|
||||
ret.Decorator = genericregistry.StorageWithCacher(t.DefaultWatchCacheSize)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
@ -99,6 +99,7 @@ func (o CustomResourcesServerOptions) Config() (*apiserver.Config, error) {
|
||||
StorageConfig: o.RecommendedOptions.Etcd.StorageConfig,
|
||||
StoragePrefix: o.RecommendedOptions.Etcd.StorageConfig.Prefix,
|
||||
EnableWatchCache: o.RecommendedOptions.Etcd.EnableWatchCache,
|
||||
DefaultWatchCacheSize: o.RecommendedOptions.Etcd.DefaultWatchCacheSize,
|
||||
EnableGarbageCollection: o.RecommendedOptions.Etcd.EnableGarbageCollection,
|
||||
DeleteCollectionWorkers: o.RecommendedOptions.Etcd.DeleteCollectionWorkers,
|
||||
}
|
||||
|
@ -80,6 +80,7 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) {
|
||||
StorageConfig: options.RecommendedOptions.Etcd.StorageConfig,
|
||||
StoragePrefix: options.RecommendedOptions.Etcd.StorageConfig.Prefix,
|
||||
EnableWatchCache: options.RecommendedOptions.Etcd.EnableWatchCache,
|
||||
DefaultWatchCacheSize: options.RecommendedOptions.Etcd.DefaultWatchCacheSize,
|
||||
EnableGarbageCollection: options.RecommendedOptions.Etcd.EnableGarbageCollection,
|
||||
DeleteCollectionWorkers: options.RecommendedOptions.Etcd.DeleteCollectionWorkers,
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user