Merge pull request #120222 from tkashem/apf-queue-wait-ctx (commit 6a84edb2ce)

apf: manage request queue wait with context in APF Filter
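In short: before this change, each fair-queuing QueueSet enforced a configured RequestWaitLimit by scanning its queues for requests that had waited too long. After it, the WithPriorityAndFairness filter derives a per-request wait deadline up front (a quarter of the request's remaining deadline, or a server-supplied default of RequestTimeout/4 when the request carries no deadline, capped at a hard limit of 1m) and attaches it to the request's context, so queue-wait expiry surfaces as ordinary context cancellation. The hunks below thread that change through the server wiring, the filter and its tests, the APF controller, and the fair-queuing QueueSet.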
@@ -202,7 +202,6 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
 		versionedInformer,
 		extclient.FlowcontrolV1beta3(),
 		s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
-		s.GenericServerRunOptions.RequestTimeout/4,
 	), nil
 }

@@ -915,7 +915,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
 			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
 		handler = filterlatency.TrackCompleted(handler)
-		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
+		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
 		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
 	} else {
 		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
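The new `c.RequestTimeout/4` argument carries the same quarter-of-the-timeout fraction the server previously handed to the APF controller (see the hunk above this one). A quick standalone check of that arithmetic; the 1m value is only an illustrative `--request-timeout`, not taken from this diff:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	requestTimeout := time.Minute          // illustrative server request timeout
	defaultQueueWait := requestTimeout / 4 // the default wait limit handed to the filter
	fmt.Println(defaultQueueWait)          // prints 15s
}
```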
@@ -35,6 +35,7 @@ import (
 	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 	flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 	"k8s.io/klog/v2"
+	utilsclock "k8s.io/utils/clock"
 )

 // PriorityAndFairnessClassification identifies the results of
@@ -78,6 +79,10 @@ type priorityAndFairnessHandler struct {
 	// the purpose of computing RetryAfter header to avoid system
 	// overload.
 	droppedRequests utilflowcontrol.DroppedRequestsTracker
+
+	// newReqWaitCtxFn creates a derived context with a deadline
+	// of how long a given request can wait in its queue.
+	newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc)
 }

 func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
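Holding `newReqWaitCtxFn` as a function-typed field, rather than calling a helper directly, is what lets tests substitute a deterministic factory. A minimal self-contained sketch of that injection pattern; the `handler` struct and the 15s timeout are illustrative stand-ins, not code from this diff:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handler mimics the shape of priorityAndFairnessHandler: the context
// factory is injected, so production and tests can supply different ones.
type handler struct {
	newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc)
}

func main() {
	h := handler{
		newReqWaitCtxFn: func(ctx context.Context) (context.Context, context.CancelFunc) {
			return context.WithTimeout(ctx, 15*time.Second) // fixed wait limit for the sketch
		},
	}
	ctx, cancel := h.newReqWaitCtxFn(context.Background())
	defer cancel()
	deadline, ok := ctx.Deadline()
	fmt.Println(ok, time.Until(deadline).Round(time.Second)) // true 15s
}
```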
@@ -240,8 +245,9 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 				resultCh <- err
 			}()

-			// We create handleCtx with explicit cancelation function.
-			// The reason for it is that Handle() underneath may start additional goroutine
+			// We create handleCtx with an adjusted deadline, for two reasons.
+			// One is to limit the time the request waits before its execution starts.
+			// The other reason for it is that Handle() underneath may start additional goroutine
 			// that is blocked on context cancellation. However, from APF point of view,
 			// we don't want to wait until the whole watch request is processed (which is
 			// when it context is actually cancelled) - we want to unblock the goroutine as
@@ -249,7 +255,7 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 			//
 			// Note that we explicitly do NOT call the actuall handler using that context
 			// to avoid cancelling request too early.
-			handleCtx, handleCtxCancel := context.WithCancel(ctx)
+			handleCtx, handleCtxCancel := h.newReqWaitCtxFn(ctx)
 			defer handleCtxCancel()

 			// Note that Handle will return irrespective of whether the request
@@ -286,7 +292,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
 			h.handler.ServeHTTP(w, r)
 		}

-		h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
+		func() {
+			handleCtx, cancelFn := h.newReqWaitCtxFn(ctx)
+			defer cancelFn()
+			h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
+		}()
 	}

 	if !served {
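Wrapping the call in an immediately invoked function with `defer cancelFn()` scopes the derived wait context to the dispatch call alone: its timer is released as soon as `Handle` returns, while the request's own context keeps running. A standalone sketch of the same pattern (the channel and durations are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx := context.Background() // the request's own context, left untouched
	func() {
		waitCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
		defer cancel() // releases the deadline timer as soon as dispatch returns
		select {
		case <-time.After(10 * time.Millisecond): // stand-in for "got a seat in time"
			fmt.Println("dispatched")
		case <-waitCtx.Done():
			fmt.Println("rejected, queue wait limit hit:", waitCtx.Err())
		}
	}()
	fmt.Println("request context still alive:", ctx.Err() == nil) // true
}
```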
@@ -309,6 +319,7 @@ func WithPriorityAndFairness(
 	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 	fcIfc utilflowcontrol.Interface,
 	workEstimator flowcontrolrequest.WorkEstimatorFunc,
+	defaultRequestWaitLimit time.Duration,
 ) http.Handler {
 	if fcIfc == nil {
 		klog.Warningf("priority and fairness support not found, skipping")
@@ -322,12 +333,18 @@ func WithPriorityAndFairness(
 		waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
 	})

+	clock := &utilsclock.RealClock{}
+	newReqWaitCtxFn := func(ctx context.Context) (context.Context, context.CancelFunc) {
+		return getRequestWaitContext(ctx, defaultRequestWaitLimit, clock)
+	}
+
 	priorityAndFairnessHandler := &priorityAndFairnessHandler{
 		handler:                 handler,
 		longRunningRequestCheck: longRunningRequestCheck,
 		fcIfc:                   fcIfc,
 		workEstimator:           workEstimator,
 		droppedRequests:         utilflowcontrol.NewDroppedRequestsTracker(),
+		newReqWaitCtxFn:         newReqWaitCtxFn,
 	}
 	return http.HandlerFunc(priorityAndFairnessHandler.Handle)
 }
@@ -356,3 +373,48 @@ func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string
 	w.Header().Set("Retry-After", retryAfter)
 	http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
 }
+
+// getRequestWaitContext returns a new context with a deadline of how
+// long the request is allowed to wait before it is removed from its
+// queue and rejected.
+// The context.CancelFunc returned must never be nil and the caller is
+// responsible for calling the CancelFunc function for cleanup.
+//   - ctx: the context associated with the request (it may or may
+//     not have a deadline).
+//   - defaultRequestWaitLimit: the default wait duration that is used
+//     if the request context does not have any deadline.
+//     (a) initialization of a watch or
+//     (b) a request whose context has no deadline
+//
+// clock comes in handy for testing the function
+func getRequestWaitContext(ctx context.Context, defaultRequestWaitLimit time.Duration, clock utilsclock.PassiveClock) (context.Context, context.CancelFunc) {
+	if ctx.Err() != nil {
+		return ctx, func() {}
+	}
+
+	reqArrivedAt := clock.Now()
+	if reqReceivedTimestamp, ok := apirequest.ReceivedTimestampFrom(ctx); ok {
+		reqArrivedAt = reqReceivedTimestamp
+	}
+
+	// a) we will allow the request to wait in the queue for one
+	// fourth of the time of its allotted deadline.
+	// b) if the request context does not have any deadline
+	// then we default to 'defaultRequestWaitLimit'
+	// in any case, the wait limit for any request must not
+	// exceed the hard limit of 1m
+	//
+	// request has deadline:
+	//   wait-limit = min(remaining deadline / 4, 1m)
+	// request has no deadline:
+	//   wait-limit = min(defaultRequestWaitLimit, 1m)
+	thisReqWaitLimit := defaultRequestWaitLimit
+	if deadline, ok := ctx.Deadline(); ok {
+		thisReqWaitLimit = deadline.Sub(reqArrivedAt) / 4
+	}
+	if thisReqWaitLimit > time.Minute {
+		thisReqWaitLimit = time.Minute
+	}
+
+	return context.WithDeadline(ctx, reqArrivedAt.Add(thisReqWaitLimit))
+}
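To make the rule concrete, here is the arithmetic from one of the test cases added later in this commit, as a runnable sketch: a request received 10s ago whose context expires 50s from now has a 60s total budget, so it may wait 60s/4 = 15s measured from arrival, which is 5s from now:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	receivedAt := now.Add(-10 * time.Second) // request arrived 10s ago
	deadline := now.Add(50 * time.Second)    // its context expires 50s from now

	waitLimit := deadline.Sub(receivedAt) / 4 // a quarter of the 60s total budget
	if waitLimit > time.Minute {
		waitLimit = time.Minute // the hard cap applied by getRequestWaitContext
	}
	queueDeadline := receivedAt.Add(waitLimit)
	fmt.Println(queueDeadline.Sub(now).Round(time.Second)) // prints 5s
}
```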
@@ -51,6 +51,7 @@ import (
 	"k8s.io/component-base/metrics/legacyregistry"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
+	clocktesting "k8s.io/utils/clock/testing"

 	"github.com/google/go-cmp/cmp"
 )
@@ -153,23 +154,23 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE
 		WatchTracker:    utilflowcontrol.NewWatchTracker(),
 		MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
 	}
-	return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
+	return newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecute, postExecute)
 }

-func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server {
+func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) *httptest.Server {
 	epmetrics.Register()
 	fcmetrics.Register()
-	apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute))
+	apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, defaultWaitLimit, onExecute, postExecute))
 	return apfServer
 }

-func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler {
+func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) http.Handler {
 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

 	apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		onExecute()
-	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator)
+	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator, defaultWaitLimit)

 	handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -458,7 +459,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {

 	postExecuteFunc := func() {}

-	server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
+	server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 	defer server.Close()

 	var wg sync.WaitGroup
@@ -498,7 +499,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
 	}
 	postExecuteFunc := func() {}

-	server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
+	server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 	defer server.Close()

 	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
@@ -517,7 +518,7 @@ func TestApfWatchPanic(t *testing.T) {
 	}
 	postExecuteFunc := func() {}

-	apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
+	apfHandler := newApfHandlerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 	handler := func(w http.ResponseWriter, r *http.Request) {
 		defer func() {
 			if err := recover(); err == nil {
@@ -564,7 +565,7 @@ func TestApfWatchHandlePanic(t *testing.T) {

 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
-			apfHandler := newApfHandlerWithFilter(t, test.filter, onExecuteFunc, postExecuteFunc)
+			apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc)
 			handler := func(w http.ResponseWriter, r *http.Request) {
 				defer func() {
 					if err := recover(); err == nil {
@@ -649,6 +650,7 @@ func TestApfWithRequestDigest(t *testing.T) {
 		longRunningFunc,
 		fakeFilter,
 		func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
+		time.Minute/4,
 	)

 	w := httptest.NewRecorder()
@@ -685,7 +687,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 		apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 		stopCh := make(chan struct{})
-		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 		headerMatcher := headerMatcher{}
 		var executed bool
@@ -755,7 +757,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 		apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 		stopCh := make(chan struct{})
-		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 		headerMatcher := headerMatcher{}
 		var executed bool
@@ -831,7 +833,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 		apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 		stopCh := make(chan struct{})
-		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 		headerMatcher := headerMatcher{}
 		var innerHandlerWriteErr error
@@ -909,7 +911,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 		apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 		stopCh := make(chan struct{})
-		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 		headerMatcher := headerMatcher{}
 		var innerHandlerWriteErr error
@@ -984,7 +986,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

 		apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength)
 		stopCh := make(chan struct{})
-		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+		controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)

 		headerMatcher := headerMatcher{}
 		var firstRequestInnerHandlerWriteErr error
@@ -1116,11 +1118,11 @@ func fmtError(err error) string {
 }

 func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int,
-	requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
+	plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
 	clientset := newClientset(t, apfConfiguration...)
 	// this test does not rely on resync, so resync period is set to zero
 	factory := informers.NewSharedInformerFactory(clientset, 0)
-	controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency, requestWaitLimit)
+	controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency)

 	factory.Start(stopCh)

@@ -1231,7 +1233,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.
 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

-	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator)
+	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator, time.Minute/4)

 	// add the handler in the chain that adds the specified user to the request context
 	handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -1407,3 +1409,107 @@ func isStreamReset(err error) bool {
 	}
 	return false
 }
+
+func TestGetRequestWaitContext(t *testing.T) {
+	tests := []struct {
+		name                    string
+		defaultRequestWaitLimit time.Duration
+		parent                  func(t time.Time) (context.Context, context.CancelFunc)
+		newReqWaitCtxExpected   bool
+		reqWaitLimitExpected    time.Duration
+	}{
+		{
+			name: "context deadline has exceeded",
+			parent: func(time.Time) (context.Context, context.CancelFunc) {
+				ctx, cancel := context.WithCancel(context.Background())
+				cancel()
+				return ctx, cancel
+			},
+		},
+		{
+			name: "context has a deadline, 'received at' is not set, wait limit should be one fourth of the remaining deadline from now",
+			parent: func(now time.Time) (context.Context, context.CancelFunc) {
+				return context.WithDeadline(context.Background(), now.Add(60*time.Second))
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  15 * time.Second,
+		},
+		{
+			name: "context has a deadline, 'received at' is set, wait limit should be one fourth of the deadline starting from the 'received at' time",
+			parent: func(now time.Time) (context.Context, context.CancelFunc) {
+				ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second))
+				return context.WithDeadline(ctx, now.Add(50*time.Second))
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  5 * time.Second, // from now
+		},
+		{
+			name:                    "context does not have any deadline, 'received at' is not set, default wait limit should be in effect from now",
+			defaultRequestWaitLimit: 15 * time.Second,
+			parent: func(time.Time) (context.Context, context.CancelFunc) {
+				return context.WithCancel(context.Background())
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  15 * time.Second,
+		},
+		{
+			name:                    "context does not have any deadline, 'received at' is set, default wait limit should be in effect starting from the 'received at' time",
+			defaultRequestWaitLimit: 15 * time.Second,
+			parent: func(now time.Time) (context.Context, context.CancelFunc) {
+				ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second))
+				return context.WithCancel(ctx)
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  5 * time.Second, // from now
+		},
+		{
+			name: "context has a deadline, wait limit should not exceed the hard limit of 1m",
+			parent: func(now time.Time) (context.Context, context.CancelFunc) {
+				// let 1/4th of the remaining deadline exceed the hard limit
+				return context.WithDeadline(context.Background(), now.Add(8*time.Minute))
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  time.Minute,
+		},
+		{
+			name:                    "context has no deadline, wait limit should not exceed the hard limit of 1m",
+			defaultRequestWaitLimit: 2 * time.Minute, // it exceeds the hard limit
+			parent: func(now time.Time) (context.Context, context.CancelFunc) {
+				return context.WithCancel(context.Background())
+			},
+			newReqWaitCtxExpected: true,
+			reqWaitLimitExpected:  time.Minute,
+		},
+	}

+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			now := time.Now()
+			parent, cancel := test.parent(now)
+			defer cancel()
+
+			clock := clocktesting.NewFakePassiveClock(now)
+			newReqWaitCtxGot, cancelGot := getRequestWaitContext(parent, test.defaultRequestWaitLimit, clock)
+			if cancelGot == nil {
+				t.Errorf("Expected a non nil context.CancelFunc")
+				return
+			}
+			defer cancelGot()
+
+			switch {
+			case test.newReqWaitCtxExpected:
+				deadlineGot, ok := newReqWaitCtxGot.Deadline()
+				if !ok {
+					t.Errorf("Expected the new wait limit context to have a deadline")
+				}
+				if waitLimitGot := deadlineGot.Sub(now); test.reqWaitLimitExpected != waitLimitGot {
+					t.Errorf("Expected request wait limit %s, but got: %s", test.reqWaitLimitExpected, waitLimitGot)
+				}
+			default:
+				if parent != newReqWaitCtxGot {
+					t.Errorf("Expected the parent context to be returned: want: %#v, got %#v", parent, newReqWaitCtxGot)
+				}
+			}
+		})
+	}
+}
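The test pins "now" with a fake passive clock so the derived deadline can be asserted exactly, with no wall-clock flakiness. A minimal sketch of that technique, assuming `k8s.io/utils` is on the module path; `waitLimit` below is a local stand-in mirroring `getRequestWaitContext`'s arithmetic, not the function under test:

```go
package main

import (
	"context"
	"fmt"
	"time"

	utilsclock "k8s.io/utils/clock"
	clocktesting "k8s.io/utils/clock/testing"
)

// waitLimit mirrors the deadline arithmetic of getRequestWaitContext,
// taking a PassiveClock so the caller controls what "now" means.
func waitLimit(ctx context.Context, def time.Duration, clk utilsclock.PassiveClock) time.Duration {
	limit := def
	if deadline, ok := ctx.Deadline(); ok {
		limit = deadline.Sub(clk.Now()) / 4
	}
	if limit > time.Minute {
		limit = time.Minute
	}
	return limit
}

func main() {
	now := time.Now()
	clk := clocktesting.NewFakePassiveClock(now) // frozen at now
	ctx, cancel := context.WithDeadline(context.Background(), now.Add(60*time.Second))
	defer cancel()
	fmt.Println(waitLimit(ctx, 15*time.Second, clk)) // deterministically 15s
}
```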
|
@ -154,7 +154,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
|
|||||||
config.SharedInformerFactory,
|
config.SharedInformerFactory,
|
||||||
kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(),
|
kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(),
|
||||||
config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
|
config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
|
||||||
config.RequestTimeout/4,
|
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")
|
klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")
|
||||||
|
@@ -150,9 +150,6 @@ type configController struct {
 	// from server configuration.
 	serverConcurrencyLimit int

-	// requestWaitLimit comes from server configuration.
-	requestWaitLimit time.Duration
-
 	// watchTracker implements the necessary WatchTracker interface.
 	WatchTracker

@@ -287,13 +284,12 @@ func newTestableController(config TestableConfig) *configController {
 		asFieldManager:         config.AsFieldManager,
 		foundToDangling:        config.FoundToDangling,
 		serverConcurrencyLimit: config.ServerConcurrencyLimit,
-		requestWaitLimit:       config.RequestWaitLimit,
 		flowcontrolClient:      config.FlowcontrolClient,
 		priorityLevelStates:    make(map[string]*priorityLevelState),
 		WatchTracker:           NewWatchTracker(),
 		MaxSeatsTracker:        NewMaxSeatsTracker(),
 	}
-	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
+	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 	// Start with longish delay because conflicts will be between
 	// different processes, so take some time to go away.
 	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
@@ -433,7 +429,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 		plState := plStates[plName]
 		if setCompleters {
 			qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
-				plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+				plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 				metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 			if err != nil {
 				klog.ErrorS(err, "Inconceivable!  Configuration error in existing priority level", "pl", plState.pl)
@@ -657,10 +653,10 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro

 	// Supply missing mandatory PriorityLevelConfiguration objects
 	if !meal.haveExemptPL {
-		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
+		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt)
 	}
 	if !meal.haveCatchAllPL {
-		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
+		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll)
 	}

 	meal.finishQueueSetReconfigsLocked()
@@ -692,7 +688,7 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 			}
 		}
 		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues,
-			pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs,
+			pl, state.reqsGaugePair, state.execSeatsObs,
 			metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
 		if err != nil {
 			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
@@ -798,7 +794,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 		}
 		var err error
 		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues,
-			plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+			plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 			metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 		if err != nil {
 			// This can not happen because queueSetCompleterForPL already approved this config
@@ -880,7 +876,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
 // given priority level configuration.  Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
 		return nil, errors.New("broken union structure at the top, for Limited")
 	}
@@ -902,7 +898,6 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 				DesiredNumQueues: int(qcAPI.Queues),
 				QueueLengthLimit: int(qcAPI.QueueLengthLimit),
 				HandSize:         int(qcAPI.HandSize),
-				RequestWaitLimit: requestWaitLimit,
 			}
 		}
 	} else {
@@ -956,16 +951,15 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl

 // imaginePL adds a priority level based on one of the mandatory ones
 // that does not actually exist (right now) as a real API object.
-func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
+func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) {
 	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
 	labelValues := []string{proto.Name}
 	reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
 	execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
 	seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
 	seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
-	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto,
-		requestWaitLimit, reqsGaugePair, execSeatsObs,
-		metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
+	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, reqsGaugePair,
+		execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
 	if err != nil {
 		// This can not happen because proto is one of the mandatory
 		// objects and these are not erroneous
@@ -90,7 +90,6 @@ func New(
 	informerFactory kubeinformers.SharedInformerFactory,
 	flowcontrolClient flowcontrolclient.FlowcontrolV1beta3Interface,
 	serverConcurrencyLimit int,
-	requestWaitLimit time.Duration,
 ) Interface {
 	clk := eventclock.Real{}
 	return NewTestable(TestableConfig{
@@ -101,7 +100,6 @@ func New(
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: serverConcurrencyLimit,
-		RequestWaitLimit:       requestWaitLimit,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
@@ -139,9 +137,6 @@ type TestableConfig struct {
 	// ServerConcurrencyLimit for the controller to enforce
 	ServerConcurrencyLimit int

-	// RequestWaitLimit configured on the server
-	RequestWaitLimit time.Duration
-
 	// GaugeVec for metrics about requests, broken down by phase and priority_level
 	ReqsGaugeVec metrics.RatioedGaugeVec

@@ -109,7 +109,6 @@ func TestQueueWaitTimeLatencyTracker(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 24,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
@@ -143,7 +143,6 @@ func TestBorrowing(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 24,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
@@ -251,8 +251,7 @@ func TestConfigConsumer(t *testing.T) {
 		FoundToDangling:        func(found bool) bool { return !found },
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100, // server concurrency limit
-		RequestWaitLimit:       time.Minute, // request wait limit
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        cts,
@@ -384,7 +383,6 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,
-		RequestWaitLimit:       time.Minute,
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        cts,
@@ -18,7 +18,6 @@ package fairqueuing

 import (
 	"context"
-	"time"

 	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
@@ -117,7 +116,7 @@ type QueuingConfig struct {

 	// DesiredNumQueues is the number of queues that the API says
 	// should exist now. This may be non-positive, in which case
-	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
+	// QueueLengthLimit, and HandSize are ignored.
 	// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
 	// A negative value means to always dispatch immediately upon arrival
 	// (i.e., the requests are "exempt" from limitation).
@@ -129,10 +128,6 @@ type QueuingConfig struct {
 	// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
 	// dealing a "hand" of this many queues and then picking one of minimum length.
 	HandSize int
-
-	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
-	// If, by the end of that time, the request has not been dispatched then it is rejected.
-	RequestWaitLimit time.Duration
 }

 // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
@@ -272,7 +272,6 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	} else {
 		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
 		qCfg.HandSize = qs.qCfg.HandSize
-		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
 	}

 	qs.qCfg = qCfg
@@ -300,9 +299,6 @@ const (
 	// Serve this one
 	decisionExecute requestDecision = iota

-	// Reject this one due to APF queuing considerations
-	decisionReject
-
 	// This one's context timed out / was canceled
 	decisionCancel
 )
@@ -337,11 +333,10 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
 	// ========================================================================
 	// Step 1:
 	// 1) Start with shuffle sharding, to pick a queue.
-	// 2) Reject old requests that have been waiting too long
-	// 3) Reject current request if there is not enough concurrency shares and
+	// 2) Reject current request if there is not enough concurrency shares and
 	// we are at max queue length
-	// 4) If not rejected, create a request and enqueue
-	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
+	// 3) If not rejected, create a request and enqueue
+	req = qs.shuffleShardAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
 	// req == nil means that the request was rejected - no remaining
 	// concurrency shares and at max queue length already
 	if req == nil {
@@ -422,13 +417,7 @@ func (req *request) wait() (bool, bool) {
 	}
 	req.waitStarted = true
 	switch decisionAny {
-	case decisionReject:
-		klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
-		qs.totRequestsRejected++
-		qs.totRequestsTimedout++
-		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
-		return false, qs.isIdleLocked()
-	case decisionCancel:
+	case decisionCancel: // handle in code following this switch
 	case decisionExecute:
 		klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
 		return true, false
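With `decisionReject` gone, queue-wait expiry reaches the waiter the same way a client cancellation does: through the request's context (the metrics label is switched to "time-out" in a later hunk). Code that still needs to tell the two apart can inspect `ctx.Err()`, as this standalone sketch shows:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	// A deadline standing in for the queue wait limit.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	<-ctx.Done()
	// DeadlineExceeded means the wait limit fired; Canceled would mean the client gave up.
	fmt.Println(errors.Is(ctx.Err(), context.DeadlineExceeded)) // true
}
```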
@@ -438,7 +427,7 @@ func (req *request) wait() (bool, bool) {
 	}
 	// TODO(aaron-prindle) add metrics for this case
 	klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
-	// remove the request from the queue as it has timed out
+	// remove the request from the queue as its queue wait time has exceeded
 	queue := req.queue
 	if req.removeFromQueueLocked() != nil {
 		defer qs.boundNextDispatchLocked(queue)
@@ -446,7 +435,7 @@ func (req *request) wait() (bool, bool) {
 		qs.totSeatsWaiting -= req.MaxSeats()
 		qs.totRequestsRejected++
 		qs.totRequestsCancelled++
-		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
+		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
 		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
 		metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
 		req.NoteQueued(false)
@@ -556,25 +545,19 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 	return math.Min(float64(seatsRequested), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues)
 }

-// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
+// shuffleShardAndRejectOrEnqueueLocked encapsulates the logic required
 // to validate and enqueue a request for the queueSet/QueueSet:
 // 1) Start with shuffle sharding, to pick a queue.
-// 2) Reject old requests that have been waiting too long
-// 3) Reject current request if there is not enough concurrency shares and
+// 2) Reject current request if there is not enough concurrency shares and
 // we are at max queue length
-// 4) If not rejected, create a request and enqueue
+// 3) If not rejected, create a request and enqueue
 // returns the enqueud request on a successful enqueue
 // returns nil in the case that there is no available concurrency or
 // the queuelengthlimit has been reached
-func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
+func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
 	// Start with the shuffle sharding, to pick a queue.
 	queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
 	queue := qs.queues[queueIdx]
-	// The next step is the logic to reject requests that have been waiting too long
-	qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)
-	// NOTE: currently timeout is only checked for each new request. This means that there can be
-	// requests that are in the queue longer than the timeout if there are no new requests
-	// We prefer the simplicity over the promptness, at least for now.

 	defer qs.boundNextDispatchLocked(queue)

@@ -633,44 +616,6 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
 	return bestQueueIdx
 }

-// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued
-// past the requestWaitLimit
-func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
-	timeoutCount := 0
-	disqueueSeats := 0
-	now := qs.clock.Now()
-	reqs := queue.requestsWaiting
-	// reqs are sorted oldest -> newest
-	// can short circuit loop (break) if oldest requests are not timing out
-	// as newer requests also will not have timed out
-
-	// now - requestWaitLimit = arrivalLimit
-	arrivalLimit := now.Add(-qs.qCfg.RequestWaitLimit)
-	reqs.Walk(func(req *request) bool {
-		if arrivalLimit.After(req.arrivalTime) {
-			if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil {
-				timeoutCount++
-				disqueueSeats += req.MaxSeats()
-				req.NoteQueued(false)
-				metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
-				metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
-			}
-			// we need to check if the next request has timed out.
-			return true
-		}
-		// since reqs are sorted oldest -> newest, we are done here.
-		return false
-	})
-
-	// remove timed out requests from queue
-	if timeoutCount > 0 {
-		qs.totRequestsWaiting -= timeoutCount
-		qs.totSeatsWaiting -= disqueueSeats
-		qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
-		qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
-	}
-}
-
 // rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
 // request, which has been assigned to a queue. If up against the
 // queue length limit and the concurrency limit then returns false.
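The deleted scan only ran when a new request arrived, so a queued request could sit past its wait limit during quiet periods (the removed NOTE admitted as much). With a deadline on every waiter's own context, expiry fires promptly and independently. A self-contained sketch of that mechanism; capacity never frees up here, so each waiter times out on its own schedule:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	dispatch := make(chan struct{}) // nothing is ever sent: no seats free up
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// each queued request carries its own wait-limit deadline
			ctx, cancel := context.WithTimeout(context.Background(), time.Duration(id)*50*time.Millisecond)
			defer cancel()
			select {
			case <-dispatch:
				fmt.Printf("request %d dispatched\n", id)
			case <-ctx.Done():
				fmt.Printf("request %d rejected when its own wait limit expired\n", id)
			}
		}(i)
	}
	wg.Wait()
}
```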
@ -551,7 +551,6 @@ func TestBaseline(t *testing.T) {
|
|||||||
DesiredNumQueues: 9,
|
DesiredNumQueues: 9,
|
||||||
QueueLengthLimit: 8,
|
QueueLengthLimit: 8,
|
||||||
HandSize: 3,
|
HandSize: 3,
|
||||||
RequestWaitLimit: 10 * time.Minute,
|
|
||||||
}
|
}
|
||||||
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
|
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
|
||||||
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
|
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
|
||||||
@@ -590,7 +589,6 @@ func TestExampt(t *testing.T) {
 		DesiredNumQueues: -1,
 		QueueLengthLimit: 2,
 		HandSize:         3,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -663,7 +661,6 @@ func TestSeparations(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 8,
 		HandSize:         3,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject")
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -704,7 +701,6 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 8,
 		HandSize:         1,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -743,7 +739,6 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 		DesiredNumQueues: 8,
 		QueueLengthLimit: 16,
 		HandSize:         3,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -781,7 +776,6 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 8,
 		HandSize:         1,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -823,7 +817,6 @@ func TestSeatSecondsRollover(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 8,
 		HandSize:         1,
-		RequestWaitLimit: 40 * Quarter,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -863,7 +856,6 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 6,
 		HandSize:         1,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -902,7 +894,6 @@ func TestDifferentWidths(t *testing.T) {
 		DesiredNumQueues: 64,
 		QueueLengthLimit: 13,
 		HandSize:         7,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -940,7 +931,6 @@ func TestTooWide(t *testing.T) {
 		DesiredNumQueues: 64,
 		QueueLengthLimit: 35,
 		HandSize:         7,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1003,7 +993,6 @@ func TestWindup(t *testing.T) {
 		DesiredNumQueues: 9,
 		QueueLengthLimit: 6,
 		HandSize:         1,
-		RequestWaitLimit: 10 * time.Minute,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1067,44 +1056,6 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 	}.exercise(t)
 }
 
-func TestTimeout(t *testing.T) {
-	metrics.Register()
-	now := time.Now()
-
-	clk, counter := testeventclock.NewFake(now, 0, nil)
-	qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
-	qCfg := fq.QueuingConfig{
-		Name:             "TestTimeout",
-		DesiredNumQueues: 128,
-		QueueLengthLimit: 128,
-		HandSize:         1,
-		RequestWaitLimit: 0,
-	}
-	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
-	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
-	if err != nil {
-		t.Fatal(err)
-	}
-	qs := qsComplete(qsc, 1)
-
-	uniformScenario{name: qCfg.Name,
-		qs: qs,
-		clients: []uniformClient{
-			newUniformClient(1001001001, 5, 100, time.Second, time.Second),
-		},
-		concurrencyLimit:            1,
-		evalDuration:                time.Second * 10,
-		expectedFair:                []bool{true},
-		expectedFairnessMargin:      []float64{0.01},
-		evalInqueueMetrics:          true,
-		evalExecutingMetrics:        true,
-		rejectReason:                "time-out",
-		clk:                         clk,
-		counter:                     counter,
-		seatDemandIntegratorSubject: seatDemandIntegratorSubject,
-	}.exercise(t)
-}
-
 // TestContextCancel tests cancellation of a request's context.
 // The outline is:
 // 1. Use a concurrency limit of 1.
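TestTimeout relied on RequestWaitLimit: 0 to force immediate rejections; with the knob gone, wait-timeout behavior is driven by each request's context (and remains covered by context-oriented tests such as TestContextCancel below). A hedged sketch of how such a test can be phrased against a context deadline; waitForDispatch is an illustrative stand-in, not the queueset API:

```go
package queuewait

import (
	"context"
	"errors"
	"testing"
	"time"
)

// waitForDispatch blocks until dispatch is signaled or the request's
// wait context expires; a stand-in for waiting on the queueset decision.
func waitForDispatch(ctx context.Context, dispatched <-chan struct{}) error {
	select {
	case <-dispatched:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func TestQueueWaitDeadline(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	// Nothing ever dispatches, so the wait must end with DeadlineExceeded.
	err := waitForDispatch(ctx, make(chan struct{}))
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("expected deadline exceeded, got %v", err)
	}
}
```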
@@ -1131,7 +1082,6 @@ func TestContextCancel(t *testing.T) {
 		DesiredNumQueues: 11,
 		QueueLengthLimit: 11,
 		HandSize:         1,
-		RequestWaitLimit: 15 * time.Second,
 	}
 	seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1238,7 +1188,6 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 	qCfg := fq.QueuingConfig{
 		Name:             "TestTotalRequestsExecutingWithPanic",
 		DesiredNumQueues: 0,
-		RequestWaitLimit: 15 * time.Second,
 	}
 	qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name))
 	if err != nil {
@@ -21,7 +21,6 @@ import (
 	"math/rand"
 	"sync/atomic"
 	"testing"
-	"time"
 
 	"k8s.io/utils/clock"
 
@@ -60,7 +59,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
 			QueueLengthLimit: 5}
 	}
 	labelVals := []string{"test"}
-	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
+	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
 	if err != nil {
 		panic(err)
 	}
@@ -108,7 +108,6 @@ func Test_GetMaxSeats(t *testing.T) {
 				// for the purposes of this test, serverCL ~= nominalCL since there is
 				// only 1 PL with large concurrency shares, making mandatory PLs negligible.
 				ServerConcurrencyLimit: testcase.nominalCL,
-				RequestWaitLimit:       time.Minute,
 				ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 				ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 				QueueSetFactory:        fqs.NewQueueSetFactory(clk),
@@ -139,8 +139,7 @@ func (ft *fightTest) createController(invert bool, i int) {
 		AsFieldManager:         fieldMgr,
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      fcIfc,
 		ServerConcurrencyLimit: 200, // server concurrency limit
-		RequestWaitLimit:       time.Minute / 4, // request wait limit
 		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
 		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
 		QueueSetFactory:        fqtesting.NewNoRestraintFactory(),
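Net effect on callers: QueuingConfig literals simply lose the RequestWaitLimit field, as every hunk above shows. A small sketch of the resulting shape, using a local stand-in type rather than the real fairqueuing package:

```go
package main

import "fmt"

// QueuingConfig is a local stand-in mirroring the fields that remain
// after this commit (the real type lives in the fairqueuing package);
// RequestWaitLimit is gone, since the wait bound now travels on each
// request's context.
type QueuingConfig struct {
	Name             string
	DesiredNumQueues int
	QueueLengthLimit int
	HandSize         int
}

func main() {
	// Field values echo the TestBaseline hunk above.
	qCfg := QueuingConfig{
		Name:             "TestBaseline",
		DesiredNumQueues: 9,
		QueueLengthLimit: 8,
		HandSize:         3,
	}
	fmt.Printf("%+v\n", qCfg)
}
```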