From 16fecf3e76163ddb6d93199f5cf094fd9588b706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 18 Apr 2023 20:34:25 +0200 Subject: [PATCH 1/2] Refactor APF handler in preparation for dynamic retryAfter --- .../pkg/server/filters/maxinflight.go | 9 +- .../server/filters/priority-and-fairness.go | 454 +++++++++--------- 2 files changed, 239 insertions(+), 224 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 5d7b00ec337..9effcb768f2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -34,7 +34,6 @@ import ( const ( // Constant for the retry-after interval on rate limiting. - // TODO: maybe make this dynamic? or user-adjustable? retryAfter = "1" // How often inflight usage metric should be updated. Because @@ -210,7 +209,7 @@ func WithMaxInFlightLimit( // We need to split this data between buckets used for throttling. metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest) metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests) - tooManyRequests(r, w) + tooManyRequests(r, w, retryAfter) } } }) @@ -221,9 +220,3 @@ func WithMaxInFlightLimit( func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) { startWatermarkMaintenance(watermark, stopCh) } - -func tooManyRequests(req *http.Request, w http.ResponseWriter) { - // Return a 429 status indicating "Too Many Requests" - w.Header().Set("Retry-After", retryAfter) - http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests) -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 937971c17eb..58fb9990c04 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -67,6 +67,231 @@ func truncateLogField(s string) string { var initAPFOnce sync.Once +type priorityAndFairnessHandler struct { + handler http.Handler + longRunningRequestCheck apirequest.LongRunningRequestCheck + fcIfc utilflowcontrol.Interface + workEstimator flowcontrolrequest.WorkEstimatorFunc +} + +func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + requestInfo, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + handleError(w, r, fmt.Errorf("no RequestInfo found in context")) + return + } + user, ok := apirequest.UserFrom(ctx) + if !ok { + handleError(w, r, fmt.Errorf("no User found in context")) + return + } + + isWatchRequest := watchVerbs.Has(requestInfo.Verb) + + // Skip tracking long running non-watch requests. 
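+	// Watch requests are long-running as well, but they are deliberately
+	// kept under APF accounting until their initialization completes
+	// (see the isWatchRequest branch below).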
+	if h.longRunningRequestCheck != nil && h.longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
+		klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
+		h.handler.ServeHTTP(w, r)
+		return
+	}
+
+	var classification *PriorityAndFairnessClassification
+	noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
+		classification = &PriorityAndFairnessClassification{
+			FlowSchemaName:    fs.Name,
+			FlowSchemaUID:     fs.UID,
+			PriorityLevelName: pl.Name,
+			PriorityLevelUID:  pl.UID,
+		}
+
+		httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
+		httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
+	}
+	// estimateWork is called, if at all, after noteFn
+	estimateWork := func() flowcontrolrequest.WorkEstimate {
+		if classification == nil {
+			// workEstimator is being invoked before classification of
+			// the request has completed; we should never get here, though.
+			klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
+				"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
+			return h.workEstimator(r, "", "")
+		}
+
+		workEstimate := h.workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
+
+		fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats())
+		httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats)
+		httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats)
+		httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency)
+
+		return workEstimate
+	}
+
+	var served bool
+	isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
+	noteExecutingDelta := func(delta int32) {
+		if isMutatingRequest {
+			watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
+		} else {
+			watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
+		}
+	}
+	noteWaitingDelta := func(delta int32) {
+		if isMutatingRequest {
+			waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
+		} else {
+			waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
+		}
+	}
+	queueNote := func(inQueue bool) {
+		if inQueue {
+			noteWaitingDelta(1)
+		} else {
+			noteWaitingDelta(-1)
+		}
+	}
+
+	digest := utilflowcontrol.RequestDigest{
+		RequestInfo: requestInfo,
+		User:        user,
+	}
+
+	if isWatchRequest {
+		// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
+		// If APF rejects the request, it is never closed.
+		shouldStartWatchCh := make(chan struct{})
+
+		watchInitializationSignal := newInitializationSignal()
+		// This wraps the request passed to handler.ServeHTTP(),
+		// setting a context that plumbs watchInitializationSignal to storage
+		var watchReq *http.Request
+		// This is set inside execute(), prior to closing shouldStartWatchCh.
+		// If the request is rejected by APF it is left nil.
+		var forgetWatch utilflowcontrol.ForgetWatchFunc
+
+		defer func() {
+			// Protect from the situation when the request will not reach the storage layer
+			// and the initialization signal will not be sent.
+			if watchInitializationSignal != nil {
+				watchInitializationSignal.Signal()
+			}
+			// Forget the watcher if it was registered.
+			//
+			// This is race-free because by this point, one of the following occurred:
+			// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
+			// case <-resultCh: Handle() completed, and Handle() does not return
+			//   while execute() is running
+			if forgetWatch != nil {
+				forgetWatch()
+			}
+		}()
+
+		execute := func() {
+			startedAt := time.Now()
+			defer func() {
+				httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
+			}()
+			noteExecutingDelta(1)
+			defer noteExecutingDelta(-1)
+			served = true
+			setResponseHeaders(classification, w)
+
+			forgetWatch = h.fcIfc.RegisterWatch(r)
+
+			// Notify the main thread that we're ready to start the watch.
+			close(shouldStartWatchCh)
+
+			// Wait until the request is finished from the APF point of view
+			// (which is when its initialization is done).
+			watchInitializationSignal.Wait()
+		}
+
+		// Ensure that an item can be put to resultCh asynchronously.
+		resultCh := make(chan interface{}, 1)
+
+		// Call Handle in a separate goroutine.
+		// The reason for it is that from the APF point of view, the request processing
+		// finishes as soon as the watch is initialized (which is generally orders of
+		// magnitude faster than the watch request itself). This means that the Handle()
+		// call finishes much faster, and for performance reasons we want to reduce
+		// the number of running goroutines - so we run the shorter thing in a
+		// dedicated goroutine and the actual watch handler in the main one.
+		go func() {
+			defer func() {
+				err := recover()
+				// do not wrap the sentinel ErrAbortHandler panic value
+				if err != nil && err != http.ErrAbortHandler {
+					// Same as stdlib http server code. Manually allocate stack
+					// trace buffer size to prevent excessively large logs
+					const size = 64 << 10
+					buf := make([]byte, size)
+					buf = buf[:runtime.Stack(buf, false)]
+					err = fmt.Sprintf("%v\n%s", err, buf)
+				}
+
+				// Ensure that the result is put into resultCh independently of the panic.
+				resultCh <- err
+			}()
+
+			// We create handleCtx with an explicit cancellation function.
+			// The reason for it is that Handle() underneath may start an additional
+			// goroutine that is blocked on context cancellation. However, from the APF
+			// point of view, we don't want to wait until the whole watch request is
+			// processed (which is when its context is actually cancelled) - we want
+			// to unblock the goroutine as soon as the request is processed from the
+			// APF point of view.
+			//
+			// Note that we explicitly do NOT call the actual handler using that context
+			// to avoid cancelling the request too early.
+			handleCtx, handleCtxCancel := context.WithCancel(ctx)
+			defer handleCtxCancel()
+
+			// Note that Handle will return irrespective of whether the request
+			// executes or is rejected. In the latter case, the function will return
+			// without calling the passed `execute` function.
+			h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
+		}()
+
+		select {
+		case <-shouldStartWatchCh:
+			watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
+			watchReq = r.WithContext(watchCtx)
+			h.handler.ServeHTTP(w, watchReq)
+			// Protect from the situation when the request will not reach the storage layer
+			// and the initialization signal will not be sent.
+			// It has to happen before waiting on the resultCh below.
+			watchInitializationSignal.Signal()
+			// TODO: Consider finishing the request as soon as Handle call panics.
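+			// Wait for the goroutine running Handle() to finish and
+			// propagate a panic from it, if any.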
+ if err := <-resultCh; err != nil { + panic(err) + } + case err := <-resultCh: + if err != nil { + panic(err) + } + } + } else { + execute := func() { + noteExecutingDelta(1) + defer noteExecutingDelta(-1) + served = true + setResponseHeaders(classification, w) + + h.handler.ServeHTTP(w, r) + } + + h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute) + } + + if !served { + setResponseHeaders(classification, w) + + epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest) + epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) + tooManyRequests(r, w, retryAfter) + } +} + // WithPriorityAndFairness limits the number of in-flight // requests in a fine-grained way. func WithPriorityAndFairness( @@ -86,223 +311,14 @@ func WithPriorityAndFairness( waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency() waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency() }) - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - requestInfo, ok := apirequest.RequestInfoFrom(ctx) - if !ok { - handleError(w, r, fmt.Errorf("no RequestInfo found in context")) - return - } - user, ok := apirequest.UserFrom(ctx) - if !ok { - handleError(w, r, fmt.Errorf("no User found in context")) - return - } - isWatchRequest := watchVerbs.Has(requestInfo.Verb) - - // Skip tracking long running non-watch requests. - if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest { - klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user) - handler.ServeHTTP(w, r) - return - } - - var classification *PriorityAndFairnessClassification - noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) { - classification = &PriorityAndFairnessClassification{ - FlowSchemaName: fs.Name, - FlowSchemaUID: fs.UID, - PriorityLevelName: pl.Name, - PriorityLevelUID: pl.UID} - - httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) - httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) - } - // estimateWork is called, if at all, after noteFn - estimateWork := func() flowcontrolrequest.WorkEstimate { - if classification == nil { - // workEstimator is being invoked before classification of - // the request has completed, we should never be here though. 
- klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"), - "Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI) - - return workEstimator(r, "", "") - } - - workEstimate := workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName) - - fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats()) - httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats) - httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats) - httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency) - - return workEstimate - } - - var served bool - isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb) - noteExecutingDelta := func(delta int32) { - if isMutatingRequest { - watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta))) - } else { - watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta))) - } - } - noteWaitingDelta := func(delta int32) { - if isMutatingRequest { - waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta))) - } else { - waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta))) - } - } - queueNote := func(inQueue bool) { - if inQueue { - noteWaitingDelta(1) - } else { - noteWaitingDelta(-1) - } - } - - digest := utilflowcontrol.RequestDigest{ - RequestInfo: requestInfo, - User: user, - } - - if isWatchRequest { - // This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute(). - // If APF rejects the request, it is never closed. - shouldStartWatchCh := make(chan struct{}) - - watchInitializationSignal := newInitializationSignal() - // This wraps the request passed to handler.ServeHTTP(), - // setting a context that plumbs watchInitializationSignal to storage - var watchReq *http.Request - // This is set inside execute(), prior to closing shouldStartWatchCh. - // If the request is rejected by APF it is left nil. - var forgetWatch utilflowcontrol.ForgetWatchFunc - - defer func() { - // Protect from the situation when request will not reach storage layer - // and the initialization signal will not be send. - if watchInitializationSignal != nil { - watchInitializationSignal.Signal() - } - // Forget the watcher if it was registered. - // - // // This is race-free because by this point, one of the following occurred: - // case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch - // case <-resultCh: Handle() completed, and Handle() does not return - // while execute() is running - if forgetWatch != nil { - forgetWatch() - } - }() - - execute := func() { - startedAt := time.Now() - defer func() { - httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt)) - }() - noteExecutingDelta(1) - defer noteExecutingDelta(-1) - served = true - setResponseHeaders(classification, w) - - forgetWatch = fcIfc.RegisterWatch(r) - - // Notify the main thread that we're ready to start the watch. - close(shouldStartWatchCh) - - // Wait until the request is finished from the APF point of view - // (which is when its initialization is done). - watchInitializationSignal.Wait() - } - - // Ensure that an item can be put to resultCh asynchronously. - resultCh := make(chan interface{}, 1) - - // Call Handle in a separate goroutine. 
- // The reason for it is that from APF point of view, the request processing - // finishes as soon as watch is initialized (which is generally orders of - // magnitude faster then the watch request itself). This means that Handle() - // call finishes much faster and for performance reasons we want to reduce - // the number of running goroutines - so we run the shorter thing in a - // dedicated goroutine and the actual watch handler in the main one. - go func() { - defer func() { - err := recover() - // do not wrap the sentinel ErrAbortHandler panic value - if err != nil && err != http.ErrAbortHandler { - // Same as stdlib http server code. Manually allocate stack - // trace buffer size to prevent excessively large logs - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - err = fmt.Sprintf("%v\n%s", err, buf) - } - - // Ensure that the result is put into resultCh independently of the panic. - resultCh <- err - }() - - // We create handleCtx with explicit cancelation function. - // The reason for it is that Handle() underneath may start additional goroutine - // that is blocked on context cancellation. However, from APF point of view, - // we don't want to wait until the whole watch request is processed (which is - // when it context is actually cancelled) - we want to unblock the goroutine as - // soon as the request is processed from the APF point of view. - // - // Note that we explicitly do NOT call the actuall handler using that context - // to avoid cancelling request too early. - handleCtx, handleCtxCancel := context.WithCancel(ctx) - defer handleCtxCancel() - - // Note that Handle will return irrespective of whether the request - // executes or is rejected. In the latter case, the function will return - // without calling the passed `execute` function. - fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute) - }() - - select { - case <-shouldStartWatchCh: - watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal) - watchReq = r.WithContext(watchCtx) - handler.ServeHTTP(w, watchReq) - // Protect from the situation when request will not reach storage layer - // and the initialization signal will not be send. - // It has to happen before waiting on the resultCh below. - watchInitializationSignal.Signal() - // TODO: Consider finishing the request as soon as Handle call panics. 
- if err := <-resultCh; err != nil { - panic(err) - } - case err := <-resultCh: - if err != nil { - panic(err) - } - } - } else { - execute := func() { - noteExecutingDelta(1) - defer noteExecutingDelta(-1) - served = true - setResponseHeaders(classification, w) - - handler.ServeHTTP(w, r) - } - - fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute) - } - - if !served { - setResponseHeaders(classification, w) - - epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest) - epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) - tooManyRequests(r, w) - } - }) + priorityAndFairnessHandler := &priorityAndFairnessHandler{ + handler: handler, + longRunningRequestCheck: longRunningRequestCheck, + fcIfc: fcIfc, + workEstimator: workEstimator, + } + return http.HandlerFunc(priorityAndFairnessHandler.Handle) } // StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for @@ -323,3 +339,9 @@ func setResponseHeaders(classification *PriorityAndFairnessClassification, w htt w.Header().Set(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID, string(classification.PriorityLevelUID)) w.Header().Set(flowcontrol.ResponseHeaderMatchedFlowSchemaUID, string(classification.FlowSchemaUID)) } + +func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string) { + // Return a 429 status indicating "Too Many Requests" + w.Header().Set("Retry-After", retryAfter) + http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests) +} From 23ac0fdaa52209c06eacf3613101174ea77ec42b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 20 Apr 2023 10:18:48 +0200 Subject: [PATCH 2/2] APF: Dynamically compute retry-after based on history --- .../server/filters/priority-and-fairness.go | 13 +- .../filters/priority-and-fairness_test.go | 3 +- .../flowcontrol/dropped_requests_tracker.go | 231 ++++++++++++++++++ .../dropped_requests_tracker_test.go | 170 +++++++++++++ 4 files changed, 415 insertions(+), 2 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 58fb9990c04..6b398778160 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -72,6 +73,11 @@ type priorityAndFairnessHandler struct { longRunningRequestCheck apirequest.LongRunningRequestCheck fcIfc utilflowcontrol.Interface workEstimator flowcontrolrequest.WorkEstimatorFunc + + // droppedRequests tracks the history of dropped requests for + // the purpose of computing RetryAfter header to avoid system + // overload. 
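+	// See DroppedRequestsTracker in k8s.io/apiserver/pkg/util/flowcontrol
+	// for the adjustment algorithm (doubling up to a 32s cap while requests
+	// keep being dropped, linear decay otherwise).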
+	droppedRequests utilflowcontrol.DroppedRequestsTracker
 }
 
 func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
@@ -288,7 +294,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 
 		epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
 		epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
-		tooManyRequests(r, w, retryAfter)
+		h.droppedRequests.RecordDroppedRequest(classification.PriorityLevelName)
+
+		// TODO(wojtek-t): Idea from deads2k: we can consider some jittering and in case of non-int
+		// number, just return the truncated result and sleep the remainder server-side.
+		tooManyRequests(r, w, strconv.Itoa(int(h.droppedRequests.GetRetryAfter(classification.PriorityLevelName))))
 	}
 }
 
@@ -317,6 +327,7 @@ func WithPriorityAndFairness(
 		longRunningRequestCheck: longRunningRequestCheck,
 		fcIfc:                   fcIfc,
 		workEstimator:           workEstimator,
+		droppedRequests:         utilflowcontrol.NewDroppedRequestsTracker(),
 	}
 	return http.HandlerFunc(priorityAndFairnessHandler.Handle)
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index e6116b536bc..062584177ee 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -360,11 +360,12 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
 
 func (f *fakeWatchApfFilter) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
 	_ func() fcrequest.WorkEstimate,
 	_ fq.QueueNoteFn,
 	execFn func(),
 ) {
+	noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
 	canExecute := false
 	func() {
 		f.lock.Lock()
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go
new file mode 100644
index 00000000000..15394120ec0
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker.go
@@ -0,0 +1,231 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"k8s.io/utils/clock"
+)
+
+const (
+	// maxRetryAfter represents the maximum possible retryAfter.
+	maxRetryAfter = int64(32)
+)
+
+// DroppedRequestsTracker is an interface that allows tracking
+// a history of dropped requests in the system for the purpose
+// of adjusting the RetryAfter header to avoid system overload.
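+//
+// A minimal usage sketch (the priority-level name here is hypothetical):
+//
+//	tracker := NewDroppedRequestsTracker()
+//	tracker.RecordDroppedRequest("workload-low")
+//	seconds := tracker.GetRetryAfter("workload-low") // starts at 1, capped at maxRetryAfter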
+type DroppedRequestsTracker interface {
+	// RecordDroppedRequest records a request that was just
+	// dropped from processing.
+	RecordDroppedRequest(plName string)
+
+	// GetRetryAfter returns the currently suggested value of
+	// the RetryAfter header, in seconds.
+	GetRetryAfter(plName string) int64
+}
+
+// unixStat keeps a statistic of how many requests were dropped within
+// a single second.
+type unixStat struct {
+	unixTime int64
+	requests int64
+}
+
+type droppedRequestsStats struct {
+	lock sync.RWMutex
+
+	// history stores the history of dropped requests.
+	history []unixStat
+
+	// To reduce lock-contention, we store the information about
+	// the current second here, which we can then access under
+	// reader lock.
+	currentUnix  int64
+	currentCount atomic.Int64
+
+	retryAfter           atomic.Int64
+	retryAfterUpdateUnix int64
+}
+
+func newDroppedRequestsStats(nowUnix int64) *droppedRequestsStats {
+	result := &droppedRequestsStats{
+		// We assume that we can bump at any time after the first dropped request.
+		retryAfterUpdateUnix: 0,
+	}
+	result.retryAfter.Store(1)
+	return result
+}
+
+func (s *droppedRequestsStats) recordDroppedRequest(unixTime int64) {
+	// Short path - if the current second matches the passed time,
+	// just update the stats.
+	if done := func() bool {
+		s.lock.RLock()
+		defer s.lock.RUnlock()
+		if s.currentUnix == unixTime {
+			s.currentCount.Add(1)
+			return true
+		}
+		return false
+	}(); done {
+		return
+	}
+
+	// We trigger the change of the current second.
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	if s.currentUnix == unixTime {
+		s.currentCount.Add(1)
+		return
+	}
+
+	s.updateHistory(s.currentUnix, s.currentCount.Load())
+	s.currentUnix = unixTime
+	s.currentCount.Store(1)
+
+	// We only consider updating retryAfter when bumping the current second.
+	// However, given that we didn't report anything for the current second,
+	// we recompute it based on statistics from the previous one.
+	s.updateRetryAfterIfNeededLocked(unixTime)
+}
+
+func (s *droppedRequestsStats) updateHistory(unixTime int64, count int64) {
+	s.history = append(s.history, unixStat{unixTime: unixTime, requests: count})
+
+	startIndex := 0
+	for ; startIndex < len(s.history) && unixTime-s.history[startIndex].unixTime > maxRetryAfter; startIndex++ {
+	}
+	if startIndex > 0 {
+		s.history = s.history[startIndex:]
+	}
+}
+
+// updateRetryAfterIfNeededLocked updates the retryAfter based on the number of
+// dropped requests in the last `retryAfter` seconds:
+//   - if there were fewer than `retryAfter` dropped requests, it decreases
+//     retryAfter
+//   - if there were at least 3*`retryAfter` dropped requests, it increases
+//     retryAfter
+//
+// The rationale behind these numbers being fairly low is that APF is queuing
+// requests and rejecting (dropping) them is a last resort, which is not expected
+// unless a given priority level is actually overloaded.
+//
+// Additionally, we rate-limit the increases of retryAfter to wait at least
+// `retryAfter` seconds after the previous increase to avoid multiple bumps
+// on a single spike.
+//
+// We're working with the interval [unixTime-retryAfter, unixTime).
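+//
+// A worked example, with assumed values for illustration: for retryAfter=2,
+// at least 3*2=6 requests dropped during the last 2 seconds double retryAfter
+// to 4, while fewer than 2 dropped requests decrease it back to 1.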
+func (s *droppedRequestsStats) updateRetryAfterIfNeededLocked(unixTime int64) {
+	retryAfter := s.retryAfter.Load()
+
+	droppedRequests := int64(0)
+	if len(s.history) > 0 {
+		for i := len(s.history) - 1; i >= 0; i-- {
+			if unixTime-s.history[i].unixTime > retryAfter {
+				break
+			}
+			if s.history[i].unixTime < unixTime {
+				droppedRequests += s.history[i].requests
+			}
+		}
+	}
+
+	if unixTime-s.retryAfterUpdateUnix >= retryAfter && droppedRequests >= 3*retryAfter {
+		// We try to mimic the TCP algorithm and thus are doubling
+		// the retryAfter here.
+		retryAfter *= 2
+		if retryAfter >= maxRetryAfter {
+			retryAfter = maxRetryAfter
+		}
+		s.retryAfter.Store(retryAfter)
+		s.retryAfterUpdateUnix = unixTime
+		return
+	}
+
+	if droppedRequests < retryAfter && retryAfter > 1 {
+		// We try to mimic the TCP algorithm and thus are linearly
+		// scaling down the retryAfter here.
+		retryAfter--
+		s.retryAfter.Store(retryAfter)
+		return
+	}
+}
+
+// droppedRequestsTracker implements the DroppedRequestsTracker interface
+// for the purpose of adjusting the RetryAfter header for newly dropped
+// requests to avoid system overload.
+type droppedRequestsTracker struct {
+	now func() time.Time
+
+	lock    sync.RWMutex
+	plStats map[string]*droppedRequestsStats
+}
+
+// NewDroppedRequestsTracker creates a new instance of
+// DroppedRequestsTracker.
+func NewDroppedRequestsTracker() DroppedRequestsTracker {
+	return newDroppedRequestsTracker(clock.RealClock{}.Now)
+}
+
+func newDroppedRequestsTracker(now func() time.Time) *droppedRequestsTracker {
+	return &droppedRequestsTracker{
+		now:     now,
+		plStats: make(map[string]*droppedRequestsStats),
+	}
+}
+
+func (t *droppedRequestsTracker) RecordDroppedRequest(plName string) {
+	unixTime := t.now().Unix()
+
+	stats := func() *droppedRequestsStats {
+		// The list of priority levels should change very infrequently,
+		// so in almost all cases, the fast path should be enough.
+		t.lock.RLock()
+		if plStats, ok := t.plStats[plName]; ok {
+			t.lock.RUnlock()
+			return plStats
+		}
+		t.lock.RUnlock()
+
+		// Slow path taking writer lock to update the map.
+		t.lock.Lock()
+		defer t.lock.Unlock()
+		if plStats, ok := t.plStats[plName]; ok {
+			return plStats
+		}
+		stats := newDroppedRequestsStats(unixTime)
+		t.plStats[plName] = stats
+		return stats
+	}()
+
+	stats.recordDroppedRequest(unixTime)
+}
+
+func (t *droppedRequestsTracker) GetRetryAfter(plName string) int64 {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	if plStats, ok := t.plStats[plName]; ok {
+		return plStats.retryAfter.Load()
+	}
+	return 1
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go
new file mode 100644
index 00000000000..2962d25ae43
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/dropped_requests_tracker_test.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	testingclock "k8s.io/utils/clock/testing"
+)
+
+func TestDroppedRequestsTracker(t *testing.T) {
+	fakeClock := testingclock.NewFakeClock(time.Now())
+	tracker := newDroppedRequestsTracker(fakeClock.Now)
+
+	// The following table represents the list over time of:
+	// - seconds elapsed (as computed since the initial time)
+	// - requests that will be recorded as dropped in the current second
+	steps := []struct {
+		secondsElapsed int
+		// droppedRequests is the number of requests to drop, after
+		// secondsElapsed.
+		droppedRequests int
+		// retryAfter is the expected retryAfter after all dropped
+		// requests are recorded via RecordDroppedRequest.
+		retryAfter int64
+	}{
+		{secondsElapsed: 0, droppedRequests: 5, retryAfter: 1},
+		{secondsElapsed: 1, droppedRequests: 11, retryAfter: 2},
+		// Check that we don't bump immediately after despite
+		// multiple dropped requests.
+		{secondsElapsed: 2, droppedRequests: 1, retryAfter: 2},
+		{secondsElapsed: 3, droppedRequests: 11, retryAfter: 4},
+		{secondsElapsed: 4, droppedRequests: 1, retryAfter: 4},
+		{secondsElapsed: 7, droppedRequests: 1, retryAfter: 8},
+		{secondsElapsed: 11, droppedRequests: 1, retryAfter: 8},
+		{secondsElapsed: 15, droppedRequests: 1, retryAfter: 7},
+		{secondsElapsed: 17, droppedRequests: 1, retryAfter: 6},
+		{secondsElapsed: 21, droppedRequests: 14, retryAfter: 5},
+		{secondsElapsed: 22, droppedRequests: 1, retryAfter: 10},
+	}
+
+	for i, step := range steps {
+		secondsToAdvance := step.secondsElapsed
+		if i > 0 {
+			secondsToAdvance -= steps[i-1].secondsElapsed
+		}
+		fakeClock.Step(time.Duration(secondsToAdvance) * time.Second)
+
+		// Record all dropped requests for this step and recompute retryAfter.
+		for r := 0; r < step.droppedRequests; r++ {
+			tracker.RecordDroppedRequest("pl")
+		}
+		if retryAfter := tracker.GetRetryAfter("pl"); retryAfter != step.retryAfter {
+			t.Errorf("Unexpected retryAfter: %v, expected: %v", retryAfter, step.retryAfter)
+		}
+	}
+}
+
+func TestDroppedRequestsTrackerPLIndependent(t *testing.T) {
+	fakeClock := testingclock.NewFakeClock(time.Now())
+	tracker := newDroppedRequestsTracker(fakeClock.Now)
+
+	// Report a single dropped request in each of multiple PLs.
+	// Validate that RetryAfter isn't bumped in the next second.
+	for i := 0; i < 10; i++ {
+		tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
+	}
+	fakeClock.Step(time.Second)
+	for i := 0; i < 10; i++ {
+		tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
+		retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
+		if retryAfter != 1 {
+			t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
+		}
+	}
+
+	// Record a few dropped requests on a single PL.
+	// Validate that RetryAfter is bumped only for this PL.
+	for i := 0; i < 5; i++ {
+		tracker.RecordDroppedRequest("pl-0")
+	}
+	fakeClock.Step(time.Second)
+	for i := 0; i < 10; i++ {
+		tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
+		retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
+		switch i {
+		case 0:
+			if retryAfter != 2 {
+				t.Errorf("Unexpected retryAfter for pl-0: %v", retryAfter)
+			}
+		default:
+			if retryAfter != 1 {
+				t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
+			}
+		}
+	}
+	// Also validate a PL for which no dropped requests were recorded.
+	if retryAfter := tracker.GetRetryAfter("other-pl"); retryAfter != 1 {
+		t.Errorf("Unexpected retryAfter for other-pl: %v", retryAfter)
+	}
+}
+
+func BenchmarkDroppedRequestsTracker(b *testing.B) {
+	b.StopTimer()
+
+	fakeClock := testingclock.NewFakeClock(time.Now())
+	tracker := newDroppedRequestsTracker(fakeClock.Now)
+
+	startCh := make(chan struct{})
+	wg := sync.WaitGroup{}
+	numPLs := 5
+	// For all `numPLs` priority levels, create b.N workers, each of which
+	// records 5 dropped requests, one every 25ms of wall-clock time.
+	for i := 0; i < numPLs; i++ {
+		plName := fmt.Sprintf("priority-level-%d", i)
+		for j := 0; j < b.N; j++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				<-startCh
+
+				for a := 0; a < 5; a++ {
+					tracker.RecordDroppedRequest(plName)
+					time.Sleep(25 * time.Millisecond)
+				}
+			}()
+		}
+	}
+	// Time-advancing goroutine.
+	stopCh := make(chan struct{})
+	timeWg := sync.WaitGroup{}
+	timeWg.Add(1)
+	go func() {
+		defer timeWg.Done()
+		for {
+			select {
+			case <-stopCh:
+				return
+			case <-time.After(25 * time.Millisecond):
+				fakeClock.Step(time.Second)
+			}
+		}
+	}()
+
+	b.StartTimer()
+	close(startCh)
+	wg.Wait()
+
+	close(stopCh)
+	timeWg.Wait()
+}
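
To make the end-to-end effect of this change concrete, below is a minimal, self-contained Go sketch (illustration only, not part of the patch) of how the dynamically computed Retry-After reaches clients. It mirrors the RecordDroppedRequest/GetRetryAfter/tooManyRequests flow from the filter above; the standalone server, the port, and the priority-level name are assumptions made purely for this example.

package main

import (
	"log"
	"net/http"
	"strconv"

	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

func main() {
	// One tracker per server, shared across requests, as in
	// WithPriorityAndFairness above.
	droppedRequests := utilflowcontrol.NewDroppedRequestsTracker()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Hypothetical priority level; the real filter takes this from the
		// request's APF classification.
		const plName = "workload-low"

		// Pretend APF rejected this request: record the drop, then tell the
		// client how long to back off. The suggested value starts at 1s and,
		// while the priority level keeps shedding load, doubles up to the
		// 32s cap, decaying linearly once drops subside.
		droppedRequests.RecordDroppedRequest(plName)
		retryAfter := strconv.Itoa(int(droppedRequests.GetRetryAfter(plName)))
		w.Header().Set("Retry-After", retryAfter)
		http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}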