diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index c4bbe15119e..89a9877db46 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -230,6 +230,10 @@ type Config struct { // it's intentionally marked private as it should never be overridden. lifecycleSignals lifecycleSignals + // StorageObjectCountTracker is used to keep track of the total number of objects + // in the storage per resource, so we can estimate width of incoming requests. + StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -352,9 +356,10 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources - LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), - RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, - lifecycleSignals: newLifecycleSignals(), + LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, + lifecycleSignals: newLifecycleSignals(), + StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index d8b45b8198f..91f136779f4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -206,6 +206,11 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error { } } + // StorageConfig should use the storage object count tracker provided by server Config + if c.StorageObjectCountTracker != nil { + s.StorageConfig.ObjectCountTracker = c.StorageObjectCountTracker.OnCount + } + c.RESTOptionsGetter = &SimpleRestOptionsFactory{ Options: *s, TransformerOverrides: transformerOverrides, @@ -217,6 +222,12 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac if err := s.addEtcdHealthEndpoint(c); err != nil { return err } + + // StorageConfig should use the storage object count tracker provided by server Config + if c.StorageObjectCountTracker != nil { + s.StorageConfig.ObjectCountTracker = c.StorageObjectCountTracker.OnCount + } + c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd_object_count_tracker.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd_object_count_tracker.go new file mode 100644 index 00000000000..e83994d7b81 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd_object_count_tracker.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" +) + +// ObjectCountTrackerFunc is used as a callback to get notified when +// we have a up to date value of the total number of objects in the +// storage for a given resource. +// {group}.{resource} combination is used as the unique key. +type ObjectCountTrackerFunc func(groupResource string, count int64) + +func (f ObjectCountTrackerFunc) OnCount(groupResource string, count int64) { + f(groupResource, count) +} + +// WithObjectCountTracker takes the given storage.Interface and wraps it so +// we can get notified when Count is invoked. +func WithObjectCountTracker(delegate storage.Interface, callback ObjectCountTrackerFunc) storage.Interface { + return &objectCountTracker{ + Interface: delegate, + callback: callback, + } +} + +type objectCountTracker struct { + // embed because we only want to decorate Count + storage.Interface + callback ObjectCountTrackerFunc +} + +func (s *objectCountTracker) Count(key string) (int64, error) { + count, err := s.Interface.Count(key) + if s.callback == nil { + return count, err + } + + if err != nil { + klog.ErrorS(err, "Storage object OnCount callback not invoked", "key", key) + return count, err + } + + s.callback.OnCount(key, count) + return count, err +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 45f0d6f4a70..56970788092 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -81,6 +81,8 @@ type Config struct { HealthcheckTimeout time.Duration LeaseManagerConfig etcd3.LeaseManagerConfig + + ObjectCountTracker etcd3.ObjectCountTrackerFunc } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 2d2643496f2..f762e94f3fc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -259,7 +259,12 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil + + store := etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig) + if c.ObjectCountTracker != nil { + store = etcd3.WithObjectCountTracker(store, c.ObjectCountTracker) + } + return store, destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go new file mode 100644 index 00000000000..893527d0f39 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go @@ -0,0 +1,67 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "sync" +) + +// StorageObjectCountTracker is an interface that is used to keep track of +// of the total number of objects for each resource. +// {group}.{resource} is used as the key name to update and retrieve +// the total number of objects for a given resource. +type StorageObjectCountTracker interface { + // OnCount is invoked to update the current number of total + // objects for the given resource + OnCount(string, int64) + + // Get returns the total number of objects for the given resource. + // If the given resource is not being tracked Get will return zero. + // For now, we do not differentiate between zero object count and + // a given resoure not being present. + Get(string) int64 +} + +// NewStorageObjectCountTracker returns an instance of +// StorageObjectCountTracker interface that can be used to +// keep track of the total number of objects for each resource. +func NewStorageObjectCountTracker() StorageObjectCountTracker { + return &objectCountTracker{ + counts: map[string]int64{}, + } +} + +// objectCountTracker implements StorageObjectCountTracker with +// reader/writer mutual exclusion lock. +type objectCountTracker struct { + lock sync.RWMutex + counts map[string]int64 +} + +func (t *objectCountTracker) OnCount(key string, count int64) { + t.lock.Lock() + defer t.lock.Unlock() + + t.counts[key] = count +} + +func (t *objectCountTracker) Get(key string) int64 { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.counts[key] +}