kubelet/util/manager: small cleanup and start replacing deprecated functions wait.Poll/wait.PollImmediate

Signed-off-by: TommyStarK <thomasmilox@gmail.com>
This commit is contained in:
TommyStarK 2024-05-26 19:26:52 +02:00
parent 4bb434501d
commit 2ed556ae00
No known key found for this signature in database
GPG Key ID: 9D2DFCECABB40F9E
2 changed files with 55 additions and 17 deletions

View File

@ -437,6 +437,31 @@ func TestCacheInvalidation(t *testing.T) {
fakeClient.ClearActions()
}
func TestResourceContentExpired(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := testingclock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
manager := newCacheBasedSecretManager(store)
// Create a pod with some secrets.
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
containerEnvSecrets: []envSecrets{
{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}},
{envVarNames: []string{"s2"}},
},
}
// emulate a requested resource content that has expired from the server
manager.RegisterPod(podWithSecrets("dummy-ns", "dummy-name", s1))
fakeClient.PrependReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
return true, &v1.Secret{}, apierrors.NewResourceExpired("expired")
})
// should fail to fetch the latest object
_, err := manager.GetObject("dummy-ns", "s1")
assert.Error(t, err)
}
func TestRegisterIdempotence(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := testingclock.NewFakeClock(time.Now())

View File

@ -38,6 +38,7 @@ import (
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
@ -105,7 +106,7 @@ func TestSecretCache(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"},
}
fakeWatch.Add(secret)
getFn := func() (bool, error) {
getFn := func(_ context.Context) (bool, error) {
object, err := store.Get("ns", "name")
if err != nil {
if apierrors.IsNotFound(err) {
@ -119,13 +120,15 @@ func TestSecretCache(t *testing.T) {
}
return true, nil
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
tCtx := ktesting.Init(t)
if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, true, getFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Eventually we should observer secret deletion.
fakeWatch.Delete(secret)
getFn = func() (bool, error) {
getFn = func(_ context.Context) (bool, error) {
_, err := store.Get("ns", "name")
if err != nil {
if apierrors.IsNotFound(err) {
@ -135,7 +138,9 @@ func TestSecretCache(t *testing.T) {
}
return false, nil
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second)
defer deadlineCancel()
if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, true, getFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -166,7 +171,7 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
store.AddReference("ns", "name", "pod")
// This should trigger List and Watch actions eventually.
actionsFn := func() (bool, error) {
actionsFn := func(_ context.Context) (bool, error) {
actions := fakeClient.Actions()
if len(actions) > 2 {
return false, fmt.Errorf("too many actions: %v", actions)
@ -179,7 +184,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
}
return true, nil
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil {
tCtx := ktesting.Init(t)
if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, true, actionsFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -271,7 +277,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
store := newSecretCache(fakeClient, fakeClock, time.Minute)
key := objectKey{namespace: "ns", name: "name"}
itemExists := func() (bool, error) {
itemExists := func(_ context.Context) (bool, error) {
store.lock.Lock()
defer store.lock.Unlock()
_, ok := store.items[key]
@ -289,7 +295,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
// AddReference should start reflector.
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
tCtx := ktesting.Init(t)
if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}
@ -309,7 +316,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
fakeWatch.Add(tc.eventual)
// Eventually Get should return that secret.
getFn := func() (bool, error) {
getFn := func(_ context.Context) (bool, error) {
object, err := store.Get("ns", "name")
if err != nil {
if apierrors.IsNotFound(err) {
@ -320,7 +327,9 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
secret := object.(*v1.Secret)
return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second)
defer deadlineCancel()
if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, true, getFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -358,7 +367,7 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
store := newSecretCache(fakeClient, fakeClock, time.Minute)
key := objectKey{namespace: "ns", name: "name"}
itemExists := func() (bool, error) {
itemExists := func(_ context.Context) (bool, error) {
store.lock.Lock()
defer store.lock.Unlock()
_, ok := store.items[key]
@ -377,7 +386,8 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
// AddReference should start reflector.
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
tCtx := ktesting.Init(t)
if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}
@ -440,7 +450,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
store := newSecretCache(fakeClient, fakeClock, time.Minute)
key := objectKey{namespace: "ns", name: "name"}
itemExists := func() (bool, error) {
itemExists := func(_ context.Context) (bool, error) {
store.lock.Lock()
defer store.lock.Unlock()
_, ok := store.items[key]
@ -457,7 +467,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
return !item.stopped
}
reflectorInitialized := func() (bool, error) {
reflectorInitialized := func(_ context.Context) (bool, error) {
store.lock.Lock()
defer store.lock.Unlock()
item := store.items[key]
@ -469,7 +479,8 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
// AddReference should start reflector.
store.AddReference("ns", "name", "pod")
if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
tCtx := ktesting.Init(t)
if err := wait.PollUntilContextCancel(tCtx, 10*time.Millisecond, false, itemExists); err != nil {
t.Errorf("item wasn't added to cache")
}
@ -479,7 +490,7 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
// Reflector didn't yet initialize, so it shouldn't be stopped.
// However, Get should still be failing.
assert.True(t, reflectorRunning())
initialized, _ := reflectorInitialized()
initialized, _ := reflectorInitialized(tCtx)
assert.False(t, initialized)
_, err := store.Get("ns", "name")
if err == nil || !strings.Contains(err.Error(), "failed to sync") {
@ -488,7 +499,9 @@ func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
// Initialization should successfully finish.
fakeClock.Step(30 * time.Second)
if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
deadlineCtx, deadlineCancel := context.WithTimeout(tCtx, time.Second)
defer deadlineCancel()
if err := wait.PollUntilContextCancel(deadlineCtx, 10*time.Millisecond, false, reflectorInitialized); err != nil {
t.Errorf("reflector didn't iniailize correctly")
}