diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go
index c0c720dd44b..52eed984f43 100644
--- a/pkg/controlplane/apiserver/config.go
+++ b/pkg/controlplane/apiserver/config.go
@@ -202,7 +202,6 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
 		versionedInformer,
 		extclient.FlowcontrolV1beta3(),
 		s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
-		s.GenericServerRunOptions.RequestTimeout/4,
 	), nil
 }
 
diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go
index ebe788b1c50..f31b120a446 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config.go
@@ -915,7 +915,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
 			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
 		handler = filterlatency.TrackCompleted(handler)
-		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
+		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
 		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
 	} else {
 		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index 6b398778160..05cc44263fb 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -35,6 +35,7 @@ import (
 	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 	flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 	"k8s.io/klog/v2"
+	utilsclock "k8s.io/utils/clock"
 )
 
 // PriorityAndFairnessClassification identifies the results of
@@ -78,6 +79,10 @@ type priorityAndFairnessHandler struct {
 	// the purpose of computing RetryAfter header to avoid system
 	// overload.
 	droppedRequests utilflowcontrol.DroppedRequestsTracker
+
+	// newReqWaitCtxFn creates a derived context with a deadline
+	// of how long a given request can wait in its queue.
+	newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc)
 }
 
 func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
@@ -240,8 +245,9 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 				resultCh <- err
 			}()
 
-			// We create handleCtx with explicit cancelation function.
-			// The reason for it is that Handle() underneath may start additional goroutine
+			// We create handleCtx with an adjusted deadline, for two reasons.
+			// One is to limit the time the request waits before its execution starts.
+			// The other reason for it is that Handle() underneath may start additional goroutine
 			// that is blocked on context cancellation. However, from APF point of view,
 			// we don't want to wait until the whole watch request is processed (which is
 			// when it context is actually cancelled) - we want to unblock the goroutine as
@@ -249,7 +255,7 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 			//
 			// Note that we explicitly do NOT call the actuall handler using that context
 			// to avoid cancelling request too early.
-			handleCtx, handleCtxCancel := context.WithCancel(ctx)
+			handleCtx, handleCtxCancel := h.newReqWaitCtxFn(ctx)
 			defer handleCtxCancel()
 
 			// Note that Handle will return irrespective of whether the request
@@ -286,7 +292,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 			h.handler.ServeHTTP(w, r)
 		}
 
-		h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
+		func() {
+			handleCtx, cancelFn := h.newReqWaitCtxFn(ctx)
+			defer cancelFn()
+			h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
+		}()
 	}
 
 	if !served {
@@ -309,6 +319,7 @@ func WithPriorityAndFairness(
 	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 	fcIfc utilflowcontrol.Interface,
 	workEstimator flowcontrolrequest.WorkEstimatorFunc,
+	defaultRequestWaitLimit time.Duration,
 ) http.Handler {
 	if fcIfc == nil {
 		klog.Warningf("priority and fairness support not found, skipping")
@@ -322,12 +333,18 @@ func WithPriorityAndFairness(
 		waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
 	})
 
+	clock := &utilsclock.RealClock{}
+	newReqWaitCtxFn := func(ctx context.Context) (context.Context, context.CancelFunc) {
+		return getRequestWaitContext(ctx, defaultRequestWaitLimit, clock)
+	}
+
 	priorityAndFairnessHandler := &priorityAndFairnessHandler{
 		handler:                 handler,
 		longRunningRequestCheck: longRunningRequestCheck,
 		fcIfc:                   fcIfc,
 		workEstimator:           workEstimator,
 		droppedRequests:         utilflowcontrol.NewDroppedRequestsTracker(),
+		newReqWaitCtxFn:         newReqWaitCtxFn,
 	}
 	return http.HandlerFunc(priorityAndFairnessHandler.Handle)
 }
@@ -356,3 +373,48 @@ func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string
 	w.Header().Set("Retry-After", retryAfter)
 	http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
 }
+
+// getRequestWaitContext returns a new context with a deadline of how
+// long the request is allowed to wait before it is removed from its
+// queue and rejected.
+// The context.CancelFunc returned must never be nil and the caller is
+// responsible for calling the CancelFunc function for cleanup.
+//   - ctx: the context associated with the request (it may or may
+//     not have a deadline).
+//   - defaultRequestWaitLimit: the default wait duration that is used
+//     if the request context does not have any deadline.
+//     (a) initialization of a watch or
+//     (b) a request whose context has no deadline
+//
+// clock comes in handy for testing the function
+func getRequestWaitContext(ctx context.Context, defaultRequestWaitLimit time.Duration, clock utilsclock.PassiveClock) (context.Context, context.CancelFunc) {
+	if ctx.Err() != nil {
+		return ctx, func() {}
+	}
+
+	reqArrivedAt := clock.Now()
+	if reqReceivedTimestamp, ok := apirequest.ReceivedTimestampFrom(ctx); ok {
+		reqArrivedAt = reqReceivedTimestamp
+	}
+
+	// a) we will allow the request to wait in the queue for one
+	// fourth of the time of its allotted deadline.
+	// b) if the request context does not have any deadline
+	// then we default to 'defaultRequestWaitLimit'
+	// in any case, the wait limit for any request must not
+	// exceed the hard limit of 1m
+	//
+	// request has deadline:
+	//  wait-limit = min(remaining deadline / 4, 1m)
+	// request has no deadline:
+	//  wait-limit = min(defaultRequestWaitLimit, 1m)
+	thisReqWaitLimit := defaultRequestWaitLimit
+	if deadline, ok := ctx.Deadline(); ok {
+		thisReqWaitLimit = deadline.Sub(reqArrivedAt) / 4
+	}
+	if thisReqWaitLimit > time.Minute {
+		thisReqWaitLimit = time.Minute
+	}
+
+	return context.WithDeadline(ctx, reqArrivedAt.Add(thisReqWaitLimit))
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 81a17a5b18e..04772a7de1b 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -51,6 +51,7 @@ import (
 	"k8s.io/component-base/metrics/legacyregistry"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
+	clocktesting "k8s.io/utils/clock/testing"
 
 	"github.com/google/go-cmp/cmp"
 )
@@ -153,23 +154,23 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE
 		WatchTracker:    utilflowcontrol.NewWatchTracker(),
 		MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
 	}
-	return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
+	return newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecute, postExecute)
 }
 
-func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server {
+func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) *httptest.Server {
 	epmetrics.Register()
 	fcmetrics.Register()
-	apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute))
+	apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, defaultWaitLimit, onExecute, postExecute))
 	return apfServer
 }
 
-func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler {
+func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) http.Handler {
 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 
 	apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		onExecute()
-	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator)
+	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator, defaultWaitLimit)
 
 	handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -458,7 +459,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
 
 	postExecuteFunc := func() {}
 
-	server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
+	server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 	defer server.Close()
 
 	var wg
sync.WaitGroup @@ -498,7 +499,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { } postExecuteFunc := func() {} - server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil { @@ -517,7 +518,7 @@ func TestApfWatchPanic(t *testing.T) { } postExecuteFunc := func() {} - apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + apfHandler := newApfHandlerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == nil { @@ -564,7 +565,7 @@ func TestApfWatchHandlePanic(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - apfHandler := newApfHandlerWithFilter(t, test.filter, onExecuteFunc, postExecuteFunc) + apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == nil { @@ -649,6 +650,7 @@ func TestApfWithRequestDigest(t *testing.T) { longRunningFunc, fakeFilter, func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected }, + time.Minute/4, ) w := httptest.NewRecorder() @@ -685,7 +687,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var executed bool @@ -755,7 +757,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var executed bool @@ -831,7 +833,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var innerHandlerWriteErr error @@ -909,7 +911,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, 
controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var innerHandlerWriteErr error @@ -984,7 +986,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength) stopCh := make(chan struct{}) - controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency) headerMatcher := headerMatcher{} var firstRequestInnerHandlerWriteErr error @@ -1116,11 +1118,11 @@ func fmtError(err error) string { } func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int, - requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { + plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { clientset := newClientset(t, apfConfiguration...) // this test does not rely on resync, so resync period is set to zero factory := informers.NewSharedInformerFactory(clientset, 0) - controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency, requestWaitLimit) + controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency) factory.Start(stopCh) @@ -1231,7 +1233,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol. requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) - apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator) + apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator, time.Minute/4) // add the handler in the chain that adds the specified user to the request context handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1407,3 +1409,107 @@ func isStreamReset(err error) bool { } return false } + +func TestGetRequestWaitContext(t *testing.T) { + tests := []struct { + name string + defaultRequestWaitLimit time.Duration + parent func(t time.Time) (context.Context, context.CancelFunc) + newReqWaitCtxExpected bool + reqWaitLimitExpected time.Duration + }{ + { + name: "context deadline has exceeded", + parent: func(time.Time) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx, cancel + }, + }, + { + name: "context has a deadline, 'received at' is not set, wait limit should be one fourth of the remaining deadline from now", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + return context.WithDeadline(context.Background(), now.Add(60*time.Second)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 15 * time.Second, + }, + { + name: "context has a deadline, 'received at' is set, wait limit should be one fourth of the deadline starting from the 'received at' time", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second)) + return context.WithDeadline(ctx, 
now.Add(50*time.Second)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 5 * time.Second, // from now + }, + { + name: "context does not have any deadline, 'received at' is not set, default wait limit should be in effect from now", + defaultRequestWaitLimit: 15 * time.Second, + parent: func(time.Time) (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 15 * time.Second, + }, + { + name: "context does not have any deadline, 'received at' is set, default wait limit should be in effect starting from the 'received at' time", + defaultRequestWaitLimit: 15 * time.Second, + parent: func(now time.Time) (context.Context, context.CancelFunc) { + ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second)) + return context.WithCancel(ctx) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: 5 * time.Second, // from now + }, + { + name: "context has a deadline, wait limit should not exceed the hard limit of 1m", + parent: func(now time.Time) (context.Context, context.CancelFunc) { + // let 1/4th of the remaining deadline exceed the hard limit + return context.WithDeadline(context.Background(), now.Add(8*time.Minute)) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: time.Minute, + }, + { + name: "context has no deadline, wait limit should not exceed the hard limit of 1m", + defaultRequestWaitLimit: 2 * time.Minute, // it exceeds the hard limit + parent: func(now time.Time) (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + newReqWaitCtxExpected: true, + reqWaitLimitExpected: time.Minute, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + now := time.Now() + parent, cancel := test.parent(now) + defer cancel() + + clock := clocktesting.NewFakePassiveClock(now) + newReqWaitCtxGot, cancelGot := getRequestWaitContext(parent, test.defaultRequestWaitLimit, clock) + if cancelGot == nil { + t.Errorf("Expected a non nil context.CancelFunc") + return + } + defer cancelGot() + + switch { + case test.newReqWaitCtxExpected: + deadlineGot, ok := newReqWaitCtxGot.Deadline() + if !ok { + t.Errorf("Expected the new wait limit context to have a deadline") + } + if waitLimitGot := deadlineGot.Sub(now); test.reqWaitLimitExpected != waitLimitGot { + t.Errorf("Expected request wait limit %s, but got: %s", test.reqWaitLimitExpected, waitLimitGot) + } + default: + if parent != newReqWaitCtxGot { + t.Errorf("Expected the parent context to be returned: want: %#v, got %#v", parent, newReqWaitCtxGot) + } + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 69f8fb51556..5d031e202e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -154,7 +154,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { config.SharedInformerFactory, kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(), config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight, - config.RequestTimeout/4, ) } else { klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled") diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 
f2df57ccc22..8c90811bf45 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -150,9 +150,6 @@ type configController struct { // from server configuration. serverConcurrencyLimit int - // requestWaitLimit comes from server configuration. - requestWaitLimit time.Duration - // watchTracker implements the necessary WatchTracker interface. WatchTracker @@ -287,13 +284,12 @@ func newTestableController(config TestableConfig) *configController { asFieldManager: config.AsFieldManager, foundToDangling: config.FoundToDangling, serverConcurrencyLimit: config.ServerConcurrencyLimit, - requestWaitLimit: config.RequestWaitLimit, flowcontrolClient: config.FlowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), WatchTracker: NewWatchTracker(), MaxSeatsTracker: NewMaxSeatsTracker(), } - klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager) + klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager) // Start with longish delay because conflicts will be between // different processes, so take some time to go away. cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") @@ -433,7 +429,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta plState := plStates[plName] if setCompleters { qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, - plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + plState.pl, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) if err != nil { klog.ErrorS(err, "Inconceivable! 
Configuration error in existing priority level", "pl", plState.pl) @@ -657,10 +653,10 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro // Supply missing mandatory PriorityLevelConfiguration objects if !meal.haveExemptPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt) } if !meal.haveCatchAllPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll) } meal.finishQueueSetReconfigsLocked() @@ -692,7 +688,7 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi } } qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, - pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, + pl, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge)) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) @@ -798,7 +794,7 @@ func (meal *cfgMeal) processOldPLsLocked() { } var err error plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, - plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + plState.pl, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config @@ -880,7 +876,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the // given priority level configuration. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) { return nil, errors.New("broken union structure at the top, for Limited") } @@ -902,7 +898,6 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow DesiredNumQueues: int(qcAPI.Queues), QueueLengthLimit: int(qcAPI.QueueLengthLimit), HandSize: int(qcAPI.HandSize), - RequestWaitLimit: requestWaitLimit, } } } else { @@ -956,16 +951,15 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl // imaginePL adds a priority level based on one of the mandatory ones // that does not actually exist (right now) as a real API object. 
-func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { +func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) { klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) labelValues := []string{proto.Name} reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues) execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name) seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name}) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, - requestWaitLimit, reqsGaugePair, execSeatsObs, - metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, reqsGaugePair, + execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 76782623a84..05f4f5e5392 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -90,7 +90,6 @@ func New( informerFactory kubeinformers.SharedInformerFactory, flowcontrolClient flowcontrolclient.FlowcontrolV1beta3Interface, serverConcurrencyLimit int, - requestWaitLimit time.Duration, ) Interface { clk := eventclock.Real{} return NewTestable(TestableConfig{ @@ -101,7 +100,6 @@ func New( InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: serverConcurrencyLimit, - RequestWaitLimit: requestWaitLimit, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), @@ -139,9 +137,6 @@ type TestableConfig struct { // ServerConcurrencyLimit for the controller to enforce ServerConcurrencyLimit int - // RequestWaitLimit configured on the server - RequestWaitLimit time.Duration - // GaugeVec for metrics about requests, broken down by phase and priority_level ReqsGaugeVec metrics.RatioedGaugeVec diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go index 840fecd75ac..dcc4de38add 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go @@ -109,7 +109,6 @@ func TestQueueWaitTimeLatencyTracker(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 24, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go index 8511730f03c..e6ab27bea48 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go @@ -143,7 +143,6 
@@ func TestBorrowing(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 24, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index ec7c58a9b25..a50c096802d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -251,8 +251,7 @@ func TestConfigConsumer(t *testing.T) { FoundToDangling: func(found bool) bool { return !found }, InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, - ServerConcurrencyLimit: 100, // server concurrency limit - RequestWaitLimit: time.Minute, // request wait limit + ServerConcurrencyLimit: 100, // server concurrency limit ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, @@ -384,7 +383,6 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) { InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: 100, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: cts, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 013fd41e087..3b0ad16387e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -18,7 +18,6 @@ package fairqueuing import ( "context" - "time" "k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" @@ -117,7 +116,7 @@ type QueuingConfig struct { // DesiredNumQueues is the number of queues that the API says // should exist now. This may be non-positive, in which case - // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. + // QueueLengthLimit, and HandSize are ignored. // A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig. // A negative value means to always dispatch immediately upon arrival // (i.e., the requests are "exempt" from limitation). @@ -129,10 +128,6 @@ type QueuingConfig struct { // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly // dealing a "hand" of this many queues and then picking one of minimum length. HandSize int - - // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. - // If, by the end of that time, the request has not been dispatched then it is rejected. - RequestWaitLimit time.Duration } // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet. 
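With RequestWaitLimit gone from QueuingConfig above (and the queue-set timeout sweep removed below), the bound on queue wait time now comes from the request context, as computed by getRequestWaitContext earlier in this diff. The following is an illustrative, dependency-free sketch only — it is not part of this patch, and deriveWaitLimit is an invented name — restating the arithmetic the filter now applies:

package main

import (
	"context"
	"fmt"
	"time"
)

// deriveWaitLimit mirrors the wait-limit rule used by the filter:
//   deadline present:  wait-limit = min(remaining deadline / 4, 1m)
//   no deadline:       wait-limit = min(defaultRequestWaitLimit, 1m)
func deriveWaitLimit(ctx context.Context, arrivedAt time.Time, defaultRequestWaitLimit time.Duration) time.Duration {
	limit := defaultRequestWaitLimit
	if deadline, ok := ctx.Deadline(); ok {
		limit = deadline.Sub(arrivedAt) / 4
	}
	if limit > time.Minute {
		limit = time.Minute
	}
	return limit
}

func main() {
	now := time.Now()

	// A request with a 60s deadline may wait at most 15s in its queue.
	ctx, cancel := context.WithDeadline(context.Background(), now.Add(60*time.Second))
	defer cancel()
	fmt.Println(deriveWaitLimit(ctx, now, 15*time.Second)) // 15s

	// A request without a deadline (e.g. a watch being initialized) falls back
	// to the default, capped at the 1m hard limit.
	fmt.Println(deriveWaitLimit(context.Background(), now, 2*time.Minute)) // 1m0s
}

Requests whose context carries no deadline fall back to defaultRequestWaitLimit, which the callers updated in this diff wire as RequestTimeout/4, never exceeding the 1m cap.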
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 99397ecbaa2..b675bb5453c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -272,7 +272,6 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, } else { qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit qCfg.HandSize = qs.qCfg.HandSize - qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit } qs.qCfg = qCfg @@ -300,9 +299,6 @@ const ( // Serve this one decisionExecute requestDecision = iota - // Reject this one due to APF queuing considerations - decisionReject - // This one's context timed out / was canceled decisionCancel ) @@ -337,11 +333,10 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo // ======================================================================== // Step 1: // 1) Start with shuffle sharding, to pick a queue. - // 2) Reject old requests that have been waiting too long - // 3) Reject current request if there is not enough concurrency shares and + // 2) Reject current request if there is not enough concurrency shares and // we are at max queue length - // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) + // 3) If not rejected, create a request and enqueue + req = qs.shuffleShardAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { @@ -422,13 +417,7 @@ func (req *request) wait() (bool, bool) { } req.waitStarted = true switch decisionAny { - case decisionReject: - klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) - qs.totRequestsRejected++ - qs.totRequestsTimedout++ - metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") - return false, qs.isIdleLocked() - case decisionCancel: + case decisionCancel: // handle in code following this switch case decisionExecute: klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) return true, false @@ -438,7 +427,7 @@ func (req *request) wait() (bool, bool) { } // TODO(aaron-prindle) add metrics for this case klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) - // remove the request from the queue as it has timed out + // remove the request from the queue as its queue wait time has exceeded queue := req.queue if req.removeFromQueueLocked() != nil { defer qs.boundNextDispatchLocked(queue) @@ -446,7 +435,7 @@ func (req *request) wait() (bool, bool) { qs.totSeatsWaiting -= req.MaxSeats() qs.totRequestsRejected++ qs.totRequestsCancelled++ - metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") + metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats()) req.NoteQueued(false) @@ -556,25 +545,19 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { return math.Min(float64(seatsRequested), 
float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues) } -// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required +// shuffleShardAndRejectOrEnqueueLocked encapsulates the logic required // to validate and enqueue a request for the queueSet/QueueSet: // 1) Start with shuffle sharding, to pick a queue. -// 2) Reject old requests that have been waiting too long -// 3) Reject current request if there is not enough concurrency shares and +// 2) Reject current request if there is not enough concurrency shares and // we are at max queue length -// 4) If not rejected, create a request and enqueue +// 3) If not rejected, create a request and enqueue // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { +func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] - // The next step is the logic to reject requests that have been waiting too long - qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName) - // NOTE: currently timeout is only checked for each new request. This means that there can be - // requests that are in the queue longer than the timeout if there are no new requests - // We prefer the simplicity over the promptness, at least for now. defer qs.boundNextDispatchLocked(queue) @@ -633,44 +616,6 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac return bestQueueIdx } -// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued -// past the requestWaitLimit -func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) { - timeoutCount := 0 - disqueueSeats := 0 - now := qs.clock.Now() - reqs := queue.requestsWaiting - // reqs are sorted oldest -> newest - // can short circuit loop (break) if oldest requests are not timing out - // as newer requests also will not have timed out - - // now - requestWaitLimit = arrivalLimit - arrivalLimit := now.Add(-qs.qCfg.RequestWaitLimit) - reqs.Walk(func(req *request) bool { - if arrivalLimit.After(req.arrivalTime) { - if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil { - timeoutCount++ - disqueueSeats += req.MaxSeats() - req.NoteQueued(false) - metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) - metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats()) - } - // we need to check if the next request has timed out. - return true - } - // since reqs are sorted oldest -> newest, we are done here. 
- return false - }) - - // remove timed out requests from queue - if timeoutCount > 0 { - qs.totRequestsWaiting -= timeoutCount - qs.totSeatsWaiting -= disqueueSeats - qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount)) - qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) - } -} - // rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived // request, which has been assigned to a queue. If up against the // queue length limit and the concurrency limit then returns false. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index ab43b54c45a..5e9399605da 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -551,7 +551,6 @@ func TestBaseline(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -590,7 +589,6 @@ func TestExampt(t *testing.T) { DesiredNumQueues: -1, QueueLengthLimit: 2, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -663,7 +661,6 @@ func TestSeparations(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -704,7 +701,6 @@ func TestUniformFlowsHandSize1(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -743,7 +739,6 @@ func TestUniformFlowsHandSize3(t *testing.T) { DesiredNumQueues: 8, QueueLengthLimit: 16, HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -781,7 +776,6 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -823,7 +817,6 @@ func TestSeatSecondsRollover(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 8, HandSize: 1, - RequestWaitLimit: 40 * Quarter, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -863,7 +856,6 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 6, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := 
fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -902,7 +894,6 @@ func TestDifferentWidths(t *testing.T) { DesiredNumQueues: 64, QueueLengthLimit: 13, HandSize: 7, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -940,7 +931,6 @@ func TestTooWide(t *testing.T) { DesiredNumQueues: 64, QueueLengthLimit: 35, HandSize: 7, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1003,7 +993,6 @@ func TestWindup(t *testing.T) { DesiredNumQueues: 9, QueueLengthLimit: 6, HandSize: 1, - RequestWaitLimit: 10 * time.Minute, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1067,44 +1056,6 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { }.exercise(t) } -func TestTimeout(t *testing.T) { - metrics.Register() - now := time.Now() - - clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) - qCfg := fq.QueuingConfig{ - Name: "TestTimeout", - DesiredNumQueues: 128, - QueueLengthLimit: 128, - HandSize: 1, - RequestWaitLimit: 0, - } - seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) - if err != nil { - t.Fatal(err) - } - qs := qsComplete(qsc, 1) - - uniformScenario{name: qCfg.Name, - qs: qs, - clients: []uniformClient{ - newUniformClient(1001001001, 5, 100, time.Second, time.Second), - }, - concurrencyLimit: 1, - evalDuration: time.Second * 10, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - rejectReason: "time-out", - clk: clk, - counter: counter, - seatDemandIntegratorSubject: seatDemandIntegratorSubject, - }.exercise(t) -} - // TestContextCancel tests cancellation of a request's context. // The outline is: // 1. Use a concurrency limit of 1. 
@@ -1131,7 +1082,6 @@ func TestContextCancel(t *testing.T) { DesiredNumQueues: 11, QueueLengthLimit: 11, HandSize: 1, - RequestWaitLimit: 15 * time.Second, } seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) @@ -1238,7 +1188,6 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { qCfg := fq.QueuingConfig{ Name: "TestTotalRequestsExecutingWithPanic", DesiredNumQueues: 0, - RequestWaitLimit: 15 * time.Second, } qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name)) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go index 34e67386c0e..4b0d7dba4d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "sync/atomic" "testing" - "time" "k8s.io/utils/clock" @@ -60,7 +59,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration QueueLengthLimit: 5} } labelVals := []string{"test"} - _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name)) + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name)) if err != nil { panic(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go index 23697acfaa1..92c0367f272 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/max_seats_test.go @@ -108,7 +108,6 @@ func Test_GetMaxSeats(t *testing.T) { // for the purposes of this test, serverCL ~= nominalCL since there is // only 1 PL with large concurrency shares, making mandatory PLs negligible. ServerConcurrencyLimit: testcase.nominalCL, - RequestWaitLimit: time.Minute, ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqs.NewQueueSetFactory(clk), diff --git a/test/integration/apiserver/flowcontrol/fight_test.go b/test/integration/apiserver/flowcontrol/fight_test.go index e80be152fb2..174b30ed3e5 100644 --- a/test/integration/apiserver/flowcontrol/fight_test.go +++ b/test/integration/apiserver/flowcontrol/fight_test.go @@ -139,8 +139,7 @@ func (ft *fightTest) createController(invert bool, i int) { AsFieldManager: fieldMgr, InformerFactory: informerFactory, FlowcontrolClient: fcIfc, - ServerConcurrencyLimit: 200, // server concurrency limit - RequestWaitLimit: time.Minute / 4, // request wait limit + ServerConcurrencyLimit: 200, // server concurrency limit ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, QueueSetFactory: fqtesting.NewNoRestraintFactory(),
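For completeness, a sketch (not part of this patch) of how a caller wires the pieces after this change: the wait limit is no longer passed to utilflowcontrol.New and is instead handed to WithPriorityAndFairness as defaultRequestWaitLimit. The helper name and parameter list below are assumptions; the two call signatures match the ones updated in this diff.

package example

import (
	"net/http"
	"time"

	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	genericfilters "k8s.io/apiserver/pkg/server/filters"
	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
	flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
	kubeinformers "k8s.io/client-go/informers"
	flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
)

func buildAPFHandler(
	inner http.Handler,
	longRunning apirequest.LongRunningRequestCheck,
	estimator flowcontrolrequest.WorkEstimatorFunc,
	informers kubeinformers.SharedInformerFactory,
	fcClient flowcontrolclient.FlowcontrolV1beta3Interface,
	serverConcurrencyLimit int,
	requestTimeout time.Duration,
) http.Handler {
	// The controller no longer takes a request wait limit.
	fc := utilflowcontrol.New(informers, fcClient, serverConcurrencyLimit)

	// The filter derives each request's wait deadline itself, defaulting to
	// requestTimeout/4 when the request context carries no deadline.
	return genericfilters.WithPriorityAndFairness(inner, longRunning, fc, estimator, requestTimeout/4)
}

The design consequence is that the wait bound is computed per request from its own remaining deadline rather than being a fixed property of each QueueSet.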