mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 18:54:06 +00:00
apf: use context for queue wait
This commit is contained in:
parent
9c25ce6f3e
commit
f39213a7e4
@ -915,7 +915,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
|
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
|
||||||
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
|
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
|
||||||
handler = filterlatency.TrackCompleted(handler)
|
handler = filterlatency.TrackCompleted(handler)
|
||||||
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
|
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
|
||||||
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
|
||||||
} else {
|
} else {
|
||||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
|
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
utilsclock "k8s.io/utils/clock"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PriorityAndFairnessClassification identifies the results of
|
// PriorityAndFairnessClassification identifies the results of
|
||||||
@ -78,6 +79,10 @@ type priorityAndFairnessHandler struct {
|
|||||||
// the purpose of computing RetryAfter header to avoid system
|
// the purpose of computing RetryAfter header to avoid system
|
||||||
// overload.
|
// overload.
|
||||||
droppedRequests utilflowcontrol.DroppedRequestsTracker
|
droppedRequests utilflowcontrol.DroppedRequestsTracker
|
||||||
|
|
||||||
|
// newReqWaitCtxFn creates a derived context with a deadline
|
||||||
|
// of how long a given request can wait in its queue.
|
||||||
|
newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
|
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -240,8 +245,9 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
|
|||||||
resultCh <- err
|
resultCh <- err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// We create handleCtx with explicit cancelation function.
|
// We create handleCtx with an adjusted deadline, for two reasons.
|
||||||
// The reason for it is that Handle() underneath may start additional goroutine
|
// One is to limit the time the request waits before its execution starts.
|
||||||
|
// The other reason for it is that Handle() underneath may start additional goroutine
|
||||||
// that is blocked on context cancellation. However, from APF point of view,
|
// that is blocked on context cancellation. However, from APF point of view,
|
||||||
// we don't want to wait until the whole watch request is processed (which is
|
// we don't want to wait until the whole watch request is processed (which is
|
||||||
// when it context is actually cancelled) - we want to unblock the goroutine as
|
// when it context is actually cancelled) - we want to unblock the goroutine as
|
||||||
@ -249,7 +255,7 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
|
|||||||
//
|
//
|
||||||
// Note that we explicitly do NOT call the actuall handler using that context
|
// Note that we explicitly do NOT call the actuall handler using that context
|
||||||
// to avoid cancelling request too early.
|
// to avoid cancelling request too early.
|
||||||
handleCtx, handleCtxCancel := context.WithCancel(ctx)
|
handleCtx, handleCtxCancel := h.newReqWaitCtxFn(ctx)
|
||||||
defer handleCtxCancel()
|
defer handleCtxCancel()
|
||||||
|
|
||||||
// Note that Handle will return irrespective of whether the request
|
// Note that Handle will return irrespective of whether the request
|
||||||
@ -286,7 +292,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
|
|||||||
h.handler.ServeHTTP(w, r)
|
h.handler.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
|
func() {
|
||||||
|
handleCtx, cancelFn := h.newReqWaitCtxFn(ctx)
|
||||||
|
defer cancelFn()
|
||||||
|
h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !served {
|
if !served {
|
||||||
@ -309,6 +319,7 @@ func WithPriorityAndFairness(
|
|||||||
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
||||||
fcIfc utilflowcontrol.Interface,
|
fcIfc utilflowcontrol.Interface,
|
||||||
workEstimator flowcontrolrequest.WorkEstimatorFunc,
|
workEstimator flowcontrolrequest.WorkEstimatorFunc,
|
||||||
|
defaultRequestWaitLimit time.Duration,
|
||||||
) http.Handler {
|
) http.Handler {
|
||||||
if fcIfc == nil {
|
if fcIfc == nil {
|
||||||
klog.Warningf("priority and fairness support not found, skipping")
|
klog.Warningf("priority and fairness support not found, skipping")
|
||||||
@ -322,12 +333,18 @@ func WithPriorityAndFairness(
|
|||||||
waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
|
waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
clock := &utilsclock.RealClock{}
|
||||||
|
newReqWaitCtxFn := func(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||||
|
return getRequestWaitContext(ctx, defaultRequestWaitLimit, clock)
|
||||||
|
}
|
||||||
|
|
||||||
priorityAndFairnessHandler := &priorityAndFairnessHandler{
|
priorityAndFairnessHandler := &priorityAndFairnessHandler{
|
||||||
handler: handler,
|
handler: handler,
|
||||||
longRunningRequestCheck: longRunningRequestCheck,
|
longRunningRequestCheck: longRunningRequestCheck,
|
||||||
fcIfc: fcIfc,
|
fcIfc: fcIfc,
|
||||||
workEstimator: workEstimator,
|
workEstimator: workEstimator,
|
||||||
droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(),
|
droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(),
|
||||||
|
newReqWaitCtxFn: newReqWaitCtxFn,
|
||||||
}
|
}
|
||||||
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
|
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
|
||||||
}
|
}
|
||||||
@ -356,3 +373,48 @@ func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string
|
|||||||
w.Header().Set("Retry-After", retryAfter)
|
w.Header().Set("Retry-After", retryAfter)
|
||||||
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
|
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getRequestWaitContext returns a new context with a deadline of how
|
||||||
|
// long the request is allowed to wait before it is removed from its
|
||||||
|
// queue and rejected.
|
||||||
|
// The context.CancelFunc returned must never be nil and the caller is
|
||||||
|
// responsible for calling the CancelFunc function for cleanup.
|
||||||
|
// - ctx: the context associated with the request (it may or may
|
||||||
|
// not have a deadline).
|
||||||
|
// - defaultRequestWaitLimit: the default wait duration that is used
|
||||||
|
// if the request context does not have any deadline.
|
||||||
|
// (a) initialization of a watch or
|
||||||
|
// (b) a request whose context has no deadline
|
||||||
|
//
|
||||||
|
// clock comes in handy for testing the function
|
||||||
|
func getRequestWaitContext(ctx context.Context, defaultRequestWaitLimit time.Duration, clock utilsclock.PassiveClock) (context.Context, context.CancelFunc) {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx, func() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
reqArrivedAt := clock.Now()
|
||||||
|
if reqReceivedTimestamp, ok := apirequest.ReceivedTimestampFrom(ctx); ok {
|
||||||
|
reqArrivedAt = reqReceivedTimestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
// a) we will allow the request to wait in the queue for one
|
||||||
|
// fourth of the time of its allotted deadline.
|
||||||
|
// b) if the request context does not have any deadline
|
||||||
|
// then we default to 'defaultRequestWaitLimit'
|
||||||
|
// in any case, the wait limit for any request must not
|
||||||
|
// exceed the hard limit of 1m
|
||||||
|
//
|
||||||
|
// request has deadline:
|
||||||
|
// wait-limit = min(remaining deadline / 4, 1m)
|
||||||
|
// request has no deadline:
|
||||||
|
// wait-limit = min(defaultRequestWaitLimit, 1m)
|
||||||
|
thisReqWaitLimit := defaultRequestWaitLimit
|
||||||
|
if deadline, ok := ctx.Deadline(); ok {
|
||||||
|
thisReqWaitLimit = deadline.Sub(reqArrivedAt) / 4
|
||||||
|
}
|
||||||
|
if thisReqWaitLimit > time.Minute {
|
||||||
|
thisReqWaitLimit = time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
return context.WithDeadline(ctx, reqArrivedAt.Add(thisReqWaitLimit))
|
||||||
|
}
|
||||||
|
@ -51,6 +51,7 @@ import (
|
|||||||
"k8s.io/component-base/metrics/legacyregistry"
|
"k8s.io/component-base/metrics/legacyregistry"
|
||||||
"k8s.io/component-base/metrics/testutil"
|
"k8s.io/component-base/metrics/testutil"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
clocktesting "k8s.io/utils/clock/testing"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
)
|
)
|
||||||
@ -153,23 +154,23 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE
|
|||||||
WatchTracker: utilflowcontrol.NewWatchTracker(),
|
WatchTracker: utilflowcontrol.NewWatchTracker(),
|
||||||
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
|
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
|
||||||
}
|
}
|
||||||
return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
|
return newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecute, postExecute)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server {
|
func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) *httptest.Server {
|
||||||
epmetrics.Register()
|
epmetrics.Register()
|
||||||
fcmetrics.Register()
|
fcmetrics.Register()
|
||||||
apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute))
|
apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, defaultWaitLimit, onExecute, postExecute))
|
||||||
return apfServer
|
return apfServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler {
|
func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, defaultWaitLimit time.Duration, onExecute, postExecute func()) http.Handler {
|
||||||
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
||||||
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
||||||
|
|
||||||
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
onExecute()
|
onExecute()
|
||||||
}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator)
|
}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator, defaultWaitLimit)
|
||||||
|
|
||||||
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
|
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
|
||||||
@ -458,7 +459,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
|
|||||||
|
|
||||||
postExecuteFunc := func() {}
|
postExecuteFunc := func() {}
|
||||||
|
|
||||||
server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
|
server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -498,7 +499,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
|
|||||||
}
|
}
|
||||||
postExecuteFunc := func() {}
|
postExecuteFunc := func() {}
|
||||||
|
|
||||||
server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
|
server := newApfServerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
|
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
|
||||||
@ -517,7 +518,7 @@ func TestApfWatchPanic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
postExecuteFunc := func() {}
|
postExecuteFunc := func() {}
|
||||||
|
|
||||||
apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
|
apfHandler := newApfHandlerWithFilter(t, fakeFilter, time.Minute/4, onExecuteFunc, postExecuteFunc)
|
||||||
handler := func(w http.ResponseWriter, r *http.Request) {
|
handler := func(w http.ResponseWriter, r *http.Request) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err == nil {
|
if err := recover(); err == nil {
|
||||||
@ -564,7 +565,7 @@ func TestApfWatchHandlePanic(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
apfHandler := newApfHandlerWithFilter(t, test.filter, onExecuteFunc, postExecuteFunc)
|
apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc)
|
||||||
handler := func(w http.ResponseWriter, r *http.Request) {
|
handler := func(w http.ResponseWriter, r *http.Request) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err == nil {
|
if err := recover(); err == nil {
|
||||||
@ -649,6 +650,7 @@ func TestApfWithRequestDigest(t *testing.T) {
|
|||||||
longRunningFunc,
|
longRunningFunc,
|
||||||
fakeFilter,
|
fakeFilter,
|
||||||
func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
|
func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
|
||||||
|
time.Minute/4,
|
||||||
)
|
)
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
@ -1231,7 +1233,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.
|
|||||||
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
||||||
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
||||||
|
|
||||||
apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator)
|
apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator, time.Minute/4)
|
||||||
|
|
||||||
// add the handler in the chain that adds the specified user to the request context
|
// add the handler in the chain that adds the specified user to the request context
|
||||||
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -1407,3 +1409,107 @@ func isStreamReset(err error) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetRequestWaitContext(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
defaultRequestWaitLimit time.Duration
|
||||||
|
parent func(t time.Time) (context.Context, context.CancelFunc)
|
||||||
|
newReqWaitCtxExpected bool
|
||||||
|
reqWaitLimitExpected time.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "context deadline has exceeded",
|
||||||
|
parent: func(time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
return ctx, cancel
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context has a deadline, 'received at' is not set, wait limit should be one fourth of the remaining deadline from now",
|
||||||
|
parent: func(now time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithDeadline(context.Background(), now.Add(60*time.Second))
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: 15 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context has a deadline, 'received at' is set, wait limit should be one fourth of the deadline starting from the 'received at' time",
|
||||||
|
parent: func(now time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second))
|
||||||
|
return context.WithDeadline(ctx, now.Add(50*time.Second))
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: 5 * time.Second, // from now
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context does not have any deadline, 'received at' is not set, default wait limit should be in effect from now",
|
||||||
|
defaultRequestWaitLimit: 15 * time.Second,
|
||||||
|
parent: func(time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithCancel(context.Background())
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: 15 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context does not have any deadline, 'received at' is set, default wait limit should be in effect starting from the 'received at' time",
|
||||||
|
defaultRequestWaitLimit: 15 * time.Second,
|
||||||
|
parent: func(now time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
ctx := apirequest.WithReceivedTimestamp(context.Background(), now.Add(-10*time.Second))
|
||||||
|
return context.WithCancel(ctx)
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: 5 * time.Second, // from now
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context has a deadline, wait limit should not exceed the hard limit of 1m",
|
||||||
|
parent: func(now time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
// let 1/4th of the remaining deadline exceed the hard limit
|
||||||
|
return context.WithDeadline(context.Background(), now.Add(8*time.Minute))
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: time.Minute,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context has no deadline, wait limit should not exceed the hard limit of 1m",
|
||||||
|
defaultRequestWaitLimit: 2 * time.Minute, // it exceeds the hard limit
|
||||||
|
parent: func(now time.Time) (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithCancel(context.Background())
|
||||||
|
},
|
||||||
|
newReqWaitCtxExpected: true,
|
||||||
|
reqWaitLimitExpected: time.Minute,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
parent, cancel := test.parent(now)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
clock := clocktesting.NewFakePassiveClock(now)
|
||||||
|
newReqWaitCtxGot, cancelGot := getRequestWaitContext(parent, test.defaultRequestWaitLimit, clock)
|
||||||
|
if cancelGot == nil {
|
||||||
|
t.Errorf("Expected a non nil context.CancelFunc")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer cancelGot()
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case test.newReqWaitCtxExpected:
|
||||||
|
deadlineGot, ok := newReqWaitCtxGot.Deadline()
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected the new wait limit context to have a deadline")
|
||||||
|
}
|
||||||
|
if waitLimitGot := deadlineGot.Sub(now); test.reqWaitLimitExpected != waitLimitGot {
|
||||||
|
t.Errorf("Expected request wait limit %s, but got: %s", test.reqWaitLimitExpected, waitLimitGot)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if parent != newReqWaitCtxGot {
|
||||||
|
t.Errorf("Expected the parent context to be returned: want: %#v, got %#v", parent, newReqWaitCtxGot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user