From 8054b0f808d116658ac086e4b71fb34d1502cd57 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 2 Jun 2021 08:22:29 +0200 Subject: [PATCH] Fix watch rejections in P&F filter --- .../server/filters/priority-and-fairness.go | 11 +++- .../filters/priority-and-fairness_test.go | 50 +++++++++++++------ 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 13658377c67..23ea5b7287a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -112,6 +112,9 @@ func WithPriorityAndFairness( } } var resultCh chan interface{} + if isWatchRequest { + resultCh = make(chan interface{}) + } execute := func() { noteExecutingDelta(1) defer noteExecutingDelta(-1) @@ -129,7 +132,6 @@ func WithPriorityAndFairness( setResponseHeaders(classification, w) if isWatchRequest { - resultCh = make(chan interface{}) go func() { defer func() { err := recover() @@ -179,7 +181,12 @@ func WithPriorityAndFairness( } tooManyRequests(r, w) } - // In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes. + + // For watch requests, from the APF point of view the request is already + // finished at this point. However, that doesn't mean it is already finished + // from the non-APF point of view. So we need to wait here until the request is: + // 1) finished being processed or + // 2) rejected if isWatchRequest { err := <-resultCh if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 376ab7ed389..80ccaa35bf1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -112,7 +112,7 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error { func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { } -func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server { +func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptest.Server { onExecuteFunc := func() { if decision == decisionCancelWait { t.Errorf("execute should not be invoked") @@ -134,24 +134,24 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) } } - return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) + return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc) } -func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server { +func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server { fakeFilter := fakeApfFilter{ mockDecision: decision, postEnqueue: postEnqueue, postDequeue: postDequeue, } - return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t) + return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) } -func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server { - apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t)) +func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server { + apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute)) return apfServer } -func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler { +func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler { requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) @@ -176,7 +176,7 @@ func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExec func TestApfSkipLongRunningRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionSkipFilter, t) + server := newApfServerWithSingleRequest(t, decisionSkipFilter) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -193,7 +193,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) { func TestApfRejectRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionReject, t) + server := newApfServerWithSingleRequest(t, decisionReject) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -218,7 +218,7 @@ func TestApfExemptRequest(t *testing.T) { // so that an observation will cause some data to go into the Prometheus metrics. time.Sleep(time.Millisecond * 50) - server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) + server := newApfServerWithSingleRequest(t, decisionNoQueuingExecute) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -244,7 +244,7 @@ func TestApfExecuteRequest(t *testing.T) { // so that an observation will cause some data to go into the Prometheus metrics. time.Sleep(time.Millisecond * 50) - server := newApfServerWithSingleRequest(decisionQueuingExecute, t) + server := newApfServerWithSingleRequest(t, decisionQueuingExecute) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -316,7 +316,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) { finishExecute.Wait() } - server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) + server := newApfServerWithHooks(t, decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -444,7 +444,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { postExecuteFunc := func() {} - server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t) + server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) defer server.Close() var wg sync.WaitGroup @@ -473,6 +473,24 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { wg.Wait() } +func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { + fakeFilter := &fakeWatchApfFilter{ + capacity: 0, + } + + onExecuteFunc := func() { + t.Errorf("Request unexepectedly executing") + } + postExecuteFunc := func() {} + + server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil { + t.Error(err) + } +} + func TestApfWatchPanic(t *testing.T) { fakeFilter := &fakeWatchApfFilter{ capacity: 1, @@ -483,7 +501,7 @@ func TestApfWatchPanic(t *testing.T) { } postExecuteFunc := func() {} - apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t) + apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc) handler := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == nil { @@ -504,6 +522,8 @@ func TestApfWatchPanic(t *testing.T) { // automatically even if the server doesn't cancel is explicitly. // This is required to ensure we won't be leaking goroutines that wait for context // cancelling (e.g. in queueset::StartRequest method). +// Even though in production we are not using httptest.Server, this logic is shared +// across these two. func TestContextClosesOnRequestProcessed(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) @@ -528,7 +548,7 @@ func TestContextClosesOnRequestProcessed(t *testing.T) { func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionCancelWait, t) + server := newApfServerWithSingleRequest(t, decisionCancelWait) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {