apiserver: add callback to get notified of object count

This commit is contained in:
Abu Kashem 2021-07-14 16:44:34 -04:00
parent 2d08fd4f56
commit 2c60feffbe
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
9 changed files with 369 additions and 43 deletions

View File

@ -76,6 +76,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
@ -1134,23 +1135,25 @@ func (d unstructuredDefaulter) Default(in runtime.Object) {
}
type CRDRESTOptionsGetter struct {
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
DefaultWatchCacheSize int
EnableGarbageCollection bool
DeleteCollectionWorkers int
CountMetricPollPeriod time.Duration
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
DefaultWatchCacheSize int
EnableGarbageCollection bool
DeleteCollectionWorkers int
CountMetricPollPeriod time.Duration
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}
func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: t.EnableGarbageCollection,
DeleteCollectionWorkers: t.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: t.CountMetricPollPeriod,
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: t.EnableGarbageCollection,
DeleteCollectionWorkers: t.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: t.CountMetricPollPeriod,
StorageObjectCountTracker: t.StorageObjectCountTracker,
}
if t.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher()

View File

@ -120,13 +120,14 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
// NewCRDRESTOptionsGetter create a RESTOptionsGetter for CustomResources.
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter {
ret := apiserver.CRDRESTOptionsGetter{
StorageConfig: etcdOptions.StorageConfig,
StoragePrefix: etcdOptions.StorageConfig.Prefix,
EnableWatchCache: etcdOptions.EnableWatchCache,
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod,
StorageConfig: etcdOptions.StorageConfig,
StoragePrefix: etcdOptions.StorageConfig.Prefix,
EnableWatchCache: etcdOptions.EnableWatchCache,
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: etcdOptions.StorageConfig.StorageObjectCountTracker,
}
ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
)
@ -30,10 +31,11 @@ type RESTOptions struct {
StorageConfig *storagebackend.Config
Decorator StorageDecorator
EnableGarbageCollection bool
DeleteCollectionWorkers int
ResourcePrefix string
CountMetricPollPeriod time.Duration
EnableGarbageCollection bool
DeleteCollectionWorkers int
ResourcePrefix string
CountMetricPollPeriod time.Duration
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}
// Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. tests)

View File

@ -44,6 +44,7 @@ import (
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/util/dryrun"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
@ -1413,7 +1414,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
@ -1428,7 +1429,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
}
// startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
func (e *Store) startObservingCount(period time.Duration) func() {
func (e *Store) startObservingCount(period time.Duration, objectCountTracker flowcontrolrequest.StorageObjectCountTracker) func() {
prefix := e.KeyRootFunc(genericapirequest.NewContext())
resourceName := e.DefaultQualifiedResource.String()
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
@ -1437,9 +1438,12 @@ func (e *Store) startObservingCount(period time.Duration) func() {
count, err := e.Storage.Count(prefix)
if err != nil {
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
metrics.UpdateObjectCount(resourceName, -1)
} else {
metrics.UpdateObjectCount(resourceName, count)
count = -1
}
metrics.UpdateObjectCount(resourceName, count)
if objectCountTracker != nil {
objectCountTracker.Set(resourceName, count)
}
}, period, resourceCountPollPeriodJitter, true, stopCh)
return func() { close(stopCh) }

View File

@ -227,6 +227,10 @@ type Config struct {
// it's intentionally marked private as it should never be overridden.
lifecycleSignals lifecycleSignals
// StorageObjectCountTracker is used to keep track of the total number of objects
// in the storage per resource, so we can estimate width of incoming requests.
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -312,6 +316,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
id = "kube-apiserver-" + uuid.New().String()
}
lifecycleSignals := newLifecycleSignals()
return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
@ -349,8 +355,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: newLifecycleSignals(),
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: lifecycleSignals,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(lifecycleSignals.ShutdownInitiated.Signaled()),
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),

View File

