Merge pull request #117547 from wojtek-t/apf_dynamic_retry_after

Return dynamic RetryAfter header from APF
This commit is contained in:
Kubernetes Prow Robot 2023-05-15 12:19:07 -07:00 committed by GitHub
commit 2a4bf451b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 653 additions and 225 deletions

View File

@ -34,7 +34,6 @@ import (
const (
// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
retryAfter = "1"
// How often inflight usage metric should be updated. Because
@ -210,7 +209,7 @@ func WithMaxInFlightLimit(
// We need to split this data between buckets used for throttling.
metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
tooManyRequests(r, w, retryAfter)
}
}
})
@ -221,9 +220,3 @@ func WithMaxInFlightLimit(
func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
startWatermarkMaintenance(watermark, stopCh)
}
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -67,6 +68,240 @@ func truncateLogField(s string) string {
var initAPFOnce sync.Once
type priorityAndFairnessHandler struct {
handler http.Handler
longRunningRequestCheck apirequest.LongRunningRequestCheck
fcIfc utilflowcontrol.Interface
workEstimator flowcontrolrequest.WorkEstimatorFunc
// droppedRequests tracks the history of dropped requests for
// the purpose of computing RetryAfter header to avoid system
// overload.
droppedRequests utilflowcontrol.DroppedRequestsTracker
}
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
return
}
user, ok := apirequest.UserFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no User found in context"))
return
}
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if h.longRunningRequestCheck != nil && h.longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
h.handler.ServeHTTP(w, r)
return
}
var classification *PriorityAndFairnessClassification
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
PriorityLevelName: pl.Name,
PriorityLevelUID: pl.UID,
}
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return h.workEstimator(r, "", "")
}
workEstimate := h.workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats())
httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats)
httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats)
httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency)
return workEstimate
}
var served bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
noteExecutingDelta := func(delta int32) {
if isMutatingRequest {
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
} else {
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
}
}
noteWaitingDelta := func(delta int32) {
if isMutatingRequest {
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
} else {
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
queueNote := func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)
} else {
noteWaitingDelta(-1)
}
}
digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo,
User: user,
}
if isWatchRequest {
// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
// If APF rejects the request, it is never closed.
shouldStartWatchCh := make(chan struct{})
watchInitializationSignal := newInitializationSignal()
// This wraps the request passed to handler.ServeHTTP(),
// setting a context that plumbs watchInitializationSignal to storage
var watchReq *http.Request
// This is set inside execute(), prior to closing shouldStartWatchCh.
// If the request is rejected by APF it is left nil.
var forgetWatch utilflowcontrol.ForgetWatchFunc
defer func() {
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
if watchInitializationSignal != nil {
watchInitializationSignal.Signal()
}
// Forget the watcher if it was registered.
//
// This is race-free because by this point, one of the following occurred:
// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
// case <-resultCh: Handle() completed, and Handle() does not return
// while execute() is running
if forgetWatch != nil {
forgetWatch()
}
}()
execute := func() {
startedAt := time.Now()
defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}()
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
forgetWatch = h.fcIfc.RegisterWatch(r)
// Notify the main thread that we're ready to start the watch.
close(shouldStartWatchCh)
// Wait until the request is finished from the APF point of view
// (which is when its initialization is done).
watchInitializationSignal.Wait()
}
// Ensure that an item can be put to resultCh asynchronously.
resultCh := make(chan interface{}, 1)
// Call Handle in a separate goroutine.
// The reason for it is that from APF point of view, the request processing
// finishes as soon as watch is initialized (which is generally orders of
// magnitude faster then the watch request itself). This means that Handle()
// call finishes much faster and for performance reasons we want to reduce
// the number of running goroutines - so we run the shorter thing in a
// dedicated goroutine and the actual watch handler in the main one.
go func() {
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
// Ensure that the result is put into resultCh independently of the panic.
resultCh <- err
}()
// We create handleCtx with explicit cancelation function.
// The reason for it is that Handle() underneath may start additional goroutine
// that is blocked on context cancellation. However, from APF point of view,
// we don't want to wait until the whole watch request is processed (which is
// when it context is actually cancelled) - we want to unblock the goroutine as
// soon as the request is processed from the APF point of view.
//
// Note that we explicitly do NOT call the actuall handler using that context
// to avoid cancelling request too early.
handleCtx, handleCtxCancel := context.WithCancel(ctx)
defer handleCtxCancel()
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()
select {
case <-shouldStartWatchCh:
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
watchReq = r.WithContext(watchCtx)
h.handler.ServeHTTP(w, watchReq)
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
// It has to happen before waiting on the resultCh below.
watchInitializationSignal.Signal()
// TODO: Consider finishing the request as soon as Handle call panics.
if err := <-resultCh; err != nil {
panic(err)
}
case err := <-resultCh:
if err != nil {
panic(err)
}
}
} else {
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
h.handler.ServeHTTP(w, r)
}
h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}
if !served {
setResponseHeaders(classification, w)
epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
h.droppedRequests.RecordDroppedRequest(classification.PriorityLevelName)
// TODO(wojtek-t): Idea from deads2k: we can consider some jittering and in case of non-int
// number, just return the truncated result and sleep the remainder server-side.
tooManyRequests(r, w, strconv.Itoa(int(h.droppedRequests.GetRetryAfter(classification.PriorityLevelName))))
}
}
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
@ -86,223 +321,15 @@ func WithPriorityAndFairness(
waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
return
}
user, ok := apirequest.UserFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no User found in context"))
return
}
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
}
var classification *PriorityAndFairnessClassification
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
PriorityLevelName: pl.Name,
PriorityLevelUID: pl.UID}
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return workEstimator(r, "", "")
}
workEstimate := workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats())
httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats)
httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats)
httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency)
return workEstimate
}
var served bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
noteExecutingDelta := func(delta int32) {
if isMutatingRequest {
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
} else {
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
}
}
noteWaitingDelta := func(delta int32) {
if isMutatingRequest {
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
} else {
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
queueNote := func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)
} else {
noteWaitingDelta(-1)
}
}
digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo,
User: user,
}
if isWatchRequest {
// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
// If APF rejects the request, it is never closed.
shouldStartWatchCh := make(chan struct{})
watchInitializationSignal := newInitializationSignal()
// This wraps the request passed to handler.ServeHTTP(),
// setting a context that plumbs watchInitializationSignal to storage
var watchReq *http.Request
// This is set inside execute(), prior to closing shouldStartWatchCh.
// If the request is rejected by APF it is left nil.
var forgetWatch utilflowcontrol.ForgetWatchFunc
defer func() {
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
if watchInitializationSignal != nil {
watchInitializationSignal.Signal()
}
// Forget the watcher if it was registered.
//
// // This is race-free because by this point, one of the following occurred:
// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
// case <-resultCh: Handle() completed, and Handle() does not return
// while execute() is running
if forgetWatch != nil {
forgetWatch()
}
}()
execute := func() {
startedAt := time.Now()
defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}()
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
forgetWatch = fcIfc.RegisterWatch(r)
// Notify the main thread that we're ready to start the watch.
close(shouldStartWatchCh)
// Wait until the request is finished from the APF point of view
// (which is when its initialization is done).
watchInitializationSignal.Wait()
}
// Ensure that an item can be put to resultCh asynchronously.
resultCh := make(chan interface{}, 1)
// Call Handle in a separate goroutine.
// The reason for it is that from APF point of view, the request processing
// finishes as soon as watch is initialized (which is generally orders of
// magnitude faster then the watch request itself). This means that Handle()
// call finishes much faster and for performance reasons we want to reduce
// the number of running goroutines - so we run the shorter thing in a
// dedicated goroutine and the actual watch handler in the main one.
go func() {
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
// Ensure that the result is put into resultCh independently of the panic.
resultCh <- err
}()
// We create handleCtx with explicit cancelation function.
// The reason for it is that Handle() underneath may start additional goroutine
// that is blocked on context cancellation. However, from APF point of view,
// we don't want to wait until the whole watch request is processed (which is
// when it context is actually cancelled) - we want to unblock the goroutine as
// soon as the request is processed from the APF point of view.
//
// Note that we explicitly do NOT call the actuall handler using that context
// to avoid cancelling request too early.
handleCtx, handleCtxCancel := context.WithCancel(ctx)
defer handleCtxCancel()
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()
select {
case <-shouldStartWatchCh:
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
watchReq = r.WithContext(watchCtx)
handler.ServeHTTP(w, watchReq)
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
// It has to happen before waiting on the resultCh below.
watchInitializationSignal.Signal()
// TODO: Consider finishing the request as soon as Handle call panics.
if err := <-resultCh; err != nil {
panic(err)
}
case err := <-resultCh:
if err != nil {
panic(err)
}
}
} else {
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
handler.ServeHTTP(w, r)
}
fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}
if !served {
setResponseHeaders(classification, w)
epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
}
})
priorityAndFairnessHandler := &priorityAndFairnessHandler{
handler: handler,
longRunningRequestCheck: longRunningRequestCheck,
fcIfc: fcIfc,
workEstimator: workEstimator,
droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(),
}
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
}
// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
@ -323,3 +350,9 @@ func setResponseHeaders(classification *PriorityAndFairnessClassification, w htt
w.Header().Set(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID, string(classification.PriorityLevelUID))
w.Header().Set(flowcontrol.ResponseHeaderMatchedFlowSchemaUID, string(classification.FlowSchemaUID))
}
func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

