diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index d678f52dfb7..047736e57d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -915,7 +915,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { requestWorkEstimator := flowcontrolrequest.NewWorkEstimator( c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats) handler = filterlatency.TrackCompleted(handler) - handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) + handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4) handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness") } else { handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 6b398778160..05cc44263fb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -35,6 +35,7 @@ import ( fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" + utilsclock "k8s.io/utils/clock" ) // PriorityAndFairnessClassification identifies the results of @@ -78,6 +79,10 @@ type priorityAndFairnessHandler struct { // the purpose of computing RetryAfter header to avoid system // overload. droppedRequests utilflowcontrol.DroppedRequestsTracker + + // newReqWaitCtxFn creates a derived context with a deadline + // of how long a given request can wait in its queue. + newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc) } func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) { @@ -240,8 +245,9 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque resultCh <- err }() - // We create handleCtx with explicit cancelation function. - // The reason for it is that Handle() underneath may start additional goroutine + // We create handleCtx with an adjusted deadline, for two reasons. + // One is to limit the time the request waits before its execution starts. + // The other reason for it is that Handle() underneath may start additional goroutine // that is blocked on context cancellation. However, from APF point of view, // we don't want to wait until the whole watch request is processed (which is // when it context is actually cancelled) - we want to unblock the goroutine as @@ -249,7 +255,7 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque // // Note that we explicitly do NOT call the actuall handler using that context // to avoid cancelling request too early. - handleCtx, handleCtxCancel := context.WithCancel(ctx) + handleCtx, handleCtxCancel := h.newReqWaitCtxFn(ctx) defer handleCtxCancel() // Note that Handle will return irrespective of whether the request @@ -286,7 +292,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque h.handler.ServeHTTP(w, r) } - h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute) + func() { + handleCtx, cancelFn := h.newReqWaitCtxFn(ctx) + defer cancelFn() + h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute) + }() } if !served { @@ -309,6 +319,7 @@ func WithPriorityAndFairness( longRunningRequestCheck apirequest.LongRunningRequestCheck, fcIfc utilflowcontrol.Interface, workEstimator flowcontrolrequest.WorkEstimatorFunc, + defaultRequestWaitLimit time.Duration, ) http.Handler { if fcIfc == nil { klog.Warningf("priority and fairness support not found, skipping") @@ -322,12 +333,18 @@ func WithPriorityAndFairness( waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency() }) + clock := &utilsclock.RealClock{} + newReqWaitCtxFn := func(ctx context.Context) (context.Context, context.CancelFunc) { + return getRequestWaitContext(ctx, defaultRequestWaitLimit, clock) + } + priorityAndFairnessHandler := &priorityAndFairnessHandler{ handler: handler, longRunningRequestCheck: longRunningRequestCheck, fcIfc: fcIfc, workEstimator: workEstimator, droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(), + newReqWaitCtxFn: newReqWaitCtxFn, } return http.HandlerFunc(priorityAndFairnessHandler.Handle) } @@ -356,3 +373,48 @@ func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string w.Header().Set("Retry-After", retryAfter) http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests) } + +// getRequestWaitContext returns a new context with a deadline of how +// long the request is allowed to wait before it is removed from its +// queue and rejected. +// The context.CancelFunc returned must never be nil and the caller is +// responsible for calling the CancelFunc function for cleanup. +// - ctx: the context associated with the request (it may or may +// not have a deadline). +// - defaultRequestWaitLimit: the default wait duration that is used +// if the request context does not have any deadline. +// (a) initialization of a watch or +// (b) a request whose context has no deadline +// +// clock comes in handy for testing the function +func getRequestWaitContext(ctx context.Context, defaultRequestWaitLimit time.Duration, clock utilsclock.PassiveClock) (context.Context, context.CancelFunc) { + if ctx.Err() != nil { + return ctx, func() {} + } + + reqArrivedAt := clock.Now() + if reqReceivedTimestamp, ok := apirequest.ReceivedTimestampFrom(ctx); ok { + reqArrivedAt = reqReceivedTimestamp + } + + // a) we will allow the request to wait in the queue for one + // fourth of the time of its allotted deadline. + // b) if the request context does not have any deadline + // then we default to 'defaultRequestWaitLimit' + // in any case, the wait limit for any request must not + // exceed the hard limit of 1m + // + // request has deadline: + // wait-limit = min(remaining deadline / 4, 1m) + // request has no deadline: + // wait-limit = min(defaultRequestWaitLimit, 1m) + thisReqWaitLimit := defaultRequestWaitLimit + if deadline, ok := ctx.Deadline(); ok { + thisReqWaitLimit = deadline.Sub(reqArrivedAt) / 4 + } + if thisReqWaitLimit > time.Minute { + thisReqWaitLimit = time.Minute + } + + return context.WithDeadline(ctx, reqArrivedAt.Add(thisReqWaitLimit)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 81a17a5b18e..70af7839fc1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -51,6 +51,7 @@ import ( "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" + clocktesting "k8s.io/utils/clock/testing" "github.com/google/go-cmp/cmp" ) @@ -153,23 +154,23 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE WatchTracker: utilflowcontrol.NewWatchTracker(), MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(), } - return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) + return newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecute, postExecute) } -func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server { +func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) *httptest.Server { epmetrics.Register() fcmetrics.Register() - apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute)) + apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, defaultWaitLimit, onExecute, postExecute)) return apfServer } -func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler { +func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) http.Handler { requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { onExecute() - }), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator) + }), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator, defaultWaitLimit) handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{ @@ -458,7 +459,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { postExecuteFunc := func() {} - server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc) defer server.Close() var wg sync.WaitGroup @@ -498,7 +499,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { } postExecuteFunc := func() {} - server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil { @@ -517,7 +518,7 @@ func TestApfWatchPanic(t *testing.T) { } postExecuteFunc := func() {} - apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + apfHandler := newApfHandlerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == nil { @@ -564,7 +565,7 @@ func TestApfWatchHandlePanic(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - apfHandler := newApfHandlerWithFilter(t, test.filter, onExecuteFunc, postExecuteFunc) + apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == nil { @@ -649,6 +650,7 @@ func TestApfWithRequestDigest(t *testing.T) { longRunningFunc, fakeFilter, func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected }, + time.Minute/4, ) w := httptest.NewRecorder() @@ -1231,7 +1233,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol. requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) - apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator) + apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator, time.Minute/4) // add the handler in the chain that adds the specified user to the request context handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1407,3 +1409,107 @@ func isStreamReset(err error) bool { } return false } + +func TestGetRequestWaitContext(t *testing.T) { + tests := []struct { + name string + defaultRequestWaitLimit time.Duration + parent func(t time.Time) (context.Context, context.CancelFunc) + newReqWaitCtxExpected bool + reqWaitLimitExpected time.Duration + }{ + { + name: "context deadline has exceeded", + parent: func(time.Time) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx, cancel + }, + }, + { + name: "context has a deadline, 'received at' is not set, wait limit should be one fourth of the remaining deadline from now", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + return context.WithDeadline(context.Background(), now.Add(60*time.Second)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 15 * time.Second, + }, + { + name: "context has a deadline, 'received at' is set, wait limit should be one fourth of the deadline starting from the 'received at' time", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second)) + return context.WithDeadline(ctx, now.Add(50*time.Second)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 5 * time.Second, // from now + }, + { + name: "context does not have any deadline, 'received at' is not set, default wait limit should be in effect from now", + defaultRequestWaitLimit: 15 * time.Second, + parent: func(time.Time) (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 15 * time.Second, + }, + { + name: "context does not have any deadline, 'received at' is set, default wait limit should be in effect starting from the 'received at' time", + defaultRequestWaitLimit: 15 * time.Second, + parent: func(now time.Time) (context.Context, context.CancelFunc) { + ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second)) + return context.WithCancel(ctx) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 5 * time.Second, // from now + }, + { + name: "context has a deadline, wait limit should not exceed the hard limit of 1m", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + // let 1/4th of the remaining deadline exceed the hard limit + return context.WithDeadline(context.Background(), now.Add(8*time.Minute)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: time.Minute, + }, + { + name: "context has no deadline, wait limit should not exceed the hard limit of 1m", + defaultRequestWaitLimit: 2 * time.Minute, // it exceeds the hard limit + parent: func(now time.Time) (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: time.Minute, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + now := time.Now() + parent, cancel := test.parent(now) + defer cancel() + + clock := clocktesting.NewFakePassiveClock(now) + newReqWaitCtxGot, cancelGot := getRequestWaitContext(parent, test.defaultRequestWaitLimit, clock) + if cancelGot == nil { + t.Errorf("Expected a non nil context.CancelFunc") + return + } + defer cancelGot() + + switch { + case test.newReqWaitCtxExpected: + deadlineGot, ok := newReqWaitCtxGot.Deadline() + if !ok { + t.Errorf("Expected the new wait limit context to have a deadline") + } + if waitLimitGot := deadlineGot.Sub(now); test.reqWaitLimitExpected != waitLimitGot { + t.Errorf("Expected request wait limit %s, but got: %s", test.reqWaitLimitExpected, waitLimitGot) + } + default: + if parent != newReqWaitCtxGot { + t.Errorf("Expected the parent context to be returned: want: %#v, got %#v", parent, newReqWaitCtxGot) + } + } + }) + } +}