From 515106b7957c4da6fdbbb7e474d19bad0fa3f57f Mon Sep 17 00:00:00 2001
From: wojtekt
Date: Wed, 25 Aug 2021 21:01:08 +0200
Subject: [PATCH] Don't prematurely close reflectors in case of slow initialization in watch based manager

---
 .../util/manager/watch_based_manager.go      | 11 ++-
 .../util/manager/watch_based_manager_test.go | 91 +++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index ba0bfe35bc0..98bc6303236 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -95,7 +95,11 @@ func (i *objectCacheItem) setImmutable() {
 func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
 	i.lock.Lock()
 	defer i.lock.Unlock()
-	if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+	// Ensure that we don't try to stop a not yet initialized reflector.
+	// In case of an overloaded kube-apiserver, if the list request is
+	// already being processed, all the work would be lost and would have
+	// to be retried.
+	if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
 		return i.stopThreadUnsafe()
 	}
 	return false
@@ -287,11 +291,14 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
 	if !exists {
 		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
 	}
+	// Record last access time independently of whether the Get succeeded or not.
+	// This protects against premature (racy) reflector closure.
+	item.setLastAccessTime(c.clock.Now())
+
 	item.restartReflectorIfNeeded()
 	if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
 		return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
 	}
-	item.setLastAccessTime(c.clock.Now())
 	obj, exists, err := item.store.GetByKey(c.key(namespace, name))
 	if err != nil {
 		return nil, err
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
index 415df658608..6d7f68975a8 100644
--- a/pkg/kubelet/util/manager/watch_based_manager_test.go
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -405,3 +405,94 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
 	// Reflector should be running when the get function is called periodically.
 	assert.True(t, reflectorRunning())
 }
+
+func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
+	secret := &v1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "name",
+			Namespace:       "ns",
+			ResourceVersion: "200",
+		},
+	}
+
+	fakeClock := clock.NewFakeClock(time.Now())
+
+	fakeClient := &fake.Clientset{}
+	listReactor := func(a core.Action) (bool, runtime.Object, error) {
+		<-fakeClock.After(120 * time.Second)
+
+		result := &v1.SecretList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: "200",
+			},
+			Items: []v1.Secret{*secret},
+		}
+
+		return true, result, nil
+	}
+
+	fakeClient.AddReactor("list", "secrets", listReactor)
+	fakeWatch := watch.NewFake()
+	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+	store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+	key := objectKey{namespace: "ns", name: "name"}
+	itemExists := func() (bool, error) {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		_, ok := store.items[key]
+		return ok, nil
+	}
+
+	reflectorRunning := func() bool {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		item := store.items[key]
+
+		item.lock.Lock()
+		defer item.lock.Unlock()
+		return !item.stopped
+	}
+
+	reflectorInitialized := func() (bool, error) {
+		store.lock.Lock()
+		defer store.lock.Unlock()
+		item := store.items[key]
+
+		item.lock.Lock()
+		defer item.lock.Unlock()
+		return item.store.hasSynced(), nil
+	}
+
+	// AddReference should start the reflector.
+	store.AddReference("ns", "name")
+	if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+		t.Errorf("item wasn't added to cache")
+	}
+
+	fakeClock.Step(90 * time.Second)
+	store.startRecycleIdleWatch()
+
+	// The reflector hasn't initialized yet, so it shouldn't be stopped.
+	// However, Get should still be failing.
+	assert.True(t, reflectorRunning())
+	initialized, _ := reflectorInitialized()
+	assert.False(t, initialized)
+	_, err := store.Get("ns", "name")
+	if err == nil || !strings.Contains(err.Error(), "failed to sync") {
+		t.Errorf("Expected a failed to sync error, got: %v", err)
+	}
+
+	// Initialization should successfully finish.
+	fakeClock.Step(30 * time.Second)
+	if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
+		t.Errorf("reflector didn't initialize correctly")
+	}
+
+	// Recycling shouldn't stop the reflector because it was accessed within the last minute.
+	store.startRecycleIdleWatch()
+	assert.True(t, reflectorRunning())
+
+	obj, _ := store.Get("ns", "name")
+	assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+}
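
Note: below is a minimal, self-contained Go sketch of the behaviour this patch is after, kept separate from the diff itself. It shows the two pieces of the fix: the idle recycler only stops a reflector whose store has already synced, and a Get records the last access time before waiting for the sync. The cacheItem type, its synced field, and the waitForSync callback are simplified stand-ins for objectCacheItem, i.store.hasSynced(), and the wait.PollImmediate call in watch_based_manager.go; this is an illustration of the idea, not the kubelet code.

// sketch.go: illustration of the idle-stop guard added by this patch.
package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheItem is a simplified stand-in for objectCacheItem.
type cacheItem struct {
	lock           sync.Mutex
	stopped        bool
	synced         bool // stand-in for i.store.hasSynced()
	lastAccessTime time.Time
	stopCh         chan struct{}
}

// stopIfIdle mirrors the patched check: a reflector that has not finished
// its initial LIST is never stopped, so work already in flight against an
// overloaded kube-apiserver is not thrown away and retried.
func (i *cacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	i.lock.Lock()
	defer i.lock.Unlock()
	if !i.stopped && i.synced && now.After(i.lastAccessTime.Add(maxIdleTime)) {
		i.stopped = true
		close(i.stopCh)
		return true
	}
	return false
}

// get mirrors the reordering in objectCache.Get: the access time is
// recorded before waiting for the sync, so a slow initialization cannot
// race with the idle recycler.
func (i *cacheItem) get(now time.Time, waitForSync func() bool) error {
	i.lock.Lock()
	i.lastAccessTime = now
	i.lock.Unlock()
	if !waitForSync() {
		return fmt.Errorf("failed to sync cache")
	}
	return nil
}

func main() {
	item := &cacheItem{stopCh: make(chan struct{})}
	start := time.Now()

	// Not synced yet: even a long idle period must not stop the reflector.
	fmt.Println(item.stopIfIdle(start.Add(2*time.Minute), time.Minute)) // false

	// A Get that fails to sync still counts as an access.
	_ = item.get(start.Add(2*time.Minute), func() bool { return false })

	// Once synced, the reflector is only stopped after a real idle period.
	item.lock.Lock()
	item.synced = true
	item.lock.Unlock()
	fmt.Println(item.stopIfIdle(start.Add(2*time.Minute), time.Minute))  // false: accessed just now
	fmt.Println(item.stopIfIdle(start.Add(10*time.Minute), time.Minute)) // true
}

Recording the access time before the sync wait is what closes the race: a Get that times out on a slow LIST still counts as recent use, so startRecycleIdleWatch cannot reap the reflector while the apiserver is still processing the initial list.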