View File

@ -360,11 +360,12 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn,
execFn func(),
) {
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
canExecute := false
func() {
f.lock.Lock()

View File

@ -0,0 +1,231 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"sync/atomic"
"time"
"k8s.io/utils/clock"
)
const (
// maxRetryAfter represents the maximum possible retryAfter.
maxRetryAfter = int64(32)
)
// DroppedRequestsTracker is an interface that allows tracking
// a history od dropped requests in the system for the purpose
// of adjusting RetryAfter header to avoid system overload.
type DroppedRequestsTracker interface {
// RecordDroppedRequest records a request that was just
// dropped from processing.
RecordDroppedRequest(plName string)
// GetRetryAfter returns the current suggested value of
// RetryAfter value.
GetRetryAfter(plName string) int64
}
// unixStat keeps a statistic how many requests were dropped within
// a single second.
type unixStat struct {
unixTime int64
requests int64
}
type droppedRequestsStats struct {
lock sync.RWMutex
// history stores the history of dropped requests.
history []unixStat
// To reduce lock-contention, we store the information about
// the current second here, which we can then access under
// reader lock.
currentUnix int64
currentCount atomic.Int64
retryAfter atomic.Int64
retryAfterUpdateUnix int64
}
func newDroppedRequestsStats(nowUnix int64) *droppedRequestsStats {
result := &droppedRequestsStats{
// We assume that we can bump at any time after first dropped request.
retryAfterUpdateUnix: 0,
}
result.retryAfter.Store(1)
return result
}
func (s *droppedRequestsStats) recordDroppedRequest(unixTime int64) {
// Short path - if the current second matches passed time,
// just update the stats.
if done := func() bool {
s.lock.RLock()
defer s.lock.RUnlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return true
}
return false
}(); done {
return
}
// We trigger the change of <currentUnix>.
s.lock.Lock()
defer s.lock.Unlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return
}
s.updateHistory(s.currentUnix, s.currentCount.Load())
s.currentUnix = unixTime
s.currentCount.Store(1)
// We only consider updating retryAfter when bumping the current second.
// However, given that we didn't report anything for the current second,
// we recompute it based on statistics from the previous one.
s.updateRetryAfterIfNeededLocked(unixTime)
}
func (s *droppedRequestsStats) updateHistory(unixTime int64, count int64) {
s.history = append(s.history, unixStat{unixTime: unixTime, requests: count})
startIndex := 0
for ; startIndex < len(s.history) && unixTime-s.history[startIndex].unixTime > maxRetryAfter; startIndex++ {
}
if startIndex > 0 {
s.history = s.history[startIndex:]
}
}
// updateRetryAfterIfNeededLocked updates the retryAfter based on the number of
// dropped requests in the last `retryAfter` seconds:
// - if there were less than `retryAfter` dropped requests, it decreases
// retryAfter
// - if there were at least 3*`retryAfter` dropped requests, it increases
// retryAfter
//
// The rationale behind these numbers being fairly low is that APF is queuing
// requests and rejecting (dropping) them is a last resort, which is not expected
// unless a given priority level is actually overloaded.
//
// Additionally, we rate-limit the increases of retryAfter to wait at least
// `retryAfter' seconds after the previous increase to avoid multiple bumps
// on a single spike.
//
// We're working with the interval [unixTime-retryAfter, unixTime).
func (s *droppedRequestsStats) updateRetryAfterIfNeededLocked(unixTime int64) {
retryAfter := s.retryAfter.Load()
droppedRequests := int64(0)
if len(s.history) > 0 {
for i := len(s.history) - 1; i >= 0; i-- {
if unixTime-s.history[i].unixTime > retryAfter {
break
}
if s.history[i].unixTime < unixTime {
droppedRequests += s.history[i].requests
}
}
}
if unixTime-s.retryAfterUpdateUnix >= retryAfter && droppedRequests >= 3*retryAfter {
// We try to mimic the TCP algorithm and thus are doubling
// the retryAfter here.
retryAfter *= 2
if retryAfter >= maxRetryAfter {
retryAfter = maxRetryAfter
}
s.retryAfter.Store(retryAfter)
s.retryAfterUpdateUnix = unixTime
return
}
if droppedRequests < retryAfter && retryAfter > 1 {
// We try to mimc the TCP algorithm and thus are linearly
// scaling down the retryAfter here.
retryAfter--
s.retryAfter.Store(retryAfter)
return
}
}
// droppedRequestsTracker implement DroppedRequestsTracker interface
// for the purpose of adjusting RetryAfter header for newly dropped
// requests to avoid system overload.
type droppedRequestsTracker struct {
now func() time.Time
lock sync.RWMutex
plStats map[string]*droppedRequestsStats
}
// NewDroppedRequestsTracker is creating a new instance of
// DroppedRequestsTracker.
func NewDroppedRequestsTracker() DroppedRequestsTracker {
return newDroppedRequestsTracker(clock.RealClock{}.Now)
}
func newDroppedRequestsTracker(now func() time.Time) *droppedRequestsTracker {
return &droppedRequestsTracker{
now: now,
plStats: make(map[string]*droppedRequestsStats),
}
}
func (t *droppedRequestsTracker) RecordDroppedRequest(plName string) {
unixTime := t.now().Unix()
stats := func() *droppedRequestsStats {
// The list of priority levels should change very infrequently,
// so in almost all cases, the fast path should be enough.
t.lock.RLock()
if plStats, ok := t.plStats[plName]; ok {
t.lock.RUnlock()
return plStats
}
t.lock.RUnlock()
// Slow path taking writer lock to update the map.
t.lock.Lock()
defer t.lock.Unlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats
}
stats := newDroppedRequestsStats(unixTime)
t.plStats[plName] = stats
return stats
}()
stats.recordDroppedRequest(unixTime)
}
func (t *droppedRequestsTracker) GetRetryAfter(plName string) int64 {
t.lock.RLock()
defer t.lock.RUnlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats.retryAfter.Load()
}
return 1
}

