diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 58fb9990c04..6b398778160 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -72,6 +73,11 @@ type priorityAndFairnessHandler struct { longRunningRequestCheck apirequest.LongRunningRequestCheck fcIfc utilflowcontrol.Interface workEstimator flowcontrolrequest.WorkEstimatorFunc + + // droppedRequests tracks the history of dropped requests for + // the purpose of computing RetryAfter header to avoid system + // overload. + droppedRequests utilflowcontrol.DroppedRequestsTracker } func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) { @@ -288,7 +294,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest) epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) - tooManyRequests(r, w, retryAfter) + h.droppedRequests.RecordDroppedRequest(classification.PriorityLevelName) + + // TODO(wojtek-t): Idea from deads2k: we can consider some jittering and in case of non-int + // number, just return the truncated result and sleep the remainder server-side. + tooManyRequests(r, w, strconv.Itoa(int(h.droppedRequests.GetRetryAfter(classification.PriorityLevelName)))) } } @@ -317,6 +327,7 @@ func WithPriorityAndFairness( longRunningRequestCheck: longRunningRequestCheck, fcIfc: fcIfc, workEstimator: workEstimator, + droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(), } return http.HandlerFunc(priorityAndFairnessHandler.Handle) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index e6116b536bc..062584177ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -360,11 +360,12 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { func (f *fakeWatchApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), _ func() fcrequest.WorkEstimate, _ fq.QueueNoteFn, execFn func(), ) { + noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) canExecute := false func() { f.lock.Lock() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go new file mode 100644 index 00000000000..15394120ec0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go @@ -0,0 +1,231 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "sync" + "sync/atomic" + "time" + + "k8s.io/utils/clock" +) + +const ( + // maxRetryAfter represents the maximum possible retryAfter. + maxRetryAfter = int64(32) +) + +// DroppedRequestsTracker is an interface that allows tracking +// a history od dropped requests in the system for the purpose +// of adjusting RetryAfter header to avoid system overload. +type DroppedRequestsTracker interface { + // RecordDroppedRequest records a request that was just + // dropped from processing. + RecordDroppedRequest(plName string) + + // GetRetryAfter returns the current suggested value of + // RetryAfter value. + GetRetryAfter(plName string) int64 +} + +// unixStat keeps a statistic how many requests were dropped within +// a single second. +type unixStat struct { + unixTime int64 + requests int64 +} + +type droppedRequestsStats struct { + lock sync.RWMutex + + // history stores the history of dropped requests. + history []unixStat + + // To reduce lock-contention, we store the information about + // the current second here, which we can then access under + // reader lock. + currentUnix int64 + currentCount atomic.Int64 + + retryAfter atomic.Int64 + retryAfterUpdateUnix int64 +} + +func newDroppedRequestsStats(nowUnix int64) *droppedRequestsStats { + result := &droppedRequestsStats{ + // We assume that we can bump at any time after first dropped request. + retryAfterUpdateUnix: 0, + } + result.retryAfter.Store(1) + return result +} + +func (s *droppedRequestsStats) recordDroppedRequest(unixTime int64) { + // Short path - if the current second matches passed time, + // just update the stats. + if done := func() bool { + s.lock.RLock() + defer s.lock.RUnlock() + if s.currentUnix == unixTime { + s.currentCount.Add(1) + return true + } + return false + }(); done { + return + } + + // We trigger the change of . + s.lock.Lock() + defer s.lock.Unlock() + if s.currentUnix == unixTime { + s.currentCount.Add(1) + return + } + + s.updateHistory(s.currentUnix, s.currentCount.Load()) + s.currentUnix = unixTime + s.currentCount.Store(1) + + // We only consider updating retryAfter when bumping the current second. + // However, given that we didn't report anything for the current second, + // we recompute it based on statistics from the previous one. + s.updateRetryAfterIfNeededLocked(unixTime) +} + +func (s *droppedRequestsStats) updateHistory(unixTime int64, count int64) { + s.history = append(s.history, unixStat{unixTime: unixTime, requests: count}) + + startIndex := 0 + for ; startIndex < len(s.history) && unixTime-s.history[startIndex].unixTime > maxRetryAfter; startIndex++ { + } + if startIndex > 0 { + s.history = s.history[startIndex:] + } +} + +// updateRetryAfterIfNeededLocked updates the retryAfter based on the number of +// dropped requests in the last `retryAfter` seconds: +// - if there were less than `retryAfter` dropped requests, it decreases +// retryAfter +// - if there were at least 3*`retryAfter` dropped requests, it increases +// retryAfter +// +// The rationale behind these numbers being fairly low is that APF is queuing +// requests and rejecting (dropping) them is a last resort, which is not expected +// unless a given priority level is actually overloaded. +// +// Additionally, we rate-limit the increases of retryAfter to wait at least +// `retryAfter' seconds after the previous increase to avoid multiple bumps +// on a single spike. +// +// We're working with the interval [unixTime-retryAfter, unixTime). +func (s *droppedRequestsStats) updateRetryAfterIfNeededLocked(unixTime int64) { + retryAfter := s.retryAfter.Load() + + droppedRequests := int64(0) + if len(s.history) > 0 { + for i := len(s.history) - 1; i >= 0; i-- { + if unixTime-s.history[i].unixTime > retryAfter { + break + } + if s.history[i].unixTime < unixTime { + droppedRequests += s.history[i].requests + } + } + } + + if unixTime-s.retryAfterUpdateUnix >= retryAfter && droppedRequests >= 3*retryAfter { + // We try to mimic the TCP algorithm and thus are doubling + // the retryAfter here. + retryAfter *= 2 + if retryAfter >= maxRetryAfter { + retryAfter = maxRetryAfter + } + s.retryAfter.Store(retryAfter) + s.retryAfterUpdateUnix = unixTime + return + } + + if droppedRequests < retryAfter && retryAfter > 1 { + // We try to mimc the TCP algorithm and thus are linearly + // scaling down the retryAfter here. + retryAfter-- + s.retryAfter.Store(retryAfter) + return + } +} + +// droppedRequestsTracker implement DroppedRequestsTracker interface +// for the purpose of adjusting RetryAfter header for newly dropped +// requests to avoid system overload. +type droppedRequestsTracker struct { + now func() time.Time + + lock sync.RWMutex + plStats map[string]*droppedRequestsStats +} + +// NewDroppedRequestsTracker is creating a new instance of +// DroppedRequestsTracker. +func NewDroppedRequestsTracker() DroppedRequestsTracker { + return newDroppedRequestsTracker(clock.RealClock{}.Now) +} + +func newDroppedRequestsTracker(now func() time.Time) *droppedRequestsTracker { + return &droppedRequestsTracker{ + now: now, + plStats: make(map[string]*droppedRequestsStats), + } +} + +func (t *droppedRequestsTracker) RecordDroppedRequest(plName string) { + unixTime := t.now().Unix() + + stats := func() *droppedRequestsStats { + // The list of priority levels should change very infrequently, + // so in almost all cases, the fast path should be enough. + t.lock.RLock() + if plStats, ok := t.plStats[plName]; ok { + t.lock.RUnlock() + return plStats + } + t.lock.RUnlock() + + // Slow path taking writer lock to update the map. + t.lock.Lock() + defer t.lock.Unlock() + if plStats, ok := t.plStats[plName]; ok { + return plStats + } + stats := newDroppedRequestsStats(unixTime) + t.plStats[plName] = stats + return stats + }() + + stats.recordDroppedRequest(unixTime) +} + +func (t *droppedRequestsTracker) GetRetryAfter(plName string) int64 { + t.lock.RLock() + defer t.lock.RUnlock() + + if plStats, ok := t.plStats[plName]; ok { + return plStats.retryAfter.Load() + } + return 1 +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go new file mode 100644 index 00000000000..2962d25ae43 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "fmt" + "sync" + "testing" + "time" + + testingclock "k8s.io/utils/clock/testing" +) + +func TestDroppedRequestsTracker(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + tracker := newDroppedRequestsTracker(fakeClock.Now) + + // The following table represents the list over time of: + // - seconds elapsed (as computed since the initial time) + // - requests that will be recorded as dropped in a current second + steps := []struct { + secondsElapsed int + // droppedRequests is the number of requests to drop, after + // secondsElapsed. + droppedRequests int + // retryAfter is the expected retryAfter after all dropped + // requests are recorded via RecordDroppedRequest. + retryAfter int64 + }{ + {secondsElapsed: 0, droppedRequests: 5, retryAfter: 1}, + {secondsElapsed: 1, droppedRequests: 11, retryAfter: 2}, + // Check that we don't bump immediately after despite + // multiple dropped requests. + {secondsElapsed: 2, droppedRequests: 1, retryAfter: 2}, + {secondsElapsed: 3, droppedRequests: 11, retryAfter: 4}, + {secondsElapsed: 4, droppedRequests: 1, retryAfter: 4}, + {secondsElapsed: 7, droppedRequests: 1, retryAfter: 8}, + {secondsElapsed: 11, droppedRequests: 1, retryAfter: 8}, + {secondsElapsed: 15, droppedRequests: 1, retryAfter: 7}, + {secondsElapsed: 17, droppedRequests: 1, retryAfter: 6}, + {secondsElapsed: 21, droppedRequests: 14, retryAfter: 5}, + {secondsElapsed: 22, droppedRequests: 1, retryAfter: 10}, + } + + for i, step := range steps { + secondsToAdvance := step.secondsElapsed + if i > 0 { + secondsToAdvance -= steps[i-1].secondsElapsed + } + fakeClock.Step(time.Duration(secondsToAdvance) * time.Second) + + // Record only first dropped request and recompute retryAfter. + for r := 0; r < step.droppedRequests; r++ { + tracker.RecordDroppedRequest("pl") + } + if retryAfter := tracker.GetRetryAfter("pl"); retryAfter != step.retryAfter { + t.Errorf("Unexpected retryAfter: %v, expected: %v", retryAfter, step.retryAfter) + } + + } +} + +func TestDroppedRequestsTrackerPLIndependent(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + tracker := newDroppedRequestsTracker(fakeClock.Now) + + // Report single dropped requests in multiple PLs. + // Validate if RetryAfter isn't bumped next second. + for i := 0; i < 10; i++ { + tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i)) + } + fakeClock.Step(time.Second) + for i := 0; i < 10; i++ { + tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i)) + retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i)) + if retryAfter != 1 { + t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter) + } + } + + // Record few droped requests on a single PL. + // Validate that RetryAfter is bumped only for this PL. + for i := 0; i < 5; i++ { + tracker.RecordDroppedRequest("pl-0") + } + fakeClock.Step(time.Second) + for i := 0; i < 10; i++ { + tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i)) + retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i)) + switch i { + case 0: + if retryAfter != 2 { + t.Errorf("Unexpected retryAfter for pl-0: %v", retryAfter) + } + default: + if retryAfter != 1 { + t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter) + } + } + } + // Validate also PL for which no dropped requests was recorded. + if retryAfter := tracker.GetRetryAfter("other-pl"); retryAfter != 1 { + t.Errorf("Unexpected retryAfter for other-pl: %v", retryAfter) + } +} + +func BenchmarkDroppedRequestsTracker(b *testing.B) { + b.StopTimer() + + fakeClock := testingclock.NewFakeClock(time.Now()) + tracker := newDroppedRequestsTracker(fakeClock.Now) + + startCh := make(chan struct{}) + wg := sync.WaitGroup{} + numPLs := 5 + // For all `numPLs` priority levels, create b.N workers each + // of which will try to record a dropped request every 100ms + // with a random jitter. + for i := 0; i < numPLs; i++ { + plName := fmt.Sprintf("priority-level-%d", i) + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-startCh + + for a := 0; a < 5; a++ { + tracker.RecordDroppedRequest(plName) + time.Sleep(25 * time.Millisecond) + } + }() + } + } + // Time-advancing goroutine. + stopCh := make(chan struct{}) + timeWg := sync.WaitGroup{} + timeWg.Add(1) + go func() { + defer timeWg.Done() + for { + select { + case <-stopCh: + return + case <-time.After(25 * time.Millisecond): + fakeClock.Step(time.Second) + } + } + }() + + b.StartTimer() + close(startCh) + wg.Wait() + + close(stopCh) + timeWg.Wait() +}