Refactor APF handler in preparation for dynamic retryAfter

This commit is contained in:
Wojciech Tyczyński 2023-04-18 20:34:25 +02:00
parent f418411d0f
commit 16fecf3e76
2 changed files with 239 additions and 224 deletions

View File

@ -34,7 +34,6 @@ import (
const (
// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
retryAfter = "1"
// How often inflight usage metric should be updated. Because
@ -210,7 +209,7 @@ func WithMaxInFlightLimit(
// We need to split this data between buckets used for throttling.
metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
tooManyRequests(r, w, retryAfter)
}
}
})
@ -221,9 +220,3 @@ func WithMaxInFlightLimit(
func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
startWatermarkMaintenance(watermark, stopCh)
}
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

View File

@ -67,6 +67,231 @@ func truncateLogField(s string) string {
var initAPFOnce sync.Once
type priorityAndFairnessHandler struct {
handler http.Handler
longRunningRequestCheck apirequest.LongRunningRequestCheck
fcIfc utilflowcontrol.Interface
workEstimator flowcontrolrequest.WorkEstimatorFunc
}
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
return
}
user, ok := apirequest.UserFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no User found in context"))
return
}
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if h.longRunningRequestCheck != nil && h.longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
h.handler.ServeHTTP(w, r)
return
}
var classification *PriorityAndFairnessClassification
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
PriorityLevelName: pl.Name,
PriorityLevelUID: pl.UID,
}
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return h.workEstimator(r, "", "")
}
workEstimate := h.workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats())
httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats)
httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats)
httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency)
return workEstimate
}
var served bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
noteExecutingDelta := func(delta int32) {
if isMutatingRequest {
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
} else {
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
}
}
noteWaitingDelta := func(delta int32) {
if isMutatingRequest {
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
} else {
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
queueNote := func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)
} else {
noteWaitingDelta(-1)
}
}
digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo,
User: user,
}
if isWatchRequest {
// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
// If APF rejects the request, it is never closed.
shouldStartWatchCh := make(chan struct{})
watchInitializationSignal := newInitializationSignal()
// This wraps the request passed to handler.ServeHTTP(),
// setting a context that plumbs watchInitializationSignal to storage
var watchReq *http.Request
// This is set inside execute(), prior to closing shouldStartWatchCh.
// If the request is rejected by APF it is left nil.
var forgetWatch utilflowcontrol.ForgetWatchFunc
defer func() {
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
if watchInitializationSignal != nil {
watchInitializationSignal.Signal()
}
// Forget the watcher if it was registered.
//
// This is race-free because by this point, one of the following occurred:
// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
// case <-resultCh: Handle() completed, and Handle() does not return
// while execute() is running
if forgetWatch != nil {
forgetWatch()
}
}()
execute := func() {
startedAt := time.Now()
defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}()
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
forgetWatch = h.fcIfc.RegisterWatch(r)
// Notify the main thread that we're ready to start the watch.
close(shouldStartWatchCh)
// Wait until the request is finished from the APF point of view
// (which is when its initialization is done).
watchInitializationSignal.Wait()
}
// Ensure that an item can be put to resultCh asynchronously.
resultCh := make(chan interface{}, 1)
// Call Handle in a separate goroutine.
// The reason for it is that from APF point of view, the request processing
// finishes as soon as watch is initialized (which is generally orders of
// magnitude faster then the watch request itself). This means that Handle()
// call finishes much faster and for performance reasons we want to reduce
// the number of running goroutines - so we run the shorter thing in a
// dedicated goroutine and the actual watch handler in the main one.
go func() {
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
// Ensure that the result is put into resultCh independently of the panic.
resultCh <- err
}()
// We create handleCtx with explicit cancelation function.
// The reason for it is that Handle() underneath may start additional goroutine
// that is blocked on context cancellation. However, from APF point of view,
// we don't want to wait until the whole watch request is processed (which is
// when it context is actually cancelled) - we want to unblock the goroutine as
// soon as the request is processed from the APF point of view.
//
// Note that we explicitly do NOT call the actuall handler using that context
// to avoid cancelling request too early.
handleCtx, handleCtxCancel := context.WithCancel(ctx)
defer handleCtxCancel()
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()
select {
case <-shouldStartWatchCh:
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
watchReq = r.WithContext(watchCtx)
h.handler.ServeHTTP(w, watchReq)
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
// It has to happen before waiting on the resultCh below.
watchInitializationSignal.Signal()
// TODO: Consider finishing the request as soon as Handle call panics.
if err := <-resultCh; err != nil {
panic(err)
}
case err := <-resultCh:
if err != nil {
panic(err)
}
}
} else {
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
h.handler.ServeHTTP(w, r)
}
h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}
if !served {
setResponseHeaders(classification, w)
epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w, retryAfter)
}
}
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
@ -86,223 +311,14 @@ func WithPriorityAndFairness(
waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
return
}
user, ok := apirequest.UserFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no User found in context"))
return
}
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
}
var classification *PriorityAndFairnessClassification
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
PriorityLevelName: pl.Name,
PriorityLevelUID: pl.UID}
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return workEstimator(r, "", "")
}
workEstimate := workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
fcmetrics.ObserveWorkEstimatedSeats(classification.PriorityLevelName, classification.FlowSchemaName, workEstimate.MaxSeats())
httplog.AddKeyValue(ctx, "apf_iseats", workEstimate.InitialSeats)
httplog.AddKeyValue(ctx, "apf_fseats", workEstimate.FinalSeats)
httplog.AddKeyValue(ctx, "apf_additionalLatency", workEstimate.AdditionalLatency)
return workEstimate
}
var served bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
noteExecutingDelta := func(delta int32) {
if isMutatingRequest {
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
} else {
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
}
}
noteWaitingDelta := func(delta int32) {
if isMutatingRequest {
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
} else {
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
queueNote := func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)
} else {
noteWaitingDelta(-1)
}
}
digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo,
User: user,
}
if isWatchRequest {
// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
// If APF rejects the request, it is never closed.
shouldStartWatchCh := make(chan struct{})
watchInitializationSignal := newInitializationSignal()
// This wraps the request passed to handler.ServeHTTP(),
// setting a context that plumbs watchInitializationSignal to storage
var watchReq *http.Request
// This is set inside execute(), prior to closing shouldStartWatchCh.
// If the request is rejected by APF it is left nil.
var forgetWatch utilflowcontrol.ForgetWatchFunc
defer func() {
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
if watchInitializationSignal != nil {
watchInitializationSignal.Signal()
}
// Forget the watcher if it was registered.
//
// // This is race-free because by this point, one of the following occurred:
// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
// case <-resultCh: Handle() completed, and Handle() does not return
// while execute() is running
if forgetWatch != nil {
forgetWatch()
}
}()
execute := func() {
startedAt := time.Now()
defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}()
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
forgetWatch = fcIfc.RegisterWatch(r)
// Notify the main thread that we're ready to start the watch.
close(shouldStartWatchCh)
// Wait until the request is finished from the APF point of view
// (which is when its initialization is done).
watchInitializationSignal.Wait()
}
// Ensure that an item can be put to resultCh asynchronously.
resultCh := make(chan interface{}, 1)
// Call Handle in a separate goroutine.
// The reason for it is that from APF point of view, the request processing
// finishes as soon as watch is initialized (which is generally orders of
// magnitude faster then the watch request itself). This means that Handle()
// call finishes much faster and for performance reasons we want to reduce
// the number of running goroutines - so we run the shorter thing in a
// dedicated goroutine and the actual watch handler in the main one.
go func() {
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
// Ensure that the result is put into resultCh independently of the panic.
resultCh <- err
}()
// We create handleCtx with explicit cancelation function.
// The reason for it is that Handle() underneath may start additional goroutine
// that is blocked on context cancellation. However, from APF point of view,
// we don't want to wait until the whole watch request is processed (which is
// when it context is actually cancelled) - we want to unblock the goroutine as
// soon as the request is processed from the APF point of view.
//
// Note that we explicitly do NOT call the actuall handler using that context
// to avoid cancelling request too early.
handleCtx, handleCtxCancel := context.WithCancel(ctx)
defer handleCtxCancel()
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()
select {
case <-shouldStartWatchCh:
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
watchReq = r.WithContext(watchCtx)
handler.ServeHTTP(w, watchReq)
// Protect from the situation when request will not reach storage layer
// and the initialization signal will not be send.
// It has to happen before waiting on the resultCh below.
watchInitializationSignal.Signal()
// TODO: Consider finishing the request as soon as Handle call panics.
if err := <-resultCh; err != nil {
panic(err)
}
case err := <-resultCh:
if err != nil {
panic(err)
}
}
} else {
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
setResponseHeaders(classification, w)
handler.ServeHTTP(w, r)
}
fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}
if !served {
setResponseHeaders(classification, w)
epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
}
})
priorityAndFairnessHandler := &priorityAndFairnessHandler{
handler: handler,
longRunningRequestCheck: longRunningRequestCheck,
fcIfc: fcIfc,
workEstimator: workEstimator,
}
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
}
// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
@ -323,3 +339,9 @@ func setResponseHeaders(classification *PriorityAndFairnessClassification, w htt
w.Header().Set(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID, string(classification.PriorityLevelUID))
w.Header().Set(flowcontrol.ResponseHeaderMatchedFlowSchemaUID, string(classification.FlowSchemaUID))
}
func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}