mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #110062 from wojtek-t/fix_storage_object_count_tracker_registration
Avoid leaking StorageObjectCountTracker goroutine
This commit is contained in:
commit
45844049fc
@ -370,7 +370,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
|||||||
// Generic API servers have no inherent long-running subresources
|
// Generic API servers have no inherent long-running subresources
|
||||||
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
||||||
lifecycleSignals: lifecycleSignals,
|
lifecycleSignals: lifecycleSignals,
|
||||||
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(lifecycleSignals.ShutdownInitiated.Signaled()),
|
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
|
||||||
|
|
||||||
APIServerID: id,
|
APIServerID: id,
|
||||||
StorageVersionManager: storageversion.NewDefaultManager(),
|
StorageVersionManager: storageversion.NewDefaultManager(),
|
||||||
@ -754,6 +754,19 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add PostStartHook for maintenaing the object count tracker.
|
||||||
|
if c.StorageObjectCountTracker != nil {
|
||||||
|
const storageObjectCountTrackerHookName = "storage-object-count-tracker-hook"
|
||||||
|
if !s.isPostStartHookRegistered(storageObjectCountTrackerHookName) {
|
||||||
|
if err := s.AddPostStartHook(storageObjectCountTrackerHookName, func(context PostStartHookContext) error {
|
||||||
|
go c.StorageObjectCountTracker.RunUntil(context.StopCh)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||||
skip := false
|
skip := false
|
||||||
for _, existingCheck := range c.HealthzChecks {
|
for _, existingCheck := range c.HealthzChecks {
|
||||||
|
@ -156,6 +156,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||||||
"/healthz/poststarthook/delegate-post-start-hook",
|
"/healthz/poststarthook/delegate-post-start-hook",
|
||||||
"/healthz/poststarthook/generic-apiserver-start-informers",
|
"/healthz/poststarthook/generic-apiserver-start-informers",
|
||||||
"/healthz/poststarthook/max-in-flight-filter",
|
"/healthz/poststarthook/max-in-flight-filter",
|
||||||
|
"/healthz/poststarthook/storage-object-count-tracker-hook",
|
||||||
"/healthz/poststarthook/wrapping-post-start-hook",
|
"/healthz/poststarthook/wrapping-post-start-hook",
|
||||||
"/healthz/wrapping-health",
|
"/healthz/wrapping-health",
|
||||||
"/livez",
|
"/livez",
|
||||||
@ -165,6 +166,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||||||
"/livez/poststarthook/delegate-post-start-hook",
|
"/livez/poststarthook/delegate-post-start-hook",
|
||||||
"/livez/poststarthook/generic-apiserver-start-informers",
|
"/livez/poststarthook/generic-apiserver-start-informers",
|
||||||
"/livez/poststarthook/max-in-flight-filter",
|
"/livez/poststarthook/max-in-flight-filter",
|
||||||
|
"/livez/poststarthook/storage-object-count-tracker-hook",
|
||||||
"/livez/poststarthook/wrapping-post-start-hook",
|
"/livez/poststarthook/wrapping-post-start-hook",
|
||||||
"/metrics",
|
"/metrics",
|
||||||
"/readyz",
|
"/readyz",
|
||||||
@ -175,6 +177,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||||||
"/readyz/poststarthook/delegate-post-start-hook",
|
"/readyz/poststarthook/delegate-post-start-hook",
|
||||||
"/readyz/poststarthook/generic-apiserver-start-informers",
|
"/readyz/poststarthook/generic-apiserver-start-informers",
|
||||||
"/readyz/poststarthook/max-in-flight-filter",
|
"/readyz/poststarthook/max-in-flight-filter",
|
||||||
|
"/readyz/poststarthook/storage-object-count-tracker-hook",
|
||||||
"/readyz/poststarthook/wrapping-post-start-hook",
|
"/readyz/poststarthook/wrapping-post-start-hook",
|
||||||
"/readyz/shutdown",
|
"/readyz/shutdown",
|
||||||
}
|
}
|
||||||
@ -203,6 +206,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||||||
[-]delegate-health failed: reason withheld
|
[-]delegate-health failed: reason withheld
|
||||||
[+]poststarthook/generic-apiserver-start-informers ok
|
[+]poststarthook/generic-apiserver-start-informers ok
|
||||||
[+]poststarthook/max-in-flight-filter ok
|
[+]poststarthook/max-in-flight-filter ok
|
||||||
|
[+]poststarthook/storage-object-count-tracker-hook ok
|
||||||
[+]poststarthook/delegate-post-start-hook ok
|
[+]poststarthook/delegate-post-start-hook ok
|
||||||
[+]poststarthook/wrapping-post-start-hook ok
|
[+]poststarthook/wrapping-post-start-hook ok
|
||||||
healthz check failed
|
healthz check failed
|
||||||
|
@ -64,27 +64,19 @@ type StorageObjectCountTracker interface {
|
|||||||
// - if the given resource is not being tracked then
|
// - if the given resource is not being tracked then
|
||||||
// ObjectCountNotFoundErr is returned.
|
// ObjectCountNotFoundErr is returned.
|
||||||
Get(string) (int64, error)
|
Get(string) (int64, error)
|
||||||
|
|
||||||
|
// RunUntil starts all the necessary maintenance.
|
||||||
|
RunUntil(stopCh <-chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStorageObjectCountTracker returns an instance of
|
// NewStorageObjectCountTracker returns an instance of
|
||||||
// StorageObjectCountTracker interface that can be used to
|
// StorageObjectCountTracker interface that can be used to
|
||||||
// keep track of the total number of objects for each resource.
|
// keep track of the total number of objects for each resource.
|
||||||
func NewStorageObjectCountTracker(stopCh <-chan struct{}) StorageObjectCountTracker {
|
func NewStorageObjectCountTracker() StorageObjectCountTracker {
|
||||||
tracker := &objectCountTracker{
|
return &objectCountTracker{
|
||||||
clock: &clock.RealClock{},
|
clock: &clock.RealClock{},
|
||||||
counts: map[string]*timestampedCount{},
|
counts: map[string]*timestampedCount{},
|
||||||
}
|
}
|
||||||
go func() {
|
|
||||||
wait.PollUntil(
|
|
||||||
pruneInterval,
|
|
||||||
func() (bool, error) {
|
|
||||||
// always prune at every pruneInterval
|
|
||||||
return false, tracker.prune(pruneInterval)
|
|
||||||
}, stopCh)
|
|
||||||
klog.InfoS("StorageObjectCountTracker pruner is exiting")
|
|
||||||
}()
|
|
||||||
|
|
||||||
return tracker
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// timestampedCount stores the count of a given resource with a last updated
|
// timestampedCount stores the count of a given resource with a last updated
|
||||||
@ -148,6 +140,17 @@ func (t *objectCountTracker) Get(groupResource string) (int64, error) {
|
|||||||
return 0, ObjectCountNotFoundErr
|
return 0, ObjectCountNotFoundErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunUntil runs all the necessary maintenance.
|
||||||
|
func (t *objectCountTracker) RunUntil(stopCh <-chan struct{}) {
|
||||||
|
wait.PollUntil(
|
||||||
|
pruneInterval,
|
||||||
|
func() (bool, error) {
|
||||||
|
// always prune at every pruneInterval
|
||||||
|
return false, t.prune(pruneInterval)
|
||||||
|
}, stopCh)
|
||||||
|
klog.InfoS("StorageObjectCountTracker pruner is exiting")
|
||||||
|
}
|
||||||
|
|
||||||
func (t *objectCountTracker) prune(threshold time.Duration) error {
|
func (t *objectCountTracker) prune(threshold time.Duration) error {
|
||||||
oldestLastUpdatedAtAllowed := t.clock.Now().Add(-threshold)
|
oldestLastUpdatedAtAllowed := t.clock.Now().Add(-threshold)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user