From 04f0bd4e83bbc0a24b8a924333544be86b252c97 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 14 Oct 2024 08:17:14 +0200 Subject: [PATCH] storage/cacher/ready: dynamically calculate the retryAfterSeconds retryAfterSeconds is based on the time elapsed since the state (ready, unready) was last changed. --- .../apiserver/pkg/storage/cacher/cacher.go | 16 ++-- .../storage/cacher/cacher_whitebox_test.go | 43 ++++++++++- .../apiserver/pkg/storage/cacher/ready.go | 35 ++++++--- .../pkg/storage/cacher/ready_test.go | 75 ++++++++++++++----- .../apiserver/pkg/storage/cacher/util.go | 10 +++ .../apiserver/pkg/storage/cacher/util_test.go | 29 +++++++ 6 files changed, 173 insertions(+), 35 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index ec9cc0dcf7d..7109097701e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -374,7 +374,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { objType := reflect.TypeOf(obj) cacher := &Cacher{ resourcePrefix: config.ResourcePrefix, - ready: newReady(), + ready: newReady(config.Clock), storage: config.Storage, objectType: objType, groupResource: config.GroupResource, @@ -506,9 +506,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions var readyGeneration int if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { var ok bool - readyGeneration, ok = c.ready.checkAndReadGeneration() + var downtime time.Duration + readyGeneration, downtime, ok = c.ready.checkAndReadGeneration() if !ok { - return nil, errors.NewTooManyRequests("storage is (re)initializing", 1) + return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime)) } } else { readyGeneration, err = c.ready.waitAndReadGeneration(ctx) @@ -629,7 +630,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.Lock() defer c.Unlock() - if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok { + if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok { // We went unready or are already on a different generation. // Avoid registering and starting the watch as it will have to be // terminated immediately anyway. @@ -783,10 +784,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio defer span.End(500 * time.Millisecond) if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if !c.ready.check() { + if downtime, ok := c.ready.check(); !ok { // If Cacher is not initialized, reject List requests // as described in https://kep.k8s.io/4568 - return errors.NewTooManyRequests("storage is (re)initializing", 1) + return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime)) } } else { if err := c.ready.wait(ctx); err != nil { @@ -1338,7 +1339,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach } func (c *Cacher) Ready() bool { - return c.ready.check() + _, ok := c.ready.check() + return ok } // errWatcher implements watch.Interface to return a single error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 480ad7d6753..f13ed32275f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -750,7 +750,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) { // constantly failing lists to the underlying storage. dummyErr := fmt.Errorf("dummy") backingStorage := &dummyStorage{err: dummyErr} - cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock.RealClock{}) + cacher, _, err := newTestCacherWithoutSyncing(backingStorage, testingclock.NewFakeClock(time.Now())) if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } @@ -3155,3 +3155,44 @@ func TestListIndexer(t *testing.T) { }) } } + +func TestRetryAfterForUnreadyCache(t *testing.T) { + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + t.Skipf("the test requires %v to be enabled", features.ResilientWatchCacheInitialization) + } + backingStorage := &dummyStorage{} + clock := testingclock.NewFakeClock(time.Now()) + cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + if err = cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("Unexpected error waiting for the cache to be ready") + } + + cacher.ready.set(false) + clock.Step(14 * time.Second) + + opts := storage.ListOptions{ + ResourceVersion: "0", + Predicate: storage.Everything, + } + result := &example.PodList{} + proxy := NewCacheProxy(cacher, backingStorage) + err = proxy.GetList(context.TODO(), "/pods/ns", opts, result) + + if !apierrors.IsTooManyRequests(err) { + t.Fatalf("Unexpected GetList error: %v", err) + } + var statusError apierrors.APIStatus + if !errors.As(err, &statusError) { + t.Fatalf("Unexpected error: %v, expected apierrors.APIStatus", err) + } + if statusError.Status().Details == nil { + t.Fatalf("Expected to get status details, got none") + } + if statusError.Status().Details.RetryAfterSeconds != 2 { + t.Fatalf("Unexpected retry after: %v", statusError.Status().Details.RetryAfterSeconds) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go index 012d6d585c9..0ba05738d13 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -20,6 +20,9 @@ import ( "context" "fmt" "sync" + "time" + + "k8s.io/utils/clock" ) type status int @@ -43,13 +46,20 @@ type ready struct { lock sync.RWMutex // protect the state and generation variables restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated waitCh chan struct{} // blocks until is ready or stopped + + clock clock.Clock + lastStateChangeTime time.Time } -func newReady() *ready { - return &ready{ +func newReady(c clock.Clock) *ready { + r := &ready{ waitCh: make(chan struct{}), state: Pending, + clock: c, } + r.updateLastStateChangeTimeLocked() + + return r } // done close the channel once the state is Ready or Stopped @@ -100,17 +110,17 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { } } -// check returns true only if it is Ready. -func (r *ready) check() bool { - _, ok := r.checkAndReadGeneration() - return ok +// check returns the time elapsed since the state was last changed and the current value. +func (r *ready) check() (time.Duration, bool) { + _, elapsed, ok := r.checkAndReadGeneration() + return elapsed, ok } -// checkAndReadGeneration returns the current generation and whether it is Ready. -func (r *ready) checkAndReadGeneration() (int, bool) { +// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value. +func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) { r.lock.RLock() defer r.lock.RUnlock() - return r.generation, r.state == Ready + return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready } // set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. @@ -123,6 +133,7 @@ func (r *ready) set(ok bool) { if ok && r.state == Pending { r.state = Ready r.generation++ + r.updateLastStateChangeTimeLocked() select { case <-r.waitCh: default: @@ -139,6 +150,7 @@ func (r *ready) set(ok bool) { default: } r.state = Pending + r.updateLastStateChangeTimeLocked() } } @@ -148,6 +160,7 @@ func (r *ready) stop() { defer r.lock.Unlock() if r.state != Stopped { r.state = Stopped + r.updateLastStateChangeTimeLocked() } select { case <-r.waitCh: @@ -155,3 +168,7 @@ func (r *ready) stop() { close(r.waitCh) } } + +func (r *ready) updateLastStateChangeTimeLocked() { + r.lastStateChangeTime = r.clock.Now() +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go index 34c20592550..3246327cd80 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go @@ -21,11 +21,13 @@ import ( "sync" "testing" "time" + + testingclock "k8s.io/utils/clock/testing" ) func Test_newReady(t *testing.T) { errCh := make(chan error, 10) - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) ready.set(false) // create 10 goroutines waiting for ready for i := 0; i < 10; i++ { @@ -48,20 +50,20 @@ func Test_newReady(t *testing.T) { func Test_newReadySetIdempotent(t *testing.T) { errCh := make(chan error, 10) - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) ready.set(false) ready.set(false) ready.set(false) - if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok { + if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok { t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) } ready.set(true) - if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { + if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) } ready.set(true) ready.set(true) - if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { + if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) } ready.set(false) @@ -77,7 +79,7 @@ func Test_newReadySetIdempotent(t *testing.T) { t.Errorf("ready should be blocking") } ready.set(true) - if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok { + if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok { t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) } for i := 0; i < 10; i++ { @@ -92,7 +94,7 @@ func Test_newReadySetIdempotent(t *testing.T) { func Test_newReadyRacy(t *testing.T) { concurrency := 1000 errCh := make(chan error, concurrency) - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) ready.set(false) wg := sync.WaitGroup{} @@ -123,7 +125,7 @@ func Test_newReadyRacy(t *testing.T) { func Test_newReadyStop(t *testing.T) { errCh := make(chan error, 10) - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) ready.set(false) // create 10 goroutines waiting for ready and stop for i := 0; i < 10; i++ { @@ -145,24 +147,24 @@ func Test_newReadyStop(t *testing.T) { } func Test_newReadyCheck(t *testing.T) { - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) // it starts as false - if ready.check() { - t.Errorf("unexpected ready state %v", ready.check()) + if _, ok := ready.check(); ok { + t.Errorf("unexpected ready state %v", ok) } ready.set(true) - if !ready.check() { - t.Errorf("unexpected ready state %v", ready.check()) + if _, ok := ready.check(); !ok { + t.Errorf("unexpected ready state %v", ok) } // stop sets ready to false ready.stop() - if ready.check() { - t.Errorf("unexpected ready state %v", ready.check()) + if _, ok := ready.check(); ok { + t.Errorf("unexpected ready state %v", ok) } // can not set to true if is stopped ready.set(true) - if ready.check() { - t.Errorf("unexpected ready state %v", ready.check()) + if _, ok := ready.check(); ok { + t.Errorf("unexpected ready state %v", ok) } err := ready.wait(context.Background()) if err == nil { @@ -172,7 +174,7 @@ func Test_newReadyCheck(t *testing.T) { func Test_newReadyCancelPending(t *testing.T) { errCh := make(chan error, 10) - ready := newReady() + ready := newReady(testingclock.NewFakeClock(time.Now())) ready.set(false) ctx, cancel := context.WithCancel(context.Background()) // create 10 goroutines stuck on pending @@ -193,3 +195,40 @@ func Test_newReadyCancelPending(t *testing.T) { } } } + +func Test_newReadyStateChangeTimestamp(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + fakeClock.SetTime(time.Now()) + + ready := newReady(fakeClock) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, time.Minute) + + ready.set(true) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, time.Minute) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, 2*time.Minute) + + ready.set(false) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, time.Minute) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, 2*time.Minute) + + ready.set(true) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, time.Minute) + + ready.stop() + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, time.Minute) + fakeClock.Step(time.Minute) + checkReadyTransitionTime(t, ready, 2*time.Minute) +} + +func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDuration time.Duration) { + if lastStateChangeDuration, _ := r.check(); lastStateChangeDuration != expectedLastStateChangeDuration { + t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go index 63a23800f02..341e256ad5d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go @@ -17,7 +17,9 @@ limitations under the License. package cacher import ( + "math" "strings" + "time" ) // hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary @@ -44,3 +46,11 @@ func hasPathPrefix(s, pathPrefix string) bool { } return false } + +// calculateRetryAfterForUnreadyCache calculates the retry duration based on the cache downtime. +func calculateRetryAfterForUnreadyCache(downtime time.Duration) int { + factor := 0.06 + result := math.Exp(factor * downtime.Seconds()) + result = math.Min(30, math.Max(1, result)) + return int(result) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util_test.go index 90eb4b3594f..9f001aeec27 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util_test.go @@ -18,6 +18,7 @@ package cacher import ( "testing" + "time" ) func TestHasPathPrefix(t *testing.T) { @@ -67,3 +68,31 @@ func TestHasPathPrefix(t *testing.T) { } } } + +func TestCalculateRetryAfterForUnreadyCache(t *testing.T) { + tests := []struct { + downtime time.Duration + expected int + }{ + {downtime: 0 * time.Second, expected: 1}, + {downtime: 1 * time.Second, expected: 1}, + {downtime: 3 * time.Second, expected: 1}, + {downtime: 5 * time.Second, expected: 1}, + {downtime: 7 * time.Second, expected: 1}, + {downtime: 10 * time.Second, expected: 1}, + {downtime: 14 * time.Second, expected: 2}, + {downtime: 20 * time.Second, expected: 3}, + {downtime: 30 * time.Second, expected: 6}, + {downtime: 40 * time.Second, expected: 11}, + {downtime: 540 * time.Second, expected: 30}, + } + + for _, test := range tests { + t.Run(test.downtime.String(), func(t *testing.T) { + result := calculateRetryAfterForUnreadyCache(test.downtime) + if result != test.expected { + t.Errorf("for downtime %s, expected %d, but got %d", test.downtime, test.expected, result) + } + }) + } +}