Fix leaking goroutines in kubelet integration test

This commit is contained in:
Wojciech Tyczyński 2022-05-19 22:09:04 +02:00
parent 32773d61c4
commit 0d41d2921e
2 changed files with 42 additions and 11 deletions

View File

@ -164,8 +164,9 @@ type objectCache struct {
clock clock.Clock
maxIdleTime time.Duration
lock sync.RWMutex
items map[objectKey]*objectCacheItem
lock sync.RWMutex
items map[objectKey]*objectCacheItem
stopped bool
}
const minIdleTime = 1 * time.Minute
@ -178,7 +179,8 @@ func NewObjectCache(
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
clock clock.Clock,
maxIdleTime time.Duration) Store {
maxIdleTime time.Duration,
stopCh <-chan struct{}) Store {
if maxIdleTime < minIdleTime {
maxIdleTime = minIdleTime
@ -195,8 +197,8 @@ func NewObjectCache(
items: make(map[objectKey]*objectCacheItem),
}
// TODO propagate stopCh from the higher level.
go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
go store.shutdownWhenStopped(stopCh)
return store
}
@ -210,7 +212,7 @@ func (c *objectCache) newStore() *cacheStore {
return &cacheStore{store, sync.Mutex{}, false}
}
func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem {
fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
@ -235,7 +237,11 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
hasSynced: func() (bool, error) { return store.hasSynced(), nil },
stopCh: make(chan struct{}),
}
go item.startReflector()
// Don't start reflector if Kubelet is already shutting down.
if !c.stopped {
go item.startReflector()
}
return item
}
@ -251,7 +257,7 @@ func (c *objectCache) AddReference(namespace, name string) {
defer c.lock.Unlock()
item, exists := c.items[key]
if !exists {
item = c.newReflector(namespace, name)
item = c.newReflectorLocked(namespace, name)
c.items[key] = item
}
item.refCount++
@ -281,6 +287,12 @@ func (c *objectCache) key(namespace, name string) string {
return name
}
func (c *objectCache) isStopped() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.stopped
}
func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
key := objectKey{namespace: namespace, name: name}
@ -295,7 +307,10 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
// This protects from premature (racy) reflector closure.
item.setLastAccessTime(c.clock.Now())
item.restartReflectorIfNeeded()
// Don't restart reflector if Kubelet is already shutting down.
if !c.isStopped() {
item.restartReflectorIfNeeded()
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
@ -339,6 +354,18 @@ func (c *objectCache) startRecycleIdleWatch() {
}
}
func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) {
<-stopCh
c.lock.Lock()
defer c.lock.Unlock()
c.stopped = true
for _, item := range c.items {
item.stop()
}
}
// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
@ -360,6 +387,7 @@ func NewWatchBasedManager(
// We currently set it to 5 times.
maxIdleTime := resyncInterval * 5
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
// TODO propagate stopCh from the higher level.
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}

View File

@ -63,7 +63,10 @@ func TestWatchBasedManager(t *testing.T) {
// So don't treat any secret as immutable here.
isImmutable := func(_ runtime.Object) bool { return false }
fakeClock := testingclock.NewFakeClock(time.Now())
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)
stopCh := make(chan struct{})
defer close(stopCh)
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)
// create 1000 secrets in parallel
t.Log(time.Now(), "creating 1000 secrets")