From 2ed556ae001fce287271382bcc7693c8104da0b2 Mon Sep 17 00:00:00 2001 From: TommyStarK Date: Sun, 26 May 2024 19:26:52 +0200 Subject: [PATCH] kubelet/util/manager: small cleanup and start replacing deprecated functions wait.Poll/wait.PollImmediate Signed-off-by: TommyStarK --- .../util/manager/cache_based_manager_test.go | 25 ++++++++++ .../util/manager/watch_based_manager_test.go | 47 ++++++++++++------- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/pkg/kubelet/util/manager/cache_based_manager_test.go b/pkg/kubelet/util/manager/cache_based_manager_test.go index 38bb2439b9a..d85f888767b 100644 --- a/pkg/kubelet/util/manager/cache_based_manager_test.go +++ b/pkg/kubelet/util/manager/cache_based_manager_test.go @@ -437,6 +437,31 @@ func TestCacheInvalidation(t *testing.T) { fakeClient.ClearActions() } +func TestResourceContentExpired(t *testing.T) { + fakeClient := &fake.Clientset{} + fakeClock := testingclock.NewFakeClock(time.Now()) + store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute) + manager := newCacheBasedSecretManager(store) + + // Create a pod with some secrets. + s1 := secretsToAttach{ + imagePullSecretNames: []string{"s1"}, + containerEnvSecrets: []envSecrets{ + {envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, + {envVarNames: []string{"s2"}}, + }, + } + + // emulate a requested resource content that has expired from the server + manager.RegisterPod(podWithSecrets("dummy-ns", "dummy-name", s1)) + fakeClient.PrependReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) { + return true, &v1.Secret{}, apierrors.NewResourceExpired("expired") + }) + // should fail to fetch the latest object + _, err := manager.GetObject("dummy-ns", "s1") + assert.Error(t, err) +} + func TestRegisterIdempotence(t *testing.T) { fakeClient := &fake.Clientset{} fakeClock := testingclock.NewFakeClock(time.Now()) diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go index 64d33c6f53c..c427fcdc636 100644 --- a/pkg/kubelet/util/manager/watch_based_manager_test.go +++ b/pkg/kubelet/util/manager/watch_based_manager_test.go @@ -38,6 +38,7 @@ import ( corev1 "k8s.io/kubernetes/pkg/apis/core/v1" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" @@ -105,7 +106,7 @@ func TestSecretCache(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"}, } fakeWatch.Add(secret) - getFn := func() (bool, error) { + getFn := func(_ context.Context) (bool, error) { object, err := store.Get("ns", "name") if err != nil { if apierrors.IsNotFound(err) { @@ -119,13 +120,15 @@ func TestSecretCache(t *testing.T) { } return true, nil } - if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil { + + tCtx := ktesting.Init(t) + if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, true, getFn); err != nil { t.Errorf("unexpected error: %v", err) } // Eventually we should observer secret deletion. fakeWatch.Delete(secret) - getFn = func() (bool, error) { + getFn = func(_ context.Context) (bool, error) { _, err := store.Get("ns", "name") if err != nil { if apierrors.IsNotFound(err) { @@ -135,7 +138,9 @@ func TestSecretCache(t *testing.T) { } return false, nil } - if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil { + deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second) + defer deadlineCancel() + if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, true, getFn); err != nil { t.Errorf("unexpected error: %v", err) } @@ -166,7 +171,7 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) { store.AddReference("ns", "name", "pod") // This should trigger List and Watch actions eventually. - actionsFn := func() (bool, error) { + actionsFn := func(_ context.Context) (bool, error) { actions := fakeClient.Actions() if len(actions) > 2 { return false, fmt.Errorf("too many actions: %v", actions) @@ -179,7 +184,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) { } return true, nil } - if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil { + tCtx := ktesting.Init(t) + if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, true, actionsFn); err != nil { t.Errorf("unexpected error: %v", err) } @@ -271,7 +277,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { store := newSecretCache(fakeClient, fakeClock, time.Minute) key := objectKey{namespace: "ns", name: "name"} - itemExists := func() (bool, error) { + itemExists := func(_ context.Context) (bool, error) { store.lock.Lock() defer store.lock.Unlock() _, ok := store.items[key] @@ -289,7 +295,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { // AddReference should start reflector. store.AddReference("ns", "name", "pod") - if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil { + tCtx := ktesting.Init(t) + if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil { t.Errorf("item wasn't added to cache") } @@ -309,7 +316,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { fakeWatch.Add(tc.eventual) // Eventually Get should return that secret. - getFn := func() (bool, error) { + getFn := func(_ context.Context) (bool, error) { object, err := store.Get("ns", "name") if err != nil { if apierrors.IsNotFound(err) { @@ -320,7 +327,9 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { secret := object.(*v1.Secret) return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil } - if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil { + deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second) + defer deadlineCancel() + if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, true, getFn); err != nil { t.Errorf("unexpected error: %v", err) } @@ -358,7 +367,7 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) { store := newSecretCache(fakeClient, fakeClock, time.Minute) key := objectKey{namespace: "ns", name: "name"} - itemExists := func() (bool, error) { + itemExists := func(_ context.Context) (bool, error) { store.lock.Lock() defer store.lock.Unlock() _, ok := store.items[key] @@ -377,7 +386,8 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) { // AddReference should start reflector. store.AddReference("ns", "name", "pod") - if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil { + tCtx := ktesting.Init(t) + if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil { t.Errorf("item wasn't added to cache") } @@ -440,7 +450,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) { store := newSecretCache(fakeClient, fakeClock, time.Minute) key := objectKey{namespace: "ns", name: "name"} - itemExists := func() (bool, error) { + itemExists := func(_ context.Context) (bool, error) { store.lock.Lock() defer store.lock.Unlock() _, ok := store.items[key] @@ -457,7 +467,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) { return !item.stopped } - reflectorInitialized := func() (bool, error) { + reflectorInitialized := func(_ context.Context) (bool, error) { store.lock.Lock() defer store.lock.Unlock() item := store.items[key] @@ -469,7 +479,8 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) { // AddReference should start reflector. store.AddReference("ns", "name", "pod") - if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil { + tCtx := ktesting.Init(t) + if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil { t.Errorf("item wasn't added to cache") } @@ -479,7 +490,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) { // Reflector didn't yet initialize, so it shouldn't be stopped. // However, Get should still be failing. assert.True(t, reflectorRunning()) - initialized, _ := reflectorInitialized() + initialized, _ := reflectorInitialized(tCtx) assert.False(t, initialized) _, err := store.Get("ns", "name") if err == nil || !strings.Contains(err.Error(), "failed to sync") { @@ -488,7 +499,9 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) { // Initialization should successfully finish. fakeClock.Step(30 * time.Second) - if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil { + deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second) + defer deadlineCancel() + if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, false, reflectorInitialized); err != nil { t.Errorf("reflector didn't iniailize correctly") }