mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #104604 from wojtek-t/fix_secret_manager_2
Don't prematurely close reflectors in case of slow initialization in watch based manager
This commit is contained in:
commit
c262d09bb7
@ -95,7 +95,11 @@ func (i *objectCacheItem) setImmutable() {
|
||||
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
|
||||
i.lock.Lock()
|
||||
defer i.lock.Unlock()
|
||||
if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
|
||||
// Ensure that we don't try to stop not yet initialized reflector.
|
||||
// In case of overloaded kube-apiserver, if the list request is
|
||||
// already being processed, all the work would lost and would have
|
||||
// to be retried.
|
||||
if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
|
||||
return i.stopThreadUnsafe()
|
||||
}
|
||||
return false
|
||||
@ -287,11 +291,14 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
|
||||
}
|
||||
// Record last access time independently if it succeeded or not.
|
||||
// This protects from premature (racy) reflector closure.
|
||||
item.setLastAccessTime(c.clock.Now())
|
||||
|
||||
item.restartReflectorIfNeeded()
|
||||
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
|
||||
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
|
||||
}
|
||||
item.setLastAccessTime(c.clock.Now())
|
||||
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -405,3 +405,94 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
|
||||
// Reflector should be running when the get function is called periodically.
|
||||
assert.True(t, reflectorRunning())
|
||||
}
|
||||
|
||||
func TestReflectorNotStopedOnSlowInitialization(t *testing.T) {
|
||||
secret := &v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "ns",
|
||||
ResourceVersion: "200",
|
||||
},
|
||||
}
|
||||
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
|
||||
fakeClient := &fake.Clientset{}
|
||||
listReactor := func(a core.Action) (bool, runtime.Object, error) {
|
||||
<-fakeClock.After(120 * time.Second)
|
||||
|
||||
result := &v1.SecretList{
|
||||
ListMeta: metav1.ListMeta{
|
||||
ResourceVersion: "200",
|
||||
},
|
||||
Items: []v1.Secret{*secret},
|
||||
}
|
||||
|
||||
return true, result, nil
|
||||
}
|
||||
|
||||
fakeClient.AddReactor("list", "secrets", listReactor)
|
||||
fakeWatch := watch.NewFake()
|
||||
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
|
||||
store := newSecretCache(fakeClient, fakeClock, time.Minute)
|
||||
|
||||
key := objectKey{namespace: "ns", name: "name"}
|
||||
itemExists := func() (bool, error) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
_, ok := store.items[key]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
reflectorRunning := func() bool {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
item := store.items[key]
|
||||
|
||||
item.lock.Lock()
|
||||
defer item.lock.Unlock()
|
||||
return !item.stopped
|
||||
}
|
||||
|
||||
reflectorInitialized := func() (bool, error) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
item := store.items[key]
|
||||
|
||||
item.lock.Lock()
|
||||
defer item.lock.Unlock()
|
||||
return item.store.hasSynced(), nil
|
||||
}
|
||||
|
||||
// AddReference should start reflector.
|
||||
store.AddReference("ns", "name")
|
||||
if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
|
||||
t.Errorf("item wasn't added to cache")
|
||||
}
|
||||
|
||||
fakeClock.Step(90 * time.Second)
|
||||
store.startRecycleIdleWatch()
|
||||
|
||||
// Reflector didn't yet initialize, so it shouldn't be stopped.
|
||||
// However, Get should still be failing.
|
||||
assert.True(t, reflectorRunning())
|
||||
initialized, _ := reflectorInitialized()
|
||||
assert.False(t, initialized)
|
||||
_, err := store.Get("ns", "name")
|
||||
if err == nil || !strings.Contains(err.Error(), "failed to sync") {
|
||||
t.Errorf("Expected failed to sync error, got: %v", err)
|
||||
}
|
||||
|
||||
// Initialization should successfully finish.
|
||||
fakeClock.Step(30 * time.Second)
|
||||
if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
|
||||
t.Errorf("reflector didn't iniailize correctly")
|
||||
}
|
||||
|
||||
// recycling shouldn't stop the reflector because it was accessed within last minute.
|
||||
store.startRecycleIdleWatch()
|
||||
assert.True(t, reflectorRunning())
|
||||
|
||||
obj, _ := store.Get("ns", "name")
|
||||
assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user