From 833ce487b9fab1650d5aaba2a8b295f8a90e07bd Mon Sep 17 00:00:00 2001 From: bjrara Date: Fri, 9 Oct 2020 16:51:19 +0800 Subject: [PATCH] Add multi request test --- .../filters/priority-and-fairness_test.go | 143 ++++++++++++++---- 1 file changed, 116 insertions(+), 27 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 91b381fb676..d2e130b3576 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -39,8 +40,10 @@ import ( "k8s.io/component-base/metrics/legacyregistry" ) +type mockDecision int + const ( - decisionNoQueuingExecute = iota + decisionNoQueuingExecute mockDecision = iota decisionQueuingExecute decisionCancelWait decisionReject @@ -48,7 +51,7 @@ const ( ) type fakeApfFilter struct { - mockDecision int + mockDecision mockDecision postEnqueue func() postDequeue func() } @@ -92,29 +95,41 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error { func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { } -func newApfServer(decision int, t *testing.T) *httptest.Server { +func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server { + onExecuteFunc := func() { + if decision == decisionCancelWait { + t.Errorf("execute should not be invoked") + } + // atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time. + if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { + t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) + } + } + postExecuteFunc := func() {} + // atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time. + postEnqueueFunc := func() { + if atomicReadOnlyWaiting != 1 { + t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) + } + } + postDequeueFunc := func() { + if atomicReadOnlyWaiting != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + } + } + return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) +} + +func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server { requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if decision == decisionCancelWait { - t.Errorf("execute should not be invoked") - } - if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { - t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) - } + onExecute() }), longRunningRequestCheck, fakeApfFilter{ mockDecision: decision, - postEnqueue: func() { - if atomicReadOnlyWaiting != 1 { - t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) - } - }, - postDequeue: func() { - if atomicReadOnlyWaiting != 0 { - t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) - } - }, + postEnqueue: postEnqueue, + postDequeue: postDequeue, }) handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -122,6 +137,7 @@ func newApfServer(decision int, t *testing.T) *httptest.Server { Groups: []string{user.AllUnauthenticated}, })) apfHandler.ServeHTTP(w, r) + postExecute() if atomicReadOnlyExecuting != 0 { t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting) } @@ -134,9 +150,10 @@ func newApfServer(decision int, t *testing.T) *httptest.Server { func TestApfSkipLongRunningRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionSkipFilter, t) + server := newApfServerWithSingleRequest(decisionSkipFilter, t) defer server.Close() + // send a watch request to test skipping long running request if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil { // request should not be rejected t.Error(err) @@ -146,7 +163,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) { func TestApfRejectRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionReject, t) + server := newApfServerWithSingleRequest(decisionReject, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { @@ -163,10 +180,11 @@ func TestApfExemptRequest(t *testing.T) { epmetrics.Register() fcmetrics.Register() - // wait the first sampleAndWaterMark metrics to be collected + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. time.Sleep(time.Millisecond * 50) - server := newApfServer(decisionNoQueuingExecute, t) + server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { @@ -184,10 +202,11 @@ func TestApfExecuteRequest(t *testing.T) { epmetrics.Register() fcmetrics.Register() - // wait the first sampleAndWaterMark metrics to be collected + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. time.Sleep(time.Millisecond * 50) - server := newApfServer(decisionQueuingExecute, t) + server := newApfServerWithSingleRequest(decisionQueuingExecute, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { @@ -202,10 +221,81 @@ func TestApfExecuteRequest(t *testing.T) { }) } +func TestApfExecuteMultipleRequests(t *testing.T) { + epmetrics.Register() + fcmetrics.Register() + + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. + time.Sleep(time.Millisecond * 50) + + concurrentRequests := 5 + var preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute sync.WaitGroup + for _, wg := range []*sync.WaitGroup{&preStartExecute, &postStartExecute, &preEnqueue, &postEnqueue, &preDequeue, &postDequeue, &finishExecute} { + wg.Add(concurrentRequests) + } + + onExecuteFunc := func() { + preStartExecute.Done() + preStartExecute.Wait() + if int(atomicReadOnlyExecuting) != concurrentRequests { + t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting) + } + postStartExecute.Done() + postStartExecute.Wait() + } + + postEnqueueFunc := func() { + preEnqueue.Done() + preEnqueue.Wait() + if int(atomicReadOnlyWaiting) != concurrentRequests { + t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) + + } + postEnqueue.Done() + postEnqueue.Wait() + } + + postDequeueFunc := func() { + preDequeue.Done() + preDequeue.Wait() + if atomicReadOnlyWaiting != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + } + postDequeue.Done() + postDequeue.Wait() + } + + postExecuteFunc := func() { + finishExecute.Done() + finishExecute.Wait() + } + + server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) + defer server.Close() + + for i := 0; i < concurrentRequests; i++ { + var err error + go func() { + err = expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK) + }() + if err != nil { + t.Error(err) + } + } + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_current_inflight_requests", + "apiserver_current_inqueue_requests", + "apiserver_flowcontrol_read_vs_write_request_count_watermarks", + "apiserver_flowcontrol_read_vs_write_request_count_samples", + }) +} + func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionCancelWait, t) + server := newApfServerWithSingleRequest(decisionCancelWait, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { @@ -232,7 +322,6 @@ func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) { metrics := map[string]interface{}{} for _, mf := range metricsFamily { - mf := mf metrics[*mf.Name] = mf }