From 1d691ddb44e8dfa54008977469201a811410f7e9 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 21 Dec 2020 12:08:58 -0500 Subject: [PATCH] add unit test to simulate an enqued request that times out --- .../filters/priority-and-fairness_test.go | 407 +++++++++++++++--- 1 file changed, 359 insertions(+), 48 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index fa33c5dc53b..e6e2df6c518 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "reflect" "strings" "sync" @@ -48,8 +49,14 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + "k8s.io/klog/v2" ) +func TestMain(m *testing.M) { + klog.InitFlags(nil) + os.Exit(m.Run()) +} + type mockDecision int const ( @@ -344,57 +351,25 @@ func TestApfCancelWaitRequest(t *testing.T) { }) } -func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) { +func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { fcmetrics.Register() t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) { const ( - requestTimeout = time.Minute + requestTimeout = 1 * time.Minute userName = "alice" fsName = "test-fs" plName = "test-pl" serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1 ) - objects := newConfiguration(fsName, plName, userName, flowcontrol.LimitResponseTypeReject, plConcurrencyShares) - clientset := newClientset(t, objects...) - // this test does not rely on resync, so resync period is set to zero - factory := informers.NewSharedInformerFactory(clientset, 0) - controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestTimeout/4) - - stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{}) - factory.Start(stopCh) - - // wait for the informer cache to sync. - timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() - cacheSyncDone := factory.WaitForCacheSync(timeout.Done()) - if names := unsyncedInformers(cacheSyncDone); len(names) > 0 { - t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) - } - - var controllerErr error - go func() { - defer close(controllerCompletedCh) - controllerErr = controller.Run(stopCh) - }() - - // make sure that apf controller syncs the priority level configuration object we are using in this test. - // read the metrics and ensure the concurrency limit for our priority level is set to the expected value. - pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { - if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil { - t.Logf("polling retry - error: %s", err) - return false, nil - } - return true, nil - }) - if pollErr != nil { - t.Fatalf("expected the apf controller to sync the priotity level configuration object: %s", "test-pl") - } + apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) + stopCh := make(chan struct{}) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) var executed bool // we will raise a panic for the first request. - firstRequestPathPanic := "/request/panic" + firstRequestPathPanic := "/request/panic-as-designed" requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { executed = true expectMatchingAPFHeaders(t, w, fsName, plName) @@ -411,32 +386,353 @@ func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) { var err error _, err = requestGetter(firstRequestPathPanic) if !executed { - t.Errorf("expected inner handler to be executed for request: %s", firstRequestPathPanic) + t.Errorf("Expected inner handler to be executed for request: %q", firstRequestPathPanic) } expectResetStreamError(t, err) executed = false // the second request should be served successfully. - secondRequestPathShouldWork := "/request/should-work" + secondRequestPathShouldWork := "/request/should-succeed-as-expected" response, err := requestGetter(secondRequestPathShouldWork) if !executed { - t.Errorf("expected inner handler to be executed for request: %s", secondRequestPathShouldWork) + t.Errorf("Expected inner handler to be executed for request: %s", secondRequestPathShouldWork) } if err != nil { - t.Errorf("expected request: %s to succeed, but got error: %#v", secondRequestPathShouldWork, err) + t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestPathShouldWork, err) } if response.StatusCode != http.StatusOK { - t.Errorf("expected HTTP status code: %d for request: %s, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response) + t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response) } close(stopCh) - t.Log("waiting for the controller to shutdown") - <-controllerCompletedCh + t.Log("Waiting for the controller to shutdown") + controllerErr := <-controllerCompletedCh if controllerErr != nil { - t.Errorf("expected a nil error from controller, but got: %#v", controllerErr) + t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) } }) + + t.Run("priority level concurrency is set to 1, request times out and inner handler hasn't written to the response yet", func(t *testing.T) { + const ( + requestTimeout = 3 * time.Second + userName = "alice" + fsName = "test-fs" + plName = "test-pl" + serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1 + ) + + apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) + stopCh := make(chan struct{}) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + + var executed bool + rquestTimesOutPath := "/request/time-out-as-designed" + reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + executed = true + expectMatchingAPFHeaders(t, w, fsName, plName) + + if r.URL.Path == rquestTimesOutPath { + defer close(reqHandlerCompletedCh) + + // this will force the request to time out. + <-callerRoundTripDoneCh + } + }) + handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) + + server, requestGetter := newHTTP2ServerWithClient(handler) + defer server.Close() + + var ( + response *http.Response + err error + ) + func() { + defer close(callerRoundTripDoneCh) + + t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath) + response, err = requestGetter(rquestTimesOutPath) + }() + + if !executed { + t.Errorf("Expected inner handler to be executed for request: %q", rquestTimesOutPath) + } + t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) + <-reqHandlerCompletedCh + + if err != nil { + t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err) + } + if response.StatusCode != http.StatusGatewayTimeout { + t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response) + } + + close(stopCh) + t.Log("Waiting for the controller to shutdown") + + controllerErr := <-controllerCompletedCh + if controllerErr != nil { + t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) + } + }) + + t.Run("priority level concurrency is set to 1, inner handler panics after the request times out", func(t *testing.T) { + const ( + requestTimeout = 3 * time.Second + userName = "alice" + fsName = "test-fs" + plName = "test-pl" + serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1 + ) + + apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) + stopCh := make(chan struct{}) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + + reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + rquestTimesOutPath := "/request/time-out-as-designed" + requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectMatchingAPFHeaders(t, w, fsName, plName) + + if r.URL.Path == rquestTimesOutPath { + defer close(reqHandlerCompletedCh) + <-callerRoundTripDoneCh + + // we expect the timeout handler to have timed out this request by now and any attempt + // to write to the response should return a http.ErrHandlerTimeout error. + if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + } + + panic(http.ErrAbortHandler) + } + }) + handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) + + server, requestGetter := newHTTP2ServerWithClient(handler) + defer server.Close() + + var ( + response *http.Response + err error + ) + func() { + defer close(callerRoundTripDoneCh) + t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath) + response, err = requestGetter(rquestTimesOutPath) + }() + + t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) + <-reqHandlerCompletedCh + + if err != nil { + t.Fatalf("Expected request: %q to get a response, but got error: %#v", rquestTimesOutPath, err) + } + if response.StatusCode != http.StatusGatewayTimeout { + t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, rquestTimesOutPath, response) + } + + close(stopCh) + t.Log("Waiting for the controller to shutdown") + + controllerErr := <-controllerCompletedCh + if controllerErr != nil { + t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) + } + }) + + t.Run("priority level concurrency is set to 1, inner handler writes to the response before request times out", func(t *testing.T) { + const ( + requestTimeout = 3 * time.Second + userName = "alice" + fsName = "test-fs" + plName = "test-pl" + serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1 + ) + + apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0) + stopCh := make(chan struct{}) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + + rquestTimesOutPath := "/request/time-out-as-designed" + reqHandlerCompletedCh, callerRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectMatchingAPFHeaders(t, w, fsName, plName) + + if r.URL.Path == rquestTimesOutPath { + defer close(reqHandlerCompletedCh) + + // inner handler writes header and then let the request time out. + w.WriteHeader(http.StatusBadRequest) + <-callerRoundTripDoneCh + + // we expect the timeout handler to have timed out this request by now and any attempt + // to write to the response should return a http.ErrHandlerTimeout error. + if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + } + } + }) + handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) + + server, requestGetter := newHTTP2ServerWithClient(handler) + defer server.Close() + + var err error + func() { + defer close(callerRoundTripDoneCh) + t.Logf("Waiting for the request: %q to time out", rquestTimesOutPath) + _, err = requestGetter(rquestTimesOutPath) + }() + + t.Logf("Waiting for the inner handler of the request: %q to complete", rquestTimesOutPath) + <-reqHandlerCompletedCh + + expectResetStreamError(t, err) + + close(stopCh) + t.Log("Waiting for the controller to shutdown") + + controllerErr := <-controllerCompletedCh + if controllerErr != nil { + t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) + } + }) + + t.Run("priority level concurrency is set to 1, queue length is 1, first request should time out and second (enqueued) request should time out as well", func(t *testing.T) { + const ( + requestTimeout = 3 * time.Second + userName = "alice" + fsName = "test-fs" + plName = "test-pl" + serverConcurrency, plConcurrencyShares, plConcurrency, queueLength = 1, 1, 1, 1 + ) + + apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength) + stopCh := make(chan struct{}) + controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency) + + firstRequestTimesOutPath := "/request/first/time-out-as-designed" + secondRequestEnqueuedPath := "/request/second/enqueued-as-designed" + firstReqHandlerCompletedCh, firstReqInProgressCh := make(chan struct{}), make(chan struct{}) + firstReqRoundTripDoneCh, secondReqRoundTripDoneCh := make(chan struct{}), make(chan struct{}) + requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectMatchingAPFHeaders(t, w, fsName, plName) + + if r.URL.Path == firstRequestTimesOutPath { + defer close(firstReqHandlerCompletedCh) + + close(firstReqInProgressCh) + <-firstReqRoundTripDoneCh + + // make sure we wait until the caller of the second request returns, this is to + // ensure that second request never has a chance to be executed (to avoid flakes) + <-secondReqRoundTripDoneCh + + // we expect the timeout handler to have timed out this request by now and any attempt + // to write to the response should return a http.ErrHandlerTimeout error. + if _, err := w.Write([]byte("foo")); err != http.ErrHandlerTimeout { + t.Fatalf("Expected error: %#v, but got: %#v", http.ErrHandlerTimeout, err) + } + return + } + + if r.URL.Path == secondRequestEnqueuedPath { + // we expect the concurrency to be set to 1 and so this request should never be executed. + t.Fatalf("Expected request to be enqueued: %q", secondRequestEnqueuedPath) + } + }) + handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout) + + server, requestGetter := newHTTP2ServerWithClient(handler) + defer server.Close() + + var firstReqErr, secondReqErr error + var resp1, resp2 *http.Response + go func() { + defer close(firstReqRoundTripDoneCh) + t.Logf("Waiting for the request: %q to time out", firstRequestTimesOutPath) + resp1, firstReqErr = requestGetter(firstRequestTimesOutPath) + }() + func() { + defer close(secondReqRoundTripDoneCh) + + // we must wait for the "first" request to start executing first + <-firstReqInProgressCh + resp2, secondReqErr = requestGetter(secondRequestEnqueuedPath) + }() + + <-firstReqRoundTripDoneCh + + t.Logf("Waiting for the inner handler of the request: %q to complete", firstRequestTimesOutPath) + <-firstReqHandlerCompletedCh + + // first request is expected to time out. + if firstReqErr != nil { + t.Fatalf("Expected request: %q to get a response, but got error: %#v", firstRequestTimesOutPath, firstReqErr) + } + if resp1.StatusCode != http.StatusGatewayTimeout { + t.Errorf("Expected HTTP status code: %d for request: %q, but got: %#v", http.StatusGatewayTimeout, firstRequestTimesOutPath, resp1) + } + + // second request is expected to either be rejected (ideal behavior) or time out (current approximation of the ideal behavior) + if secondReqErr != nil { + t.Fatalf("Expected request: %q to get a response, but got error: %#v", secondRequestEnqueuedPath, secondReqErr) + } + if !(resp2.StatusCode == http.StatusTooManyRequests || resp2.StatusCode == http.StatusGatewayTimeout) { + t.Errorf("Expected HTTP status code: %d or %d for request: %q, but got: %#v", http.StatusTooManyRequests, http.StatusGatewayTimeout, secondRequestEnqueuedPath, resp2) + } + + close(stopCh) + t.Log("Waiting for the controller to shutdown") + + controllerErr := <-controllerCompletedCh + if controllerErr != nil { + t.Errorf("Expected no error from the controller, but got: %#v", controllerErr) + } + }) +} + +func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int, + requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) { + clientset := newClientset(t, apfConfiguration...) + // this test does not rely on resync, so resync period is set to zero + factory := informers.NewSharedInformerFactory(clientset, 0) + controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestWaitLimit) + + factory.Start(stopCh) + + // wait for the informer cache to sync. + timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + cacheSyncDone := factory.WaitForCacheSync(timeout.Done()) + if names := unsyncedInformers(cacheSyncDone); len(names) > 0 { + t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) + } + + controllerCompletedCh := make(chan error) + var controllerErr error + go func() { + controllerErr = controller.Run(stopCh) + controllerCompletedCh <- controllerErr + }() + + // make sure that apf controller syncs the priority level configuration object we are using in this test. + // read the metrics and ensure the concurrency limit for our priority level is set to the expected value. + pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil { + t.Logf("polling retry - error: %s", err) + return false, nil + } + return true, nil + }) + if pollErr != nil { + t.Fatalf("expected the apf controller to sync the priotity level configuration object: %s", plName) + } + + return controller, controllerCompletedCh } // returns a started http2 server, with a client function to send request to the server. @@ -510,6 +806,8 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol. }) handler = WithTimeoutForNonLongRunningRequests(handler, longRunningRequestCheck) + // we don't have any request with invalid timeout, so leaving audit policy and sink nil. + handler = apifilters.WithRequestDeadline(handler, nil, nil, longRunningRequestCheck, nil, requestTimeout) handler = apifilters.WithRequestInfo(handler, requestInfoFactory) handler = WithPanicRecovery(handler, requestInfoFactory) return handler @@ -527,7 +825,7 @@ func unsyncedInformers(status map[reflect.Type]bool) []string { return names } -func newConfiguration(fsName, plName, user string, responseType flowcontrol.LimitResponseType, concurrency int32) []runtime.Object { +func newConfiguration(fsName, plName, user string, concurrency int32, queueLength int32) []runtime.Object { fs := &flowcontrol.FlowSchema{ ObjectMeta: metav1.ObjectMeta{ Name: fsName, @@ -562,6 +860,18 @@ func newConfiguration(fsName, plName, user string, responseType flowcontrol.Limi }, } + var ( + responseType flowcontrol.LimitResponseType = flowcontrol.LimitResponseTypeReject + qcfg *flowcontrol.QueuingConfiguration + ) + if queueLength > 0 { + responseType = flowcontrol.LimitResponseTypeQueue + qcfg = &flowcontrol.QueuingConfiguration{ + Queues: 1, + QueueLengthLimit: queueLength, + HandSize: 1, + } + } pl := &flowcontrol.PriorityLevelConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: plName, @@ -572,7 +882,8 @@ func newConfiguration(fsName, plName, user string, responseType flowcontrol.Limi Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ AssuredConcurrencyShares: concurrency, LimitResponse: flowcontrol.LimitResponse{ - Type: responseType, + Type: responseType, + Queuing: qcfg, }, }, },