diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 580f5eeff73..be27cdfd551 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -370,7 +370,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), lifecycleSignals: lifecycleSignals, - StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(lifecycleSignals.ShutdownInitiated.Signaled()), + StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), @@ -754,6 +754,19 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + // Add PostStartHook for maintenaing the object count tracker. + if c.StorageObjectCountTracker != nil { + const storageObjectCountTrackerHookName = "storage-object-count-tracker-hook" + if !s.isPostStartHookRegistered(storageObjectCountTrackerHookName) { + if err := s.AddPostStartHook(storageObjectCountTrackerHookName, func(context PostStartHookContext) error { + go c.StorageObjectCountTracker.RunUntil(context.StopCh) + return nil + }); err != nil { + return nil, err + } + } + } + for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index b2a106e48f5..4f01f74284a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -156,6 +156,7 @@ func TestNewWithDelegate(t *testing.T) { "/healthz/poststarthook/delegate-post-start-hook", "/healthz/poststarthook/generic-apiserver-start-informers", "/healthz/poststarthook/max-in-flight-filter", + "/healthz/poststarthook/storage-object-count-tracker-hook", "/healthz/poststarthook/wrapping-post-start-hook", "/healthz/wrapping-health", "/livez", @@ -165,6 +166,7 @@ func TestNewWithDelegate(t *testing.T) { "/livez/poststarthook/delegate-post-start-hook", "/livez/poststarthook/generic-apiserver-start-informers", "/livez/poststarthook/max-in-flight-filter", + "/livez/poststarthook/storage-object-count-tracker-hook", "/livez/poststarthook/wrapping-post-start-hook", "/metrics", "/readyz", @@ -175,6 +177,7 @@ func TestNewWithDelegate(t *testing.T) { "/readyz/poststarthook/delegate-post-start-hook", "/readyz/poststarthook/generic-apiserver-start-informers", "/readyz/poststarthook/max-in-flight-filter", + "/readyz/poststarthook/storage-object-count-tracker-hook", "/readyz/poststarthook/wrapping-post-start-hook", "/readyz/shutdown", } @@ -203,6 +206,7 @@ func TestNewWithDelegate(t *testing.T) { [-]delegate-health failed: reason withheld [+]poststarthook/generic-apiserver-start-informers ok [+]poststarthook/max-in-flight-filter ok +[+]poststarthook/storage-object-count-tracker-hook ok [+]poststarthook/delegate-post-start-hook ok [+]poststarthook/wrapping-post-start-hook ok healthz check failed diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go index dd1d5e570c3..62a5e4f2d4b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go @@ -64,27 +64,19 @@ type StorageObjectCountTracker interface { // - if the given resource is not being tracked then // ObjectCountNotFoundErr is returned. Get(string) (int64, error) + + // RunUntil starts all the necessary maintenance. + RunUntil(stopCh <-chan struct{}) } // NewStorageObjectCountTracker returns an instance of // StorageObjectCountTracker interface that can be used to // keep track of the total number of objects for each resource. -func NewStorageObjectCountTracker(stopCh <-chan struct{}) StorageObjectCountTracker { - tracker := &objectCountTracker{ +func NewStorageObjectCountTracker() StorageObjectCountTracker { + return &objectCountTracker{ clock: &clock.RealClock{}, counts: map[string]*timestampedCount{}, } - go func() { - wait.PollUntil( - pruneInterval, - func() (bool, error) { - // always prune at every pruneInterval - return false, tracker.prune(pruneInterval) - }, stopCh) - klog.InfoS("StorageObjectCountTracker pruner is exiting") - }() - - return tracker } // timestampedCount stores the count of a given resource with a last updated @@ -148,6 +140,17 @@ func (t *objectCountTracker) Get(groupResource string) (int64, error) { return 0, ObjectCountNotFoundErr } +// RunUntil runs all the necessary maintenance. +func (t *objectCountTracker) RunUntil(stopCh <-chan struct{}) { + wait.PollUntil( + pruneInterval, + func() (bool, error) { + // always prune at every pruneInterval + return false, t.prune(pruneInterval) + }, stopCh) + klog.InfoS("StorageObjectCountTracker pruner is exiting") +} + func (t *objectCountTracker) prune(threshold time.Duration) error { oldestLastUpdatedAtAllowed := t.clock.Now().Add(-threshold)