From 0d41d2921e2a38e65f18a2cc16de8fee5fbe1074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 19 May 2022 22:09:04 +0200 Subject: [PATCH] Fix leaking goroutines in kubelet integration test --- .../util/manager/watch_based_manager.go | 48 +++++++++++++++---- .../integration/kubelet/watch_manager_test.go | 5 +- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go index 5d3c1dc3fb7..785f01be256 100644 --- a/pkg/kubelet/util/manager/watch_based_manager.go +++ b/pkg/kubelet/util/manager/watch_based_manager.go @@ -164,8 +164,9 @@ type objectCache struct { clock clock.Clock maxIdleTime time.Duration - lock sync.RWMutex - items map[objectKey]*objectCacheItem + lock sync.RWMutex + items map[objectKey]*objectCacheItem + stopped bool } const minIdleTime = 1 * time.Minute @@ -178,7 +179,8 @@ func NewObjectCache( isImmutable isImmutableFunc, groupResource schema.GroupResource, clock clock.Clock, - maxIdleTime time.Duration) Store { + maxIdleTime time.Duration, + stopCh <-chan struct{}) Store { if maxIdleTime < minIdleTime { maxIdleTime = minIdleTime @@ -195,8 +197,8 @@ func NewObjectCache( items: make(map[objectKey]*objectCacheItem), } - // TODO propagate stopCh from the higher level. - go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop) + go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh) + go store.shutdownWhenStopped(stopCh) return store } @@ -210,7 +212,7 @@ func (c *objectCache) newStore() *cacheStore { return &cacheStore{store, sync.Mutex{}, false} } -func (c *objectCache) newReflector(namespace, name string) *objectCacheItem { +func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem { fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String() listFunc := func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector @@ -235,7 +237,11 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem { hasSynced: func() (bool, error) { return store.hasSynced(), nil }, stopCh: make(chan struct{}), } - go item.startReflector() + + // Don't start reflector if Kubelet is already shutting down. + if !c.stopped { + go item.startReflector() + } return item } @@ -251,7 +257,7 @@ func (c *objectCache) AddReference(namespace, name string) { defer c.lock.Unlock() item, exists := c.items[key] if !exists { - item = c.newReflector(namespace, name) + item = c.newReflectorLocked(namespace, name) c.items[key] = item } item.refCount++ @@ -281,6 +287,12 @@ func (c *objectCache) key(namespace, name string) string { return name } +func (c *objectCache) isStopped() bool { + c.lock.RLock() + defer c.lock.RUnlock() + return c.stopped +} + func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { key := objectKey{namespace: namespace, name: name} @@ -295,7 +307,10 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { // This protects from premature (racy) reflector closure. item.setLastAccessTime(c.clock.Now()) - item.restartReflectorIfNeeded() + // Don't restart reflector if Kubelet is already shutting down. + if !c.isStopped() { + item.restartReflectorIfNeeded() + } if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil { return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err) } @@ -339,6 +354,18 @@ func (c *objectCache) startRecycleIdleWatch() { } } +func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) { + <-stopCh + + c.lock.Lock() + defer c.lock.Unlock() + + c.stopped = true + for _, item := range c.items { + item.stop() + } +} + // NewWatchBasedManager creates a manager that keeps a cache of all objects // necessary for registered pods. // It implements the following logic: @@ -360,6 +387,7 @@ func NewWatchBasedManager( // We currently set it to 5 times. maxIdleTime := resyncInterval * 5 - objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime) + // TODO propagate stopCh from the higher level. + objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop) return NewCacheBasedManager(objectStore, getReferencedObjects) } diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go index 201baa4b304..1c92fb37b9b 100644 --- a/test/integration/kubelet/watch_manager_test.go +++ b/test/integration/kubelet/watch_manager_test.go @@ -63,7 +63,10 @@ func TestWatchBasedManager(t *testing.T) { // So don't treat any secret as immutable here. isImmutable := func(_ runtime.Object) bool { return false } fakeClock := testingclock.NewFakeClock(time.Now()) - store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute) + + stopCh := make(chan struct{}) + defer close(stopCh) + store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh) // create 1000 secrets in parallel t.Log(time.Now(), "creating 1000 secrets")