Merge pull request #99393 from chenyw1990/reduceKubeletResourceWatch

reduce configmap and secret watch of kubelet
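Before this change, the kubelet's Watch change detection strategy kept one reflector running for every referenced configmap and secret for as long as any registered pod referenced it. With this change, a reflector whose object has not been read for maxIdleTime (five times the kubelet's resync interval) is stopped by a background recycling loop and transparently restarted on the next Get(); reflectors for immutable objects stay stopped as before. This reduces the number of long-lived watch connections the kubelet holds open against the API server.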
commit a517eccd9f
Author: Kubernetes Prow Robot
Date:   2021-03-08 03:59:44 -08:00 (committed by GitHub)
6 changed files with 244 additions and 38 deletions


@@ -135,7 +135,7 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G
 // - whenever a pod is created or updated, we start individual watches for all
 //   referenced objects that aren't referenced from other registered pods
 // - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
+func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
     listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
         return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts)
     }
@@ -153,6 +153,6 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
     }
     gr := corev1.Resource("configmap")
     return &configMapManager{
-        manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
+        manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames),
     }
 }


@@ -558,8 +558,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     var configMapManager configmap.Manager
     switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
     case kubeletconfiginternal.WatchChangeDetectionStrategy:
-        secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
-        configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
+        secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
+        configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
     case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
         secretManager = secret.NewCachingSecretManager(
             kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
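Here klet.resyncInterval is populated from the kubelet's SyncFrequency configuration, which defaults to one minute, so by default an unused watch is recycled after roughly five minutes; see the maxIdleTime computation in NewWatchBasedManager below.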


@@ -136,7 +136,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO
 // - whenever a pod is created or updated, we start individual watches for all
 //   referenced objects that aren't referenced from other registered pods
 // - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
+func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
     listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
         return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
     }
@@ -154,6 +154,6 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
     }
     gr := corev1.Resource("secret")
     return &secretManager{
-        manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
+        manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
     }
 }
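With the new signatures, callers must supply a resync interval. A minimal sketch of constructing the managers (the kubeClient variable and the one-minute interval are assumptions for illustration, mirroring the kubelet wiring shown above):

    // Hypothetical caller; resyncInterval bounds how long an idle watch lives.
    secretManager := secret.NewWatchingSecretManager(kubeClient, time.Minute)
    configMapManager := configmap.NewWatchingConfigMapManager(kubeClient, time.Minute)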


@@ -31,6 +31,7 @@ import (
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apimachinery/pkg/watch"
@@ -44,25 +45,108 @@ type isImmutableFunc func(runtime.Object) bool

 // objectCacheItem is a single item stored in objectCache.
 type objectCacheItem struct {
     refCount  int
-    store     cache.Store
+    store     *cacheStore
+    reflector *cache.Reflector
     hasSynced func() (bool, error)

-    // lock is protecting from closing stopCh multiple times.
-    lock   sync.Mutex
-    stopCh chan struct{}
+    // waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
+    waitGroup sync.WaitGroup
+
+    // lock protects access to lastAccessTime, stopped, and immutable,
+    // and guards against closing stopCh multiple times.
+    lock           sync.Mutex
+    lastAccessTime time.Time
+    stopped        bool
+    immutable      bool
+    stopCh         chan struct{}
 }

 func (i *objectCacheItem) stop() bool {
     i.lock.Lock()
     defer i.lock.Unlock()
-    select {
-    case <-i.stopCh:
-        // This means that channel is already closed.
-        return false
-    default:
-        close(i.stopCh)
-        return true
-    }
+    return i.stopThreadUnsafe()
+}
+
+// stopThreadUnsafe must only be called while holding i.lock.
+func (i *objectCacheItem) stopThreadUnsafe() bool {
+    if i.stopped {
+        return false
+    }
+    i.stopped = true
+    close(i.stopCh)
+    if !i.immutable {
+        i.store.unsetInitialized()
+    }
+    return true
+}
+
+func (i *objectCacheItem) setLastAccessTime(time time.Time) {
+    i.lock.Lock()
+    defer i.lock.Unlock()
+    i.lastAccessTime = time
+}
+
+func (i *objectCacheItem) setImmutable() {
+    i.lock.Lock()
+    defer i.lock.Unlock()
+    i.immutable = true
+}
+
+func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
+    i.lock.Lock()
+    defer i.lock.Unlock()
+    if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+        return i.stopThreadUnsafe()
+    }
+    return false
+}
+
+func (i *objectCacheItem) restartReflectorIfNeeded() {
+    i.lock.Lock()
+    defer i.lock.Unlock()
+    if i.immutable || !i.stopped {
+        return
+    }
+    i.stopCh = make(chan struct{})
+    i.stopped = false
+    go i.startReflector()
+}
+
+func (i *objectCacheItem) startReflector() {
+    i.waitGroup.Wait()
+    i.waitGroup.Add(1)
+    defer i.waitGroup.Done()
+    i.reflector.Run(i.stopCh)
+}
+
+// cacheStore wraps cache.Store, overriding Replace so that the first
+// successful List marks the store as initialized.
+type cacheStore struct {
+    cache.Store
+    lock        sync.Mutex
+    initialized bool
+}
+
+func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    err := c.Store.Replace(list, resourceVersion)
+    if err != nil {
+        return err
+    }
+    c.initialized = true
+    return nil
+}
+
+func (c *cacheStore) hasSynced() bool {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    return c.initialized
+}
+
+func (c *cacheStore) unsetInitialized() {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    c.initialized = false
+}
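The initialized flag is what lets a restarted reflector look unsynced again: stopping a mutable item calls unsetInitialized(), and only the next successful List (which arrives via Replace) flips it back. A minimal sketch of these semantics, assuming it runs inside this package:

    store := &cacheStore{cache.NewStore(cache.MetaNamespaceKeyFunc), sync.Mutex{}, false}

    _ = store.hasSynced()                     // false: no List has completed yet
    _ = store.Replace([]interface{}{}, "100") // the reflector's initial List lands here
    _ = store.hasSynced()                     // true: the cache is populated
    store.unsetInitialized()                  // done by stopThreadUnsafe for mutable objects
    _ = store.hasSynced()                     // false again: Get() must wait for a fresh List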
 // objectCache is a local cache of objects propagated via
@@ -73,35 +157,53 @@ type objectCache struct {
     newObject     newObjectFunc
     isImmutable   isImmutableFunc
     groupResource schema.GroupResource
+    clock         clock.Clock
+    maxIdleTime   time.Duration

     lock  sync.RWMutex
     items map[objectKey]*objectCacheItem
 }

+const minIdleTime = 1 * time.Minute
+
 // NewObjectCache returns a new watch-based instance of Store interface.
 func NewObjectCache(
     listObject listObjectFunc,
     watchObject watchObjectFunc,
     newObject newObjectFunc,
     isImmutable isImmutableFunc,
-    groupResource schema.GroupResource) Store {
-    return &objectCache{
+    groupResource schema.GroupResource,
+    clock clock.Clock,
+    maxIdleTime time.Duration) Store {
+
+    if maxIdleTime < minIdleTime {
+        maxIdleTime = minIdleTime
+    }
+
+    store := &objectCache{
         listObject:    listObject,
         watchObject:   watchObject,
         newObject:     newObject,
         isImmutable:   isImmutable,
         groupResource: groupResource,
+        clock:         clock,
+        maxIdleTime:   maxIdleTime,
         items:         make(map[objectKey]*objectCacheItem),
     }
+
+    // TODO: propagate stopCh from the higher level.
+    go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
+    return store
 }

-func (c *objectCache) newStore() cache.Store {
+func (c *objectCache) newStore() *cacheStore {
     // TODO: We may consider creating a dedicated store keeping just a single
     // item, instead of using a generic store implementation for this purpose.
     // However, simple benchmarks show that the memory overhead would only
     // decrease from ~600B to ~300B per object, so we are not optimizing
     // until we see a good reason for that.
-    return cache.NewStore(cache.MetaNamespaceKeyFunc)
+    store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+    return &cacheStore{store, sync.Mutex{}, false}
 }

 func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
@@ -122,14 +224,15 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
         store,
         0,
     )
-    stopCh := make(chan struct{})
-    go reflector.Run(stopCh)
-    return &objectCacheItem{
+    item := &objectCacheItem{
         refCount:  0,
         store:     store,
-        hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
-        stopCh:    stopCh,
+        reflector: reflector,
+        hasSynced: func() (bool, error) { return store.hasSynced(), nil },
+        stopCh:    make(chan struct{}),
     }
+    go item.startReflector()
+    return item
 }

 func (c *objectCache) AddReference(namespace, name string) {
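A note on the hasSynced change above: the old check, reflector.LastSyncResourceVersion() != "", would keep reporting the cache as synced after a stop/restart cycle, because a reflector remembers its last-synced resource version across Run calls. Pointing hasSynced at store.hasSynced() instead, combined with unsetInitialized() in stopThreadUnsafe, forces Get() to wait until the restarted reflector's fresh List has repopulated the store.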
@@ -184,10 +287,11 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
     if !exists {
         return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
     }
+    item.restartReflectorIfNeeded()
     if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
         return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
     }
+    item.setLastAccessTime(c.clock.Now())
     obj, exists, err := item.store.GetByKey(c.key(namespace, name))
     if err != nil {
         return nil, err
@@ -207,6 +311,7 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
     //   - doing that would require significant refactoring to reflector
     // we limit ourselves to just quickly stop the reflector here.
     if c.isImmutable(object) {
+        item.setImmutable()
         if item.stop() {
             klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
         }
@@ -216,6 +321,17 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
     return nil, fmt.Errorf("unexpected object type: %v", obj)
 }

+func (c *objectCache) startRecycleIdleWatch() {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    for key, item := range c.items {
+        if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
+            klog.V(4).InfoS("Not accessed for a long time, stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
+        }
+    }
+}
+
 // NewWatchBasedManager creates a manager that keeps a cache of all objects
 // necessary for registered pods.
 // It implements the following logic:
@@ -228,7 +344,15 @@ func NewWatchBasedManager(
     newObject newObjectFunc,
     isImmutable isImmutableFunc,
     groupResource schema.GroupResource,
+    resyncInterval time.Duration,
     getReferencedObjects func(*v1.Pod) sets.String) Manager {
-    objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
+
+    // If a configmap/secret is used as a volume, the volume manager visits the
+    // objectCacheItem every resyncInterval cycle; we only want to stop the items
+    // referenced solely by environment variables. maxIdleTime is therefore set
+    // to an integer multiple of resyncInterval, currently 5x.
+    maxIdleTime := resyncInterval * 5
+    objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
     return NewCacheBasedManager(objectStore, getReferencedObjects)
 }
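To make the idle window concrete, a small sketch (the one-minute value is the kubelet's default SyncFrequency, an assumption about defaults rather than something stated in this diff):

    resyncInterval := time.Minute     // assumed kubelet SyncFrequency default
    maxIdleTime := resyncInterval * 5 // five minutes
    // The volume manager reads volume-referenced configmaps/secrets every
    // resyncInterval, so those items never sit idle for maxIdleTime; only
    // items referenced solely by environment variables get recycled.
    _ = maxIdleTime

Note that NewObjectCache clamps maxIdleTime to minIdleTime (one minute), so a very short resync interval cannot cause rapid stop/restart churn of the watches.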


@@ -28,6 +28,7 @@ import (
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apimachinery/pkg/watch"
@@ -59,13 +60,15 @@ func isSecretImmutable(object runtime.Object) bool {
     return false
 }

-func newSecretCache(fakeClient clientset.Interface) *objectCache {
+func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache {
     return &objectCache{
         listObject:    listSecret(fakeClient),
         watchObject:   watchSecret(fakeClient),
         newObject:     func() runtime.Object { return &v1.Secret{} },
         isImmutable:   isSecretImmutable,
         groupResource: corev1.Resource("secret"),
+        clock:         fakeClock,
+        maxIdleTime:   maxIdleTime,
         items:         make(map[objectKey]*objectCacheItem),
     }
 }
@@ -85,7 +88,8 @@ func TestSecretCache(t *testing.T) {
     fakeWatch := watch.NewFake()
     fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))

-    store := newSecretCache(fakeClient)
+    fakeClock := clock.NewFakeClock(time.Now())
+    store := newSecretCache(fakeClient, fakeClock, time.Minute)

     store.AddReference("ns", "name")
     _, err := store.Get("ns", "name")
@@ -154,7 +158,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
     fakeWatch := watch.NewFake()
     fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))

-    store := newSecretCache(fakeClient)
+    fakeClock := clock.NewFakeClock(time.Now())
+    store := newSecretCache(fakeClient, fakeClock, time.Minute)

     store.AddReference("ns", "name")
     // This should trigger List and Watch actions eventually.
@@ -259,7 +264,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
     fakeWatch := watch.NewFake()
     fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))

-    store := newSecretCache(fakeClient)
+    fakeClock := clock.NewFakeClock(time.Now())
+    store := newSecretCache(fakeClient, fakeClock, time.Minute)

     key := objectKey{namespace: "ns", name: "name"}
     itemExists := func() (bool, error) {
@@ -275,12 +281,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
         item.lock.Lock()
         defer item.lock.Unlock()
-        select {
-        case <-item.stopCh:
-            return false
-        default:
-            return true
-        }
+        return !item.stopped
     }

     // AddReference should start reflector.
@@ -325,3 +326,82 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
         })
     }
 }
+
+func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
+    secret := &v1.Secret{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:            "name",
+            Namespace:       "ns",
+            ResourceVersion: "200",
+        },
+    }
+
+    fakeClient := &fake.Clientset{}
+    listReactor := func(a core.Action) (bool, runtime.Object, error) {
+        result := &v1.SecretList{
+            ListMeta: metav1.ListMeta{
+                ResourceVersion: "200",
+            },
+            Items: []v1.Secret{*secret},
+        }
+        return true, result, nil
+    }
+
+    fakeClient.AddReactor("list", "secrets", listReactor)
+    fakeWatch := watch.NewFake()
+    fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+    fakeClock := clock.NewFakeClock(time.Now())
+    store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+    key := objectKey{namespace: "ns", name: "name"}
+    itemExists := func() (bool, error) {
+        store.lock.Lock()
+        defer store.lock.Unlock()
+        _, ok := store.items[key]
+        return ok, nil
+    }
+
+    reflectorRunning := func() bool {
+        store.lock.Lock()
+        defer store.lock.Unlock()
+        item := store.items[key]
+
+        item.lock.Lock()
+        defer item.lock.Unlock()
+        return !item.stopped
+    }
+
+    // AddReference should start the reflector.
+    store.AddReference("ns", "name")
+    if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+        t.Errorf("item wasn't added to cache")
+    }
+
+    obj, _ := store.Get("ns", "name")
+    assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+    assert.True(t, reflectorRunning())
+
+    fakeClock.Step(90 * time.Second)
+    store.startRecycleIdleWatch()
+
+    // The reflector should already be stopped because maxIdleTime was exceeded.
+    assert.False(t, reflectorRunning())
+
+    obj, _ = store.Get("ns", "name")
+    assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+    // The reflector should be running again after another Get.
+    assert.True(t, reflectorRunning())
+
+    fakeClock.Step(20 * time.Second)
+    _, _ = store.Get("ns", "name")
+    fakeClock.Step(20 * time.Second)
+    _, _ = store.Get("ns", "name")
+    fakeClock.Step(20 * time.Second)
+    _, _ = store.Get("ns", "name")
+    store.startRecycleIdleWatch()
+
+    // The reflector should stay running when Get is called periodically.
+    assert.True(t, reflectorRunning())
+}


@@ -27,6 +27,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/client-go/kubernetes"
@@ -61,7 +62,8 @@ func TestWatchBasedManager(t *testing.T) {
     // We want all watches to be up and running to stress test it.
     // So don't treat any secret as immutable here.
     isImmutable := func(_ runtime.Object) bool { return false }
-    store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"})
+    fakeClock := clock.NewFakeClock(time.Now())
+    store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)

     // create 1000 secrets in parallel
     t.Log(time.Now(), "creating 1000 secrets")