View File

@ -0,0 +1,170 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"fmt"
"sync"
"testing"
"time"
testingclock "k8s.io/utils/clock/testing"
)
func TestDroppedRequestsTracker(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
// The following table represents the list over time of:
// - seconds elapsed (as computed since the initial time)
// - requests that will be recorded as dropped in a current second
steps := []struct {
secondsElapsed int
// droppedRequests is the number of requests to drop, after
// secondsElapsed.
droppedRequests int
// retryAfter is the expected retryAfter after all dropped
// requests are recorded via RecordDroppedRequest.
retryAfter int64
}{
{secondsElapsed: 0, droppedRequests: 5, retryAfter: 1},
{secondsElapsed: 1, droppedRequests: 11, retryAfter: 2},
// Check that we don't bump immediately after despite
// multiple dropped requests.
{secondsElapsed: 2, droppedRequests: 1, retryAfter: 2},
{secondsElapsed: 3, droppedRequests: 11, retryAfter: 4},
{secondsElapsed: 4, droppedRequests: 1, retryAfter: 4},
{secondsElapsed: 7, droppedRequests: 1, retryAfter: 8},
{secondsElapsed: 11, droppedRequests: 1, retryAfter: 8},
{secondsElapsed: 15, droppedRequests: 1, retryAfter: 7},
{secondsElapsed: 17, droppedRequests: 1, retryAfter: 6},
{secondsElapsed: 21, droppedRequests: 14, retryAfter: 5},
{secondsElapsed: 22, droppedRequests: 1, retryAfter: 10},
}
for i, step := range steps {
secondsToAdvance := step.secondsElapsed
if i > 0 {
secondsToAdvance -= steps[i-1].secondsElapsed
}
fakeClock.Step(time.Duration(secondsToAdvance) * time.Second)
// Record only first dropped request and recompute retryAfter.
for r := 0; r < step.droppedRequests; r++ {
tracker.RecordDroppedRequest("pl")
}
if retryAfter := tracker.GetRetryAfter("pl"); retryAfter != step.retryAfter {
t.Errorf("Unexpected retryAfter: %v, expected: %v", retryAfter, step.retryAfter)
}
}
}
func TestDroppedRequestsTrackerPLIndependent(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
// Report single dropped requests in multiple PLs.
// Validate if RetryAfter isn't bumped next second.
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
}
fakeClock.Step(time.Second)
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
if retryAfter != 1 {
t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
}
}
// Record few droped requests on a single PL.
// Validate that RetryAfter is bumped only for this PL.
for i := 0; i < 5; i++ {
tracker.RecordDroppedRequest("pl-0")
}
fakeClock.Step(time.Second)
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
switch i {
case 0:
if retryAfter != 2 {
t.Errorf("Unexpected retryAfter for pl-0: %v", retryAfter)
}
default:
if retryAfter != 1 {
t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
}
}
}
// Validate also PL for which no dropped requests was recorded.
if retryAfter := tracker.GetRetryAfter("other-pl"); retryAfter != 1 {
t.Errorf("Unexpected retryAfter for other-pl: %v", retryAfter)
}
}
func BenchmarkDroppedRequestsTracker(b *testing.B) {
b.StopTimer()
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
startCh := make(chan struct{})
wg := sync.WaitGroup{}
numPLs := 5
// For all `numPLs` priority levels, create b.N workers each
// of which will try to record a dropped request every 100ms
// with a random jitter.
for i := 0; i < numPLs; i++ {
plName := fmt.Sprintf("priority-level-%d", i)
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-startCh
for a := 0; a < 5; a++ {
tracker.RecordDroppedRequest(plName)
time.Sleep(25 * time.Millisecond)
}
}()
}
}
// Time-advancing goroutine.
stopCh := make(chan struct{})
timeWg := sync.WaitGroup{}
timeWg.Add(1)
go func() {
defer timeWg.Done()
for {
select {
case <-stopCh:
return
case <-time.After(25 * time.Millisecond):
fakeClock.Step(time.Second)
}
}
}()
b.StartTimer()
close(startCh)
wg.Wait()
close(stopCh)
timeWg.Wait()
}