@ -206,6 +206,9 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
}
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &SimpleRestOptionsFactory{
Options: *s,
TransformerOverrides: transformerOverrides,
@ -217,6 +220,10 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
@ -248,12 +255,13 @@ type SimpleRestOptionsFactory struct {
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.TransformerOverrides != nil {
if transformer, ok := f.TransformerOverrides[resource]; ok {
@ -290,12 +298,13 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)
const (
@ -84,6 +85,10 @@ type Config struct {
HealthcheckTimeout time.Duration
LeaseManagerConfig etcd3.LeaseManagerConfig
// StorageObjectCountTracker is used to keep track of the total
// number of objects in the storage per resource.
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {

View File

@ -0,0 +1,166 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"errors"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
const (
// type deletion (it applies mostly to CRD) is not a very frequent
// operation so we can afford to prune the cache at a large interval.
// at the same time, we also want to make sure that the scalability
// tests hit this code path.
pruneInterval = 1 * time.Hour
// the storage layer polls for object count at every 1m interval, we will allow
// up to 2-3 transient failures to get the latest count for a given resource.
staleTolerationThreshold = 3 * time.Minute
)
var (
// ObjectCountNotFoundErr is returned when the object count for
// a given resource is not being tracked.
ObjectCountNotFoundErr = errors.New("object count not found for the given resource")
// ObjectCountStaleErr is returned when the object count for a
// given resource has gone stale due to transient failures.
ObjectCountStaleErr = errors.New("object count has gone stale for the given resource")
)
// StorageObjectCountTracker is an interface that is used to keep track of
// of the total number of objects for each resource.
// {group}.{resource} is used as the key name to update and retrieve
// the total number of objects for a given resource.
type StorageObjectCountTracker interface {
// Set is invoked to update the current number of total
// objects for the given resource
Set(string, int64)
// Get returns the total number of objects for the given resource.
// The following errors are returned:
// - if the count has gone stale for a given resource due to transient
// failures ObjectCountStaleErr is returned.
// - if the given resource is not being tracked then
// ObjectCountNotFoundErr is returned.
Get(string) (int64, error)
}
// NewStorageObjectCountTracker returns an instance of
// StorageObjectCountTracker interface that can be used to
// keep track of the total number of objects for each resource.
func NewStorageObjectCountTracker(stopCh <-chan struct{}) StorageObjectCountTracker {
tracker := &objectCountTracker{
clock: &clock.RealClock{},
counts: map[string]*timestampedCount{},
}
go func() {
wait.PollUntil(
pruneInterval,
func() (bool, error) {
// always prune at every pruneInterval
return false, tracker.prune(pruneInterval)
}, stopCh)
klog.InfoS("StorageObjectCountTracker pruner is exiting")
}()
return tracker
}
// timestampedCount stores the count of a given resource with a last updated
// timestamp so we can prune it after it goes stale for certain threshold.
type timestampedCount struct {
count int64
lastUpdatedAt time.Time
}
// objectCountTracker implements StorageObjectCountTracker with
// reader/writer mutual exclusion lock.
type objectCountTracker struct {
clock clock.PassiveClock
lock sync.RWMutex
counts map[string]*timestampedCount
}
func (t *objectCountTracker) Set(groupResource string, count int64) {
if count <= -1 {
// a value of -1 indicates that the 'Count' call failed to contact
// the storage layer, in most cases this error can be transient.
// we will continue to work with the count that is in the cache
// up to a certain threshold defined by staleTolerationThreshold.
// in case this becomes a non transient error then the count for
// the given resource will will eventually be removed from
// the cache by the pruner.
return
}
now := t.clock.Now()
// lock for writing
t.lock.Lock()
defer t.lock.Unlock()
if item, ok := t.counts[groupResource]; ok {
item.count = count
item.lastUpdatedAt = now
return
}
t.counts[groupResource] = &timestampedCount{
count: count,
lastUpdatedAt: now,
}
}
func (t *objectCountTracker) Get(groupResource string) (int64, error) {
staleThreshold := t.clock.Now().Add(-staleTolerationThreshold)
t.lock.RLock()
defer t.lock.RUnlock()
if item, ok := t.counts[groupResource]; ok {
if item.lastUpdatedAt.Before(staleThreshold) {
return item.count, ObjectCountStaleErr
}
return item.count, nil
}
return 0, ObjectCountNotFoundErr
}
func (t *objectCountTracker) prune(threshold time.Duration) error {
oldestLastUpdatedAtAllowed := t.clock.Now().Add(-threshold)
// lock for writing
t.lock.Lock()
defer t.lock.Unlock()
for groupResource, count := range t.counts {
if count.lastUpdatedAt.After(oldestLastUpdatedAtAllowed) {
continue
}
delete(t.counts, groupResource)
}
return nil
}

View File

@ -0,0 +1,129 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/clock"
)
func TestStorageObjectCountTracker(t *testing.T) {
tests := []struct {
name string
lastUpdated time.Duration
count int64
errExpected error
countExpected int64
}{
{
name: "object count not tracked for given resource",
count: -2,
errExpected: ObjectCountNotFoundErr,
},
{
name: "transient failure",
count: -1,
errExpected: ObjectCountNotFoundErr,
},
{
name: "object count is zero",
count: 0,
countExpected: 0,
errExpected: nil,
},
{
name: "object count is more than zero",
count: 799,
countExpected: 799,
errExpected: nil,
},
{
name: "object count stale",
count: 799,
countExpected: 799,
lastUpdated: staleTolerationThreshold + time.Millisecond,
errExpected: ObjectCountStaleErr,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClock := &clock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
}
key := "foo.bar.resource"
now := time.Now()
fakeClock.SetTime(now.Add(-test.lastUpdated))
tracker.Set(key, test.count)
fakeClock.SetTime(now)
countGot, err := tracker.Get(key)
if test.errExpected != err {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, err)
}
if test.countExpected != countGot {
t.Errorf("Expected count: %d, but got: %d", test.countExpected, countGot)
}
if test.count <= -1 && len(tracker.counts) > 0 {
t.Errorf("Expected the cache to be empty, but got: %d", len(tracker.counts))
}
})
}
}
func TestStorageObjectCountTrackerWithPrune(t *testing.T) {
fakeClock := &clock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
}
now := time.Now()
fakeClock.SetTime(now.Add(-61 * time.Minute))
tracker.Set("k1", 61)
fakeClock.SetTime(now.Add(-60 * time.Minute))
tracker.Set("k2", 60)
// we are going to prune keys that are stale for >= 1h
// so the above keys are expected to be pruned and the
// key below should not be pruned.
mostRecent := now.Add(-59 * time.Minute)
fakeClock.SetTime(mostRecent)
tracker.Set("k3", 59)
expected := map[string]*timestampedCount{
"k3": {
count: 59,
lastUpdatedAt: mostRecent,
},
}
fakeClock.SetTime(now)
if err := tracker.prune(time.Hour); err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
// we expect only one entry in the map, so DeepEqual should work.
if !reflect.DeepEqual(expected, tracker.counts) {
t.Errorf("Expected prune to remove stale entries - diff: %s", cmp.Diff(expected, tracker.counts))
}
}