mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #102998 from tkashem/apf-width-list
apiserver: add callback to get notified of object count
This commit is contained in:
commit
6d11f22fde
@ -230,6 +230,10 @@ type Config struct {
|
|||||||
// it's intentionally marked private as it should never be overridden.
|
// it's intentionally marked private as it should never be overridden.
|
||||||
lifecycleSignals lifecycleSignals
|
lifecycleSignals lifecycleSignals
|
||||||
|
|
||||||
|
// StorageObjectCountTracker is used to keep track of the total number of objects
|
||||||
|
// in the storage per resource, so we can estimate width of incoming requests.
|
||||||
|
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
|
||||||
|
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
// values below here are targets for removal
|
// values below here are targets for removal
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
@ -355,6 +359,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
|||||||
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
||||||
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
|
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
lifecycleSignals: newLifecycleSignals(),
|
||||||
|
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
|
||||||
|
|
||||||
APIServerID: id,
|
APIServerID: id,
|
||||||
StorageVersionManager: storageversion.NewDefaultManager(),
|
StorageVersionManager: storageversion.NewDefaultManager(),
|
||||||
|
@ -206,6 +206,11 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StorageConfig should use the storage object count tracker provided by server Config
|
||||||
|
if c.StorageObjectCountTracker != nil {
|
||||||
|
s.StorageConfig.ObjectCountTracker = c.StorageObjectCountTracker.OnCount
|
||||||
|
}
|
||||||
|
|
||||||
c.RESTOptionsGetter = &SimpleRestOptionsFactory{
|
c.RESTOptionsGetter = &SimpleRestOptionsFactory{
|
||||||
Options: *s,
|
Options: *s,
|
||||||
TransformerOverrides: transformerOverrides,
|
TransformerOverrides: transformerOverrides,
|
||||||
@ -217,6 +222,12 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
|||||||
if err := s.addEtcdHealthEndpoint(c); err != nil {
|
if err := s.addEtcdHealthEndpoint(c); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StorageConfig should use the storage object count tracker provided by server Config
|
||||||
|
if c.StorageObjectCountTracker != nil {
|
||||||
|
s.StorageConfig.ObjectCountTracker = c.StorageObjectCountTracker.OnCount
|
||||||
|
}
|
||||||
|
|
||||||
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
|
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package etcd3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ObjectCountTrackerFunc is used as a callback to get notified when
|
||||||
|
// we have a up to date value of the total number of objects in the
|
||||||
|
// storage for a given resource.
|
||||||
|
// {group}.{resource} combination is used as the unique key.
|
||||||
|
type ObjectCountTrackerFunc func(groupResource string, count int64)
|
||||||
|
|
||||||
|
func (f ObjectCountTrackerFunc) OnCount(groupResource string, count int64) {
|
||||||
|
f(groupResource, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithObjectCountTracker takes the given storage.Interface and wraps it so
|
||||||
|
// we can get notified when Count is invoked.
|
||||||
|
func WithObjectCountTracker(delegate storage.Interface, callback ObjectCountTrackerFunc) storage.Interface {
|
||||||
|
return &objectCountTracker{
|
||||||
|
Interface: delegate,
|
||||||
|
callback: callback,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectCountTracker struct {
|
||||||
|
// embed because we only want to decorate Count
|
||||||
|
storage.Interface
|
||||||
|
callback ObjectCountTrackerFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectCountTracker) Count(key string) (int64, error) {
|
||||||
|
count, err := s.Interface.Count(key)
|
||||||
|
if s.callback == nil {
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "Storage object OnCount callback not invoked", "key", key)
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.callback.OnCount(key, count)
|
||||||
|
return count, err
|
||||||
|
}
|
@ -81,6 +81,8 @@ type Config struct {
|
|||||||
HealthcheckTimeout time.Duration
|
HealthcheckTimeout time.Duration
|
||||||
|
|
||||||
LeaseManagerConfig etcd3.LeaseManagerConfig
|
LeaseManagerConfig etcd3.LeaseManagerConfig
|
||||||
|
|
||||||
|
ObjectCountTracker etcd3.ObjectCountTrackerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
||||||
|
@ -259,7 +259,12 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st
|
|||||||
if transformer == nil {
|
if transformer == nil {
|
||||||
transformer = value.IdentityTransformer
|
transformer = value.IdentityTransformer
|
||||||
}
|
}
|
||||||
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
|
|
||||||
|
store := etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig)
|
||||||
|
if c.ObjectCountTracker != nil {
|
||||||
|
store = etcd3.WithObjectCountTracker(store, c.ObjectCountTracker)
|
||||||
|
}
|
||||||
|
return store, destroyFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||||
|
@ -0,0 +1,67 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package request
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorageObjectCountTracker is an interface that is used to keep track of
|
||||||
|
// of the total number of objects for each resource.
|
||||||
|
// {group}.{resource} is used as the key name to update and retrieve
|
||||||
|
// the total number of objects for a given resource.
|
||||||
|
type StorageObjectCountTracker interface {
|
||||||
|
// OnCount is invoked to update the current number of total
|
||||||
|
// objects for the given resource
|
||||||
|
OnCount(string, int64)
|
||||||
|
|
||||||
|
// Get returns the total number of objects for the given resource.
|
||||||
|
// If the given resource is not being tracked Get will return zero.
|
||||||
|
// For now, we do not differentiate between zero object count and
|
||||||
|
// a given resoure not being present.
|
||||||
|
Get(string) int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStorageObjectCountTracker returns an instance of
|
||||||
|
// StorageObjectCountTracker interface that can be used to
|
||||||
|
// keep track of the total number of objects for each resource.
|
||||||
|
func NewStorageObjectCountTracker() StorageObjectCountTracker {
|
||||||
|
return &objectCountTracker{
|
||||||
|
counts: map[string]int64{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectCountTracker implements StorageObjectCountTracker with
|
||||||
|
// reader/writer mutual exclusion lock.
|
||||||
|
type objectCountTracker struct {
|
||||||
|
lock sync.RWMutex
|
||||||
|
counts map[string]int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *objectCountTracker) OnCount(key string, count int64) {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
t.counts[key] = count
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *objectCountTracker) Get(key string) int64 {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return t.counts[key]
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user