From 57a3b0abd678c66a9a04e553b6d6ae49671a4779 Mon Sep 17 00:00:00 2001 From: chenyw1990 Date: Wed, 24 Feb 2021 16:48:17 +0800 Subject: [PATCH] reduce configmap and secret watch of kubelet --- pkg/kubelet/configmap/configmap_manager.go | 4 +- pkg/kubelet/kubelet.go | 4 +- pkg/kubelet/secret/secret_manager.go | 4 +- .../util/manager/watch_based_manager.go | 166 +++++++++++++++--- .../util/manager/watch_based_manager_test.go | 100 +++++++++-- .../integration/kubelet/watch_manager_test.go | 4 +- 6 files changed, 244 insertions(+), 38 deletions(-) diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go index 2f525357df5..5466dd8b321 100644 --- a/pkg/kubelet/configmap/configmap_manager.go +++ b/pkg/kubelet/configmap/configmap_manager.go @@ -135,7 +135,7 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G // - whenever a pod is created or updated, we start individual watches for all // referenced objects that aren't referenced from other registered pods // - every GetObject() returns a value from local cache propagated via watches -func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager { +func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager { listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts) } @@ -153,6 +153,6 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager { } gr := corev1.Resource("configmap") return &configMapManager{ - manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames), + manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames), } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7ecfbc9a5b7..32719e0ef3d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -558,8 +558,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, var configMapManager configmap.Manager switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy { case kubeletconfiginternal.WatchChangeDetectionStrategy: - secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient) - configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient) + secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval) + configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval) case kubeletconfiginternal.TTLCacheChangeDetectionStrategy: secretManager = secret.NewCachingSecretManager( kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index d78bbdf0229..0e7485fc4ab 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -136,7 +136,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO // - whenever a pod is created or updated, we start individual watches for all // referenced objects that aren't referenced from other registered pods // - every GetObject() returns a value from local cache propagated via watches -func NewWatchingSecretManager(kubeClient clientset.Interface) Manager { +func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager { listSecret := 
func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
 		return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
 	}
@@ -154,6 +154,6 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
 	}
 	gr := corev1.Resource("secret")
 	return &secretManager{
-		manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
+		manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
 	}
 }
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index 30f25b405bc..fa4642fe1e9 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
@@ -44,25 +45,108 @@ type isImmutableFunc func(runtime.Object) bool
 // objectCacheItem is a single item stored in objectCache.
 type objectCacheItem struct {
 	refCount  int
-	store     cache.Store
+	store     *cacheStore
+	reflector *cache.Reflector
+
 	hasSynced func() (bool, error)
 
-	// lock is protecting from closing stopCh multiple times.
-	lock   sync.Mutex
-	stopCh chan struct{}
+	// waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
+	waitGroup sync.WaitGroup
+
+	// lock protects access to lastAccessTime, stopped, and immutable,
+	// and guards against closing stopCh multiple times.
+	lock           sync.Mutex
+	lastAccessTime time.Time
+	stopped        bool
+	immutable      bool
+	stopCh         chan struct{}
 }
 
 func (i *objectCacheItem) stop() bool {
 	i.lock.Lock()
 	defer i.lock.Unlock()
-	select {
-	case <-i.stopCh:
-		// This means that channel is already closed.
+	return i.stopThreadUnsafe()
+}
+
+func (i *objectCacheItem) stopThreadUnsafe() bool {
+	if i.stopped {
 		return false
-	default:
-		close(i.stopCh)
-		return true
 	}
+	i.stopped = true
+	close(i.stopCh)
+	if !i.immutable {
+		i.store.unsetInitialized()
+	}
+	return true
+}
+
+func (i *objectCacheItem) setLastAccessTime(time time.Time) {
+	i.lock.Lock()
+	defer i.lock.Unlock()
+	i.lastAccessTime = time
+}
+
+func (i *objectCacheItem) setImmutable() {
+	i.lock.Lock()
+	defer i.lock.Unlock()
+	i.immutable = true
+}
+
+func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
+	i.lock.Lock()
+	defer i.lock.Unlock()
+	if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+		return i.stopThreadUnsafe()
+	}
+	return false
+}
+
+func (i *objectCacheItem) restartReflectorIfNeeded() {
+	i.lock.Lock()
+	defer i.lock.Unlock()
+	if i.immutable || !i.stopped {
+		return
+	}
+	i.stopCh = make(chan struct{})
+	i.stopped = false
+	go i.startReflector()
+}
+
+func (i *objectCacheItem) startReflector() {
+	i.waitGroup.Wait()
+	i.waitGroup.Add(1)
+	defer i.waitGroup.Done()
+	i.reflector.Run(i.stopCh)
+}
+
+// cacheStore overrides the Replace function of cache.Store so that it can mark the store as initialized.
+type cacheStore struct {
+	cache.Store
+	lock        sync.Mutex
+	initialized bool
+}
+
+func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	err := c.Store.Replace(list, resourceVersion)
+	if err != nil {
+		return err
+	}
+	c.initialized = true
+	return nil
+}
+
+func (c *cacheStore) hasSynced() bool {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	return c.initialized
+}
+
+func (c *cacheStore) unsetInitialized() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.initialized = false
 }
 
 // objectCache is a local cache of objects propagated via
@@ -73,35 +157,53 @@ type objectCache struct {
 	newObject     newObjectFunc
 	isImmutable   isImmutableFunc
 	groupResource schema.GroupResource
+	clock         clock.Clock
+	maxIdleTime   time.Duration
 
 	lock  sync.RWMutex
 	items map[objectKey]*objectCacheItem
 }
 
+const minIdleTime = 1 * time.Minute
+
 // NewObjectCache returns a new watch-based instance of Store interface.
 func NewObjectCache(
 	listObject listObjectFunc,
 	watchObject watchObjectFunc,
 	newObject newObjectFunc,
 	isImmutable isImmutableFunc,
-	groupResource schema.GroupResource) Store {
-	return &objectCache{
+	groupResource schema.GroupResource,
+	clock clock.Clock,
+	maxIdleTime time.Duration) Store {
+
+	if maxIdleTime < minIdleTime {
+		maxIdleTime = minIdleTime
+	}
+
+	store := &objectCache{
 		listObject:    listObject,
 		watchObject:   watchObject,
 		newObject:     newObject,
 		isImmutable:   isImmutable,
 		groupResource: groupResource,
+		clock:         clock,
+		maxIdleTime:   maxIdleTime,
 		items:         make(map[objectKey]*objectCacheItem),
 	}
+
+	// TODO propagate stopCh from the higher level.
+	go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
+	return store
 }
 
-func (c *objectCache) newStore() cache.Store {
+func (c *objectCache) newStore() *cacheStore {
 	// TODO: We may consider created a dedicated store keeping just a single
 	// item, instead of using a generic store implementation for this purpose.
 	// However, simple benchmarks show that memory overhead in that case is
 	// decrease from ~600B to ~300B per object. So we are not optimizing it
 	// until we will see a good reason for that.
-	return cache.NewStore(cache.MetaNamespaceKeyFunc)
+	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	return &cacheStore{store, sync.Mutex{}, false}
 }
 
 func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
@@ -122,14 +224,15 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
 		store,
 		0,
 	)
-	stopCh := make(chan struct{})
-	go reflector.Run(stopCh)
-	return &objectCacheItem{
+	item := &objectCacheItem{
 		refCount:  0,
 		store:     store,
-		hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
-		stopCh:    stopCh,
+		reflector: reflector,
+		hasSynced: func() (bool, error) { return store.hasSynced(), nil },
+		stopCh:    make(chan struct{}),
 	}
+	go item.startReflector()
+	return item
 }
 
 func (c *objectCache) AddReference(namespace, name string) {
@@ -184,10 +287,11 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
 	if !exists {
 		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
 	}
+	item.restartReflectorIfNeeded()
 	if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
 		return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
 	}
-
+	item.setLastAccessTime(c.clock.Now())
 	obj, exists, err := item.store.GetByKey(c.key(namespace, name))
 	if err != nil {
 		return nil, err
@@ -207,6 +311,7 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
 		// - doing that would require significant refactoring to reflector
 		// we limit ourselves to just quickly stop the reflector here.
 		if c.isImmutable(object) {
+			item.setImmutable()
 			if item.stop() {
 				klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
 			}
@@ -216,6 +321,17 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
 	return nil, fmt.Errorf("unexpected object type: %v", obj)
 }
 
+func (c *objectCache) startRecycleIdleWatch() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for key, item := range c.items {
+		if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
+			klog.V(4).InfoS("Stopped watching for changes - object was not accessed within maxIdleTime", "objectKey", key, "maxIdleTime", c.maxIdleTime)
+		}
+	}
+}
+
 // NewWatchBasedManager creates a manager that keeps a cache of all objects
 // necessary for registered pods.
 // It implements the following logic:
@@ -228,7 +344,15 @@ func NewWatchBasedManager(
 	newObject newObjectFunc,
 	isImmutable isImmutableFunc,
 	groupResource schema.GroupResource,
+	resyncInterval time.Duration,
 	getReferencedObjects func(*v1.Pod) sets.String) Manager {
-	objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
+
+	// If a configmap/secret is used as a volume, the volumeManager will visit the
+	// objectCacheItem every resyncInterval cycle. We only want to stop the
+	// objectCacheItems that are referenced exclusively by environment variables,
+	// so maxIdleTime is set to an integer multiple of resyncInterval, currently 5x.
+ maxIdleTime := resyncInterval * 5 + + objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime) return NewCacheBasedManager(objectStore, getReferencedObjects) } diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go index d81c4901106..415df658608 100644 --- a/pkg/kubelet/util/manager/watch_based_manager_test.go +++ b/pkg/kubelet/util/manager/watch_based_manager_test.go @@ -28,6 +28,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -59,13 +60,15 @@ func isSecretImmutable(object runtime.Object) bool { return false } -func newSecretCache(fakeClient clientset.Interface) *objectCache { +func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache { return &objectCache{ listObject: listSecret(fakeClient), watchObject: watchSecret(fakeClient), newObject: func() runtime.Object { return &v1.Secret{} }, isImmutable: isSecretImmutable, groupResource: corev1.Resource("secret"), + clock: fakeClock, + maxIdleTime: maxIdleTime, items: make(map[objectKey]*objectCacheItem), } } @@ -85,7 +88,8 @@ func TestSecretCache(t *testing.T) { fakeWatch := watch.NewFake() fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) - store := newSecretCache(fakeClient) + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretCache(fakeClient, fakeClock, time.Minute) store.AddReference("ns", "name") _, err := store.Get("ns", "name") @@ -154,7 +158,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) { fakeWatch := watch.NewFake() fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) - store := newSecretCache(fakeClient) + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretCache(fakeClient, fakeClock, time.Minute) store.AddReference("ns", "name") // This should trigger List and Watch actions eventually. @@ -259,7 +264,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { fakeWatch := watch.NewFake() fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) - store := newSecretCache(fakeClient) + fakeClock := clock.NewFakeClock(time.Now()) + store := newSecretCache(fakeClient, fakeClock, time.Minute) key := objectKey{namespace: "ns", name: "name"} itemExists := func() (bool, error) { @@ -275,12 +281,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { item.lock.Lock() defer item.lock.Unlock() - select { - case <-item.stopCh: - return false - default: - return true - } + return !item.stopped } // AddReference should start reflector. 
@@ -325,3 +326,82 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
 		})
 	}
 }
+
+func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
+	secret := &v1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "name",
+			Namespace:       "ns",
+			ResourceVersion: "200",
+		},
+	}
+
+	fakeClient := &fake.Clientset{}
+	listReactor := func(a core.Action) (bool, runtime.Object, error) {
+		result := &v1.SecretList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: "200",
+			},
+			Items: []v1.Secret{*secret},
+		}
+
+		return true, result, nil
+	}
+
+	fakeClient.AddReactor("list", "secrets", listReactor)
+	fakeWatch := watch.NewFake()
+	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+	fakeClock := clock.NewFakeClock(time.Now())
+	store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+	key := objectKey{namespace: "ns", name: "name"}
+	itemExists := func() (bool, error) {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		_, ok := store.items[key]
+		return ok, nil
+	}
+
+	reflectorRunning := func() bool {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		item := store.items[key]
+
+		item.lock.Lock()
+		defer item.lock.Unlock()
+		return !item.stopped
+	}
+
+	// AddReference should start reflector.
+	store.AddReference("ns", "name")
+	if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+		t.Errorf("item wasn't added to cache")
+	}
+
+	obj, _ := store.Get("ns", "name")
+	assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+
+	assert.True(t, reflectorRunning())
+
+	fakeClock.Step(90 * time.Second)
+	store.startRecycleIdleWatch()
+
+	// The reflector should already be stopped because maxIdleTime was exceeded.
+	assert.False(t, reflectorRunning())
+
+	obj, _ = store.Get("ns", "name")
+	assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+	// The reflector should be restarted after the secret is fetched again.
+	assert.True(t, reflectorRunning())
+
+	fakeClock.Step(20 * time.Second)
+	_, _ = store.Get("ns", "name")
+	fakeClock.Step(20 * time.Second)
+	_, _ = store.Get("ns", "name")
+	fakeClock.Step(20 * time.Second)
+	_, _ = store.Get("ns", "name")
+	store.startRecycleIdleWatch()
+
+	// The reflector should keep running as long as Get is called periodically.
+	assert.True(t, reflectorRunning())
+}
diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go
index a065bc26b71..07c934b5e65 100644
--- a/test/integration/kubelet/watch_manager_test.go
+++ b/test/integration/kubelet/watch_manager_test.go
@@ -27,6 +27,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
@@ -61,7 +62,8 @@ func TestWatchBasedManager(t *testing.T) {
 	// We want all watches to be up and running to stress test it.
 	// So don't treat any secret as immutable here.
 	isImmutable := func(_ runtime.Object) bool { return false }
-	store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"})
+	fakeClock := clock.NewFakeClock(time.Now())
+	store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)
 
 	// create 1000 secrets in parallel
 	t.Log(time.Now(), "creating 1000 secrets")
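
Note: the mechanism this patch introduces is that every objectCacheItem records when it was last read, a sweep (startRecycleIdleWatch, driven by wait.Until every minute) stops the reflector of any item that has not been read within maxIdleTime (5x the kubelet resync interval, clamped to at least minIdleTime), and the next Get() restarts it via restartReflectorIfNeeded(). The sketch below is a minimal, standalone illustration of that idle-stop/restart pattern using only the standard library; the names idleWatcher, run, Get, and stopIfIdle are invented for the example and are not part of the kubelet code, which uses objectCacheItem, cache.Reflector, and an injectable clock.Clock so the behavior can be tested with a fake clock.

// Minimal sketch of the idle-watch recycling pattern (illustrative only).
package main

import (
	"fmt"
	"sync"
	"time"
)

type idleWatcher struct {
	mu             sync.Mutex
	lastAccessTime time.Time
	stopped        bool
	stopCh         chan struct{}
}

func newIdleWatcher() *idleWatcher {
	w := &idleWatcher{stopCh: make(chan struct{})}
	go w.run(w.stopCh)
	return w
}

// run stands in for reflector.Run: it blocks until stopCh is closed.
func (w *idleWatcher) run(stopCh chan struct{}) { <-stopCh }

// Get marks the watcher as recently used and restarts it if it was stopped.
func (w *idleWatcher) Get() string {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stopped {
		w.stopCh = make(chan struct{})
		w.stopped = false
		go w.run(w.stopCh)
	}
	w.lastAccessTime = time.Now()
	return "cached object"
}

// stopIfIdle is what the periodic sweep would call for every cached item.
func (w *idleWatcher) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.stopped && now.After(w.lastAccessTime.Add(maxIdleTime)) {
		w.stopped = true
		close(w.stopCh)
		return true
	}
	return false
}

func main() {
	w := newIdleWatcher()
	w.Get()
	// Simulate the sweep firing after the item has been idle longer than maxIdleTime.
	if w.stopIfIdle(time.Now().Add(6*time.Minute), 5*time.Minute) {
		fmt.Println("watch stopped: idle longer than maxIdleTime")
	}
	w.Get() // the next access restarts the watch
	fmt.Println("watch restarted on access")
}

Because items backing volumes are read by the volumeManager every resyncInterval, only items that are referenced solely by environment variables go idle long enough to be stopped, which is exactly the set of watches the patch aims to reduce.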