diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
index 000c52e8b7c..b98dd800a16 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
@@ -544,10 +544,22 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
 		defer drainedCh.Signal()
 		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
 
-		// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
+		// wait for the delayed stopCh before closing the handler chain
 		<-delayedStopCh.Signaled()
 
 		// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
+		// once HandlerChainWaitGroup.Wait is invoked, the apiserver is
+		// expected to reject any incoming request with a {503, Retry-After}
+		// response via the WithWaitGroup filter. In practice, however, we
+		// observe that incoming request(s) get a 'connection refused' error
+		// instead; this is because, at this point, we have already called
+		// 'Server.Shutdown' and the net/http server has stopped listening,
+		// so new connections are never accepted.
+		// On the other hand, if 'ShutdownSendRetryAfter' is enabled,
+		// incoming requests will be rejected with a {429, Retry-After}
+		// response, since 'Server.Shutdown' is invoked only after the
+		// in-flight requests have been drained.
+		// TODO: can we consolidate these two modes of graceful termination?
 		s.HandlerChainWaitGroup.Wait()
 	}()
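The comment above contrasts three rejection behaviors: a {503, Retry-After} response from the WithWaitGroup filter, a {429, Retry-After} response when 'ShutdownSendRetryAfter' is enabled, and a plain 'connection refused' once the listener is gone. The following is a minimal, self-contained sketch of the first behavior in isolation; drainingHandler and StartDraining are hypothetical names, not the actual apiserver filters:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// drainingHandler wraps a delegate handler and, once draining has started,
// rejects new requests with a 503 and a Retry-After header instead of
// serving them.
type drainingHandler struct {
	mu       sync.RWMutex
	draining bool
	delegate http.Handler
}

func (h *drainingHandler) StartDraining() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.draining = true
}

func (h *drainingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.mu.RLock()
	draining := h.draining
	h.mu.RUnlock()
	if draining {
		w.Header().Set("Retry-After", "5")
		http.Error(w, "the server is shutting down", http.StatusServiceUnavailable)
		return
	}
	h.delegate.ServeHTTP(w, r)
}

func main() {
	h := &drainingHandler{delegate: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})}
	h.StartDraining()
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", "/echo", nil))
	fmt.Println(rec.Code, rec.Header().Get("Retry-After")) // 503 5
}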
diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go
index ecadbf93828..52495a13e8e 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go
@@ -29,13 +29,16 @@ import (
 	"os"
 	"reflect"
 	"sync"
+	"syscall"
 	"testing"
 	"time"
 
+	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/wait"
-	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
+	auditinternal "k8s.io/apiserver/pkg/apis/audit"
+	"k8s.io/apiserver/pkg/audit"
+	"k8s.io/apiserver/pkg/authorization/authorizer"
 	"k8s.io/apiserver/pkg/server/dynamiccertificates"
-	genericfilters "k8s.io/apiserver/pkg/server/filters"
 	"k8s.io/klog/v2"
 
 	"github.com/google/go-cmp/cmp"
@@ -62,112 +65,131 @@ type result struct {
 
 // wrap a lifecycleSignal so the test can inject its own callback
 type wrappedLifecycleSignal struct {
 	lifecycleSignal
-	callback func(bool, string, lifecycleSignal)
+	before   func(lifecycleSignal)
+	after    func(lifecycleSignal)
 }
 
 func (w *wrappedLifecycleSignal) Signal() {
-	var name string
-	if ncw, ok := w.lifecycleSignal.(*namedChannelWrapper); ok {
-		name = ncw.name
-	}
-
-	// the callback is invoked before and after the termination event is signaled
-	if w.callback != nil {
-		w.callback(true, name, w.lifecycleSignal)
+	if w.before != nil {
+		w.before(w.lifecycleSignal)
 	}
 	w.lifecycleSignal.Signal()
-	if w.callback != nil {
-		w.callback(false, name, w.lifecycleSignal)
+	if w.after != nil {
+		w.after(w.lifecycleSignal)
 	}
 }
 
-func wrapLifecycleSignals(t *testing.T, ts *lifecycleSignals, callback func(bool, string, lifecycleSignal)) {
-	newWrappedLifecycleSignal := func(delegated lifecycleSignal) lifecycleSignal {
-		return &wrappedLifecycleSignal{
-			lifecycleSignal: delegated,
-			callback:        callback,
-		}
+func wrapLifecycleSignalsWithRecorder(t *testing.T, signals *lifecycleSignals, before func(lifecycleSignal)) {
+	// it's important to record the signal being fired on a 'before' callback
+	// to avoid flakes, since on the server the signaling of events is
+	// an asynchronous process.
+	signals.AfterShutdownDelayDuration = wrapLifecycleSignal(t, signals.AfterShutdownDelayDuration, before, nil)
+	signals.PreShutdownHooksStopped = wrapLifecycleSignal(t, signals.PreShutdownHooksStopped, before, nil)
+	signals.HTTPServerStoppedListening = wrapLifecycleSignal(t, signals.HTTPServerStoppedListening, before, nil)
+	signals.InFlightRequestsDrained = wrapLifecycleSignal(t, signals.InFlightRequestsDrained, before, nil)
+	signals.ShutdownInitiated = wrapLifecycleSignal(t, signals.ShutdownInitiated, before, nil)
+}
+
+func wrapLifecycleSignal(t *testing.T, delegated lifecycleSignal, before, after func(_ lifecycleSignal)) lifecycleSignal {
+	return &wrappedLifecycleSignal{
+		lifecycleSignal: delegated,
+		before:          before,
+		after:           after,
 	}
-
-	ts.AfterShutdownDelayDuration = newWrappedLifecycleSignal(ts.AfterShutdownDelayDuration)
-	ts.HTTPServerStoppedListening = newWrappedLifecycleSignal(ts.HTTPServerStoppedListening)
-	ts.InFlightRequestsDrained = newWrappedLifecycleSignal(ts.InFlightRequestsDrained)
-	ts.ShutdownInitiated = newWrappedLifecycleSignal(ts.ShutdownInitiated)
 }
 
-type step struct {
-	waitCh, doneCh chan struct{}
-	fn             func()
+// the server may not wait long enough between firing two events for
+// the test to execute its verification steps; this type allows the
+// test to intercept a signal and execute those steps inside the
+// goroutine that is executing the test.
+type signalInterceptingTestStep struct {
+	doneCh chan struct{}
 }
 
-func (s step) done() <-chan struct{} {
-	close(s.waitCh)
-	return s.doneCh
+func (ts signalInterceptingTestStep) done() <-chan struct{} {
+	return ts.doneCh
 }
 
-func (s step) execute() {
-	defer close(s.doneCh)
-	<-s.waitCh
-	s.fn()
+func (ts signalInterceptingTestStep) execute(fn func()) {
+	defer close(ts.doneCh)
+	fn()
 }
 
-func newStep(fn func()) *step {
-	return &step{
-		fn:     fn,
-		waitCh: make(chan struct{}),
+func newSignalInterceptingTestStep() *signalInterceptingTestStep {
+	return &signalInterceptingTestStep{
 		doneCh: make(chan struct{}),
 	}
 }
 
+// This test exercises the graceful termination scenario
+// described in the following diagram
+//   - every vertical line is an independent timeline
+//   - the leftmost vertical line represents the goroutine that
+//     is executing the GenericAPIServer.Run method
+//   - (signal name) indicates that the given lifecycle signal has been fired
+//
+//                                   stopCh
+//                                     |
+//           |-------------------------------------------|
+//           |                                           |
+//   call PreShutdownHooks                      (ShutdownInitiated)
+//           |                                           |
+// (PreShutdownHooksStopped)             Sleep(ShutdownDelayDuration)
+//           |                                           |
+//           |                          (AfterShutdownDelayDuration)
+//           |                                           |
+//           |                                           |
+//           |-------------------------------------------|
+//           |                                           |
+//           |                                           |
+//           |         |-------------------------------------------------|
+//           |         |                                                 |
+//           |  close(stopHttpServerCh)                  HandlerChainWaitGroup.Wait()
+//           |         |                                                 |
+//           |  server.Shutdown(timeout=60s)                             |
+//           |         |                                                 |
+//           |  stop listener (net/http)                                 |
+//           |         |                                                 |
+//           |         |----------------------------------|              |
+//           |         |                                  |              |
+//           |         |              (HTTPServerStoppedListening)       |
+//           |         |                                                 |
+//           |  wait up to 60s                                           |
+//           |         |                            (InFlightRequestsDrained)
+//           |         |                                                 |
+//           |         |                                                 |
+//           |  stoppedCh is closed                   s.AuditBackend.Shutdown()
+//           |
+//           |
+//  <-drainedCh.Signaled()
+//           |
+//      <-stoppedCh
+//           |
+//       return nil
+//
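Before the test walks through the diagram step by step, it may help to see the coordination pattern in isolation. The following is a condensed, runnable model of the sequencing above, under assumed channel names; the real Run method wires these through lifecycleSignals and a wait group:

package main

import (
	"fmt"
	"time"
)

func main() {
	stopCh := make(chan struct{})        // shutdown is initiated by closing this
	delayedStopCh := make(chan struct{}) // ~ AfterShutdownDelayDuration
	drainedCh := make(chan struct{})     // ~ InFlightRequestsDrained
	stoppedCh := make(chan struct{})     // ~ the net/http server has exited

	shutdownDelayDuration := 100 * time.Millisecond

	go func() {
		<-stopCh
		// give load balancers time to stop sending traffic to this instance
		time.Sleep(shutdownDelayDuration)
		close(delayedStopCh)
	}()

	go func() {
		defer close(drainedCh)
		<-delayedStopCh
		// in the real server: s.HandlerChainWaitGroup.Wait()
	}()

	go func() {
		defer close(stoppedCh)
		// disabled mode: shut the HTTP server down as soon as the delay elapses
		<-delayedStopCh
		// in the real server: server.Shutdown(...) stops the listener here
	}()

	close(stopCh) // initiate a shutdown
	<-drainedCh
	<-stoppedCh
	fmt.Println("graceful termination complete")
}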
+func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t *testing.T) {
-	s := newGenericAPIServer(t, false)
-
-	// record the termination events in the order they are signaled
-	var signalOrderLock sync.Mutex
-	signalOrderGot := make([]string, 0)
-	recordOrderFn := func(before bool, name string, e lifecycleSignal) {
-		if !before {
-			return
-		}
-		signalOrderLock.Lock()
-		defer signalOrderLock.Unlock()
-		signalOrderGot = append(signalOrderGot, name)
-	}
-
-	// handler for a request that we want to keep in flight through to the end
-	inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{})
-	inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-		close(inFlightStartedCh)
-		// this request handler blocks until we deliberately unblock it.
-		<-inFlightRequestBlockedCh
-		w.WriteHeader(http.StatusOK)
-	})
-	s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest)
-
+	fakeAudit := &fakeAudit{}
+	s := newGenericAPIServer(t, fakeAudit, false)
 	connReusingClient := newClient(false)
 	doer := setupDoer(t, s.SecureServingInfo)
 
-	var delayedStopVerificationStepExecuted bool
-	delayedStopVerificationStep := newStep(func() {
-		delayedStopVerificationStepExecuted = true
-		t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
-		resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
-		assertResponse(t, resultGot, http.StatusOK)
-		resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
-		assertResponse(t, resultGot, http.StatusOK)
-	})
-	steps := func(before bool, name string, e lifecycleSignal) {
-		// Before AfterShutdownDelayDuration event is signaled, the test
-		// will send request(s) to assert on expected behavior.
-		if name == "AfterShutdownDelayDuration" && before {
-			// it unblocks the verification step and waits for it to complete
-			<-delayedStopVerificationStep.done()
-		}
-	}
+	// handler for a request that we want to keep in flight through to the end
+	inflightRequest := setupInFlightRequestHandler(s)
 
-	// wrap the termination signals of the GenericAPIServer so the test can inject its own callback
-	wrapLifecycleSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) {
-		recordOrderFn(before, name, e)
-		steps(before, name, e)
-	})
+	// API calls from the pre-shutdown hook(s) must succeed up to
+	// the point where the HTTP server is shut down.
+	preShutdownHook := setupPreShutdownHookHandler(t, s, doer, newClient(true))
+
+	signals := &s.lifecycleSignals
+	recorder := &signalRecorder{}
+	wrapLifecycleSignalsWithRecorder(t, signals, recorder.before)
+
+	// before the AfterShutdownDelayDuration signal is fired, we want
+	// the test to execute a verification step.
+	beforeShutdownDelayDurationStep := newSignalInterceptingTestStep()
+	signals.AfterShutdownDelayDuration = wrapLifecycleSignal(t, signals.AfterShutdownDelayDuration, func(_ lifecycleSignal) {
+		// wait for the test to execute verification steps before
+		// the server signals the next steps
+		<-beforeShutdownDelayDurationStep.done()
+	}, nil)
 
 	// start the API server
 	stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
@@ -177,220 +199,317 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
 	}()
 	waitForAPIServerStarted(t, doer)
 
-	// step 1: fire a request that we want to keep in-flight through to the end
-	inFlightResultCh := make(chan result)
-	go func() {
-		resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0)
-		inFlightResultCh <- resultGot
-	}()
-	select {
-	case <-inFlightStartedCh:
-	case <-time.After(5 * time.Second):
-		t.Fatalf("Waited for 5s for the in-flight request to reach the server")
+	// fire a request now so that it is in-flight on the server, and
+	// we will unblock it after ShutdownDelayDuration elapses
+	inflightRequest.launch(doer, connReusingClient)
+	waitForeverUntil(t, inflightRequest.startedCh, "in-flight request did not reach the server")
+
+	// /readyz should return OK
+	resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
+	if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+		t.Errorf("%s", err.Error())
 	}
 
-	// step 2: signal termination event: initiate a shutdown
+	// signal termination event: initiate a shutdown
 	close(stopCh)
+	waitForeverUntilSignaled(t, signals.ShutdownInitiated)
 
-	// step 3: before ShutdownDelayDuration elapses new request(s) should be served successfully.
-	delayedStopVerificationStep.execute()
-	if !delayedStopVerificationStepExecuted {
-		t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute")
-	}
-
-	// step 4: wait for the HTTP Server listener to have stopped
-	httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
-	select {
-	case <-httpServerStoppedListeningCh.Signaled():
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the server to signal HTTPServerStoppedListening event")
-	}
-
-	// step 5: the server has stopped listening but we still have a request
-	// in flight, let it unblock and we expect the request to succeed.
-	close(inFlightRequestBlockedCh)
-	var inFlightResultGot result
-	select {
-	case inFlightResultGot = <-inFlightResultCh:
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the server to send a response")
-	}
-	assertResponse(t, inFlightResultGot, http.StatusOK)
-
-	t.Log("Waiting for the apiserver Run method to return")
-	select {
-	case <-runCompletedCh:
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the apiserver Run method to return")
-	}
-
-	lifecycleSignalOrderExpected := []string{
-		string("ShutdownInitiated"),
-		string("AfterShutdownDelayDuration"),
-		string("HTTPServerStoppedListening"),
-		string("InFlightRequestsDrained"),
-	}
-	func() {
-		signalOrderLock.Lock()
-		defer signalOrderLock.Unlock()
-		if !reflect.DeepEqual(lifecycleSignalOrderExpected, signalOrderGot) {
-			t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(lifecycleSignalOrderExpected, signalOrderGot))
-		}
-	}()
-}
-
-func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) {
-	s := newGenericAPIServer(t, true)
-
-	// record the termination events in the order they are signaled
-	var signalOrderLock sync.Mutex
-	signalOrderGot := make([]string, 0)
-	recordOrderFn := func(before bool, name string, e lifecycleSignal) {
-		if !before {
-			return
-		}
-		signalOrderLock.Lock()
-		defer signalOrderLock.Unlock()
-		signalOrderGot = append(signalOrderGot, name)
-	}
-
-	// handler for a request that we want to keep in flight through to the end
-	inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{})
-	inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-		close(inFlightStartedCh)
-		// this request handler blocks until we deliberately unblock it.
-		<-inFlightRequestBlockedCh
-		w.WriteHeader(http.StatusOK)
-	})
-	s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest)
-
-	connReusingClient := newClient(false)
-	doer := setupDoer(t, s.SecureServingInfo)
-
-	var delayedStopVerificationStepExecuted bool
-	delayedStopVerificationStep := newStep(func() {
-		delayedStopVerificationStepExecuted = true
-		t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
-		resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
-		assertResponse(t, resultGot, http.StatusOK)
-		resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
-		assertResponse(t, resultGot, http.StatusOK)
-	})
-	steps := func(before bool, name string, e lifecycleSignal) {
-		// Before AfterShutdownDelayDuration event is signaled, the test
-		// will send request(s) to assert on expected behavior.
-		if name == "AfterShutdownDelayDuration" && before {
-			// it unblocks the verification step and waits for it to complete
-			<-delayedStopVerificationStep.done()
-		}
-	}
-
-	// wrap the termination signals of the GenericAPIServer so the test can inject its own callback
-	wrapLifecycleSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) {
-		recordOrderFn(before, name, e)
-		steps(before, name, e)
-	})
-
-	// start the API server
-	stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
-	go func() {
-		defer close(runCompletedCh)
-		s.PrepareRun().Run(stopCh)
-	}()
-	waitForAPIServerStarted(t, doer)
-
-	// step 1: fire a request that we want to keep in-flight through to the end
-	inFlightResultCh := make(chan result)
-	go func() {
-		resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0)
-		inFlightResultCh <- resultGot
-	}()
-	select {
-	case <-inFlightStartedCh:
-	case <-time.After(5 * time.Second):
-		t.Fatalf("Waited for 5s for the in-flight request to reach the server")
-	}
-
-	//step 1: /readyz should return OK
-	resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
-	assertResponse(t, resultGot, http.StatusOK)
-
-	// step 2: signal termination event: initiate a shutdown
-	close(stopCh)
-
-	// step 3: /readyz must return an error, but we need to give it some time
-	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
-		resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
+	// /readyz must return an error, but we need to give it some time
+	err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
 		// wait until we have a non 200 response
 		if resultGot.response != nil && resultGot.response.StatusCode == http.StatusOK {
 			return false, nil
 		}
-		assertResponse(t, resultGot, http.StatusInternalServerError)
+
+		if err := assertResponseStatusCode(resultGot, http.StatusInternalServerError); err != nil {
+			return true, err
+		}
 		return true, nil
 	})
 	if err != nil {
 		t.Errorf("Expected /readyz to return 500 status code, but got: %v", err)
 	}
 
-	// step 4: before ShutdownDelayDuration elapses new request(s) should be served successfully.
-	delayedStopVerificationStep.execute()
-	if !delayedStopVerificationStepExecuted {
-		t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute")
+	// before ShutdownDelayDuration elapses new request(s) should be served successfully.
+	beforeShutdownDelayDurationStep.execute(func() {
+		t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
+		resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
+		if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+			t.Errorf("%s", err.Error())
+		}
+		resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
+		if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+			t.Errorf("%s", err.Error())
+		}
+	})
+
+	waitForeverUntilSignaled(t, signals.AfterShutdownDelayDuration)
+
+	// preshutdown hook has not completed yet, new incoming request should succeed
+	resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
+	// TODO: we expect the request to succeed with http.StatusOK,
+	// https://github.com/kubernetes/kubernetes/pull/110026 will fix it.
+	if err := assertResponseStatusCode(resultGot, http.StatusServiceUnavailable); err != nil {
+		t.Errorf("%s", err.Error())
 	}
 
-	// step 5: ShutdownDelayDuration has elapsed, all incoming requests should receive 429
-	t.Log("Verify that new incoming request(s) get 429")
-	resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second)
-	requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests)
-	resultGot = doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-fail-with-429", time.Second)
-	requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests)
-
-	// step 6: we still have a request in flight, let it unblock and we expect the request to succeed.
-	close(inFlightRequestBlockedCh)
-	var inFlightResultGot result
-	select {
-	case inFlightResultGot = <-inFlightResultCh:
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the server to send a response")
+	// let the preshutdown hook issue an API call now, and then wait
+	// for it to return the result; it should succeed.
+	close(preShutdownHook.blockedCh)
+	preShutdownHookResult := <-preShutdownHook.resultCh
+	waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
+	// TODO: the API call from the preshutdown hook is expected to pass with
+	// http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026
+	// will fix it.
+	if err := assertResponseStatusCode(preShutdownHookResult, http.StatusServiceUnavailable); err != nil {
+		t.Errorf("%s", err.Error())
 	}
 
-	// step 7: wait for the HTTP Server listener to have stopped
-	httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
-	select {
-	case <-httpServerStoppedListeningCh.Signaled():
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the server to signal HTTPServerStoppedListening event")
+	waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
+	waitForeverUntilSignaled(t, signals.HTTPServerStoppedListening)
+
+	resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-503", time.Second)
+	if !utilnet.IsConnectionRefused(resultGot.err) {
+		t.Errorf("Expected error %v, but got: %v %v", syscall.ECONNREFUSED, resultGot.err, resultGot.response)
 	}
+	// even though Server.Serve() has returned, an existing connection on
+	// the server may take some time to be in a "closing" state; the
+	// following poll eliminates any flake due to that delay.
+	if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		result := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=waiting-for-the-existing-connection-to-reject-incoming-request", time.Second)
+		if result.response != nil {
+			t.Logf("Still waiting for the server to return error - response: %v", result.response)
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		t.Errorf("Expected no error, but got: %v", err)
+	}
+
+	// TODO: our original intention was for any incoming request to receive a 503
+	// via the WithWaitGroup filter, but, at this point, any incoming requests
+	// will get a 'connection refused' error since the net/http server has
+	// stopped listening.
+	resultGot = doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-fail-with-error", time.Second)
+	if !utilnet.IsConnectionRefused(resultGot.err) {
+		t.Errorf("Expected error %v, but got: %v %v", syscall.ECONNREFUSED, resultGot.err, resultGot.response)
+	}
+
+	// the server has stopped listening but we still have a request
+	// in flight, let it unblock and we expect the request to succeed.
+	inFlightResultGot := inflightRequest.unblockAndWaitForResult(t)
+	if err := assertResponseStatusCode(inFlightResultGot, http.StatusOK); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+	if err := assertRequestAudited(inFlightResultGot, fakeAudit); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	// all requests in flight have drained
+	waitForeverUntilSignaled(t, signals.InFlightRequestsDrained)
+
 	t.Log("Waiting for the apiserver Run method to return")
-	select {
-	case <-runCompletedCh:
-	case <-time.After(5 * time.Second):
-		t.Fatal("Expected the apiserver Run method to return")
+	waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")
+
+	if err := recorder.verify([]string{
+		"ShutdownInitiated",
+		"AfterShutdownDelayDuration",
+		"PreShutdownHooksStopped",
+		"HTTPServerStoppedListening",
+		"InFlightRequestsDrained",
+	}); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+}
+
+// This test exercises the graceful termination scenario
+// described in the following diagram
+//   - every vertical line is an independent timeline
+//   - the leftmost vertical line represents the goroutine that
+//     is executing the GenericAPIServer.Run method
+//   - (signal) indicates that the given lifecycle signal has been fired
+//
+//                                   stopCh
+//                                     |
+//           |-------------------------------------------|
+//           |                                           |
+//   call PreShutdownHooks                      (ShutdownInitiated)
+//           |                                           |
+// (PreShutdownHooksStopped)             Sleep(ShutdownDelayDuration)
+//           |                                           |
+//           |                          (AfterShutdownDelayDuration)
+//           |                                           |
+//           |                                           |
+//           |-------------------------------------------|
+//           |                                           |
+//           |                                           |
+//           |                            HandlerChainWaitGroup.Wait()
+//           |                                           |
+//           |                             (InFlightRequestsDrained)
+//           |                                           |
+//           |                                           |
+//           |              |-------------------------------------|
+//           |              |                                     |
+//           |              |                        close(stopHttpServerCh)
+//           |              |                                     |
+//           |   s.AuditBackend.Shutdown()         server.Shutdown(timeout=2s)
+//           |                                                    |
+//           |                                                    |
+//           |                                     stop listener (net/http)
+//           |                                                    |
+//           |              |-------------------------------------|
+// <-drainedCh.Signaled()   |                                     |
+//           |        wait up to 2s              (HTTPServerStoppedListening)
+//      <-stoppedCh         |
+//           |        stoppedCh is closed
+//       return nil
+//
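The diagram above differs from the first one in that the HTTP server keeps listening while in-flight requests drain, and new requests are turned away with {429, Retry-After}. A minimal sketch of that rejection follows; withRetryAfter is a hypothetical name (the real filter is genericfilters.WithRetryAfter, whose exact behavior may differ):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withRetryAfter rejects every request arriving after shutdownDelayCh has
// been closed with 429 and a Retry-After header, and asks the client to
// close its TCP connection so that it re-resolves to a healthy instance.
func withRetryAfter(h http.Handler, shutdownDelayCh <-chan struct{}) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-shutdownDelayCh:
			w.Header().Set("Retry-After", "5") // the tests above expect "5"
			w.Header().Set("Connection", "close")
			http.Error(w, "the apiserver is shutting down, please try again later", http.StatusTooManyRequests)
		default:
			h.ServeHTTP(w, r)
		}
	})
}

func main() {
	shutdownDelayCh := make(chan struct{})
	h := withRetryAfter(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}), shutdownDelayCh)

	close(shutdownDelayCh) // simulate AfterShutdownDelayDuration having fired
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", "/echo", nil))
	fmt.Println(rec.Code, rec.Header().Get("Retry-After")) // 429 5
}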
+func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) {
+	fakeAudit := &fakeAudit{}
+	s := newGenericAPIServer(t, fakeAudit, true)
+	connReusingClient := newClient(false)
+	doer := setupDoer(t, s.SecureServingInfo)
+
+	// handler for a request that we want to keep in flight through to the end
+	inflightRequest := setupInFlightRequestHandler(s)
+
+	// API calls from the pre-shutdown hook(s) must succeed up to
+	// the point where the HTTP server is shut down.
+	preShutdownHook := setupPreShutdownHookHandler(t, s, doer, newClient(true))
+
+	signals := &s.lifecycleSignals
+	recorder := &signalRecorder{}
+	wrapLifecycleSignalsWithRecorder(t, signals, recorder.before)
+
+	// before the AfterShutdownDelayDuration signal is fired, we want
+	// the test to execute a verification step.
+	beforeShutdownDelayDurationStep := newSignalInterceptingTestStep()
+	signals.AfterShutdownDelayDuration = wrapLifecycleSignal(t, signals.AfterShutdownDelayDuration, func(_ lifecycleSignal) {
+		// Before the AfterShutdownDelayDuration event is signaled, the test
+		// will send request(s) to assert on expected behavior.
+		<-beforeShutdownDelayDurationStep.done()
+	}, nil)
+
+	// start the API server
+	stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
+	go func() {
+		defer close(runCompletedCh)
+		s.PrepareRun().Run(stopCh)
+	}()
+	waitForAPIServerStarted(t, doer)
+
+	// fire a request now so that it is in-flight on the server, and
+	// we will unblock it after ShutdownDelayDuration elapses
+	inflightRequest.launch(doer, connReusingClient)
+	waitForeverUntil(t, inflightRequest.startedCh, "in-flight request did not reach the server")
+
+	// /readyz should return OK
+	resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
+	if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+		t.Errorf("%s", err.Error())
 	}
 
-	lifecycleSignalOrderExpected := []string{
-		string("ShutdownInitiated"),
-		string("AfterShutdownDelayDuration"),
-		string("InFlightRequestsDrained"),
-		string("HTTPServerStoppedListening"),
-	}
-	func() {
-		signalOrderLock.Lock()
-		defer signalOrderLock.Unlock()
-		if !reflect.DeepEqual(lifecycleSignalOrderExpected, signalOrderGot) {
-			t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(lifecycleSignalOrderExpected, signalOrderGot))
+	// signal termination event: initiate a shutdown
+	close(stopCh)
+	waitForeverUntilSignaled(t, signals.ShutdownInitiated)
+
+	// /readyz must return an error, but we need to give it some time
+	err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		resultGot := doer.Do(newClient(true), func(httptrace.GotConnInfo) {}, "/readyz", time.Second)
+		// wait until we have a non 200 response
+		if resultGot.response != nil && resultGot.response.StatusCode == http.StatusOK {
+			return false, nil
 		}
-	}()
+
+		if err := assertResponseStatusCode(resultGot, http.StatusInternalServerError); err != nil {
+			return true, err
+		}
+		return true, nil
+	})
+	if err != nil {
+		t.Errorf("Expected /readyz to return 500 status code, but got: %v", err)
+	}
+
+	// before ShutdownDelayDuration elapses new request(s) should be served successfully.
+	beforeShutdownDelayDurationStep.execute(func() {
+		t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
+		resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
+		if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+			t.Errorf("%s", err.Error())
+		}
+		resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
+		if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
+			t.Errorf("%s", err.Error())
+		}
+	})
+
+	waitForeverUntilSignaled(t, signals.AfterShutdownDelayDuration)
+
+	// preshutdown hook has not completed yet, new incoming request should succeed
+	resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
+	// TODO: we expect the request to succeed with http.StatusOK,
+	// https://github.com/kubernetes/kubernetes/pull/110026 will fix it.
+	if err := assertResponseStatusCode(resultGot, http.StatusTooManyRequests); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	// let the preshutdown hook issue an API call now, and then wait
+	// for it to return the result; it should succeed.
+	close(preShutdownHook.blockedCh)
+	preShutdownHookResult := <-preShutdownHook.resultCh
+	waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
+	// TODO: the API call from the preshutdown hook is expected to pass with
+	// http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026
+	// will fix it.
+	if err := assertResponseStatusCode(preShutdownHookResult, http.StatusTooManyRequests); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	// both AfterShutdownDelayDuration and PreShutdownHooksStopped
+	// have been signaled; any incoming request should receive 429
+	resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second)
+	if err := requestMustFailWithRetryHeader(resultGot, http.StatusTooManyRequests); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+	resultGot = doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-fail-with-429", time.Second)
+	if err := requestMustFailWithRetryHeader(resultGot, http.StatusTooManyRequests); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	// we still have a request in flight, let it unblock and we expect the request to succeed.
+	inFlightResultGot := inflightRequest.unblockAndWaitForResult(t)
+	if err := assertResponseStatusCode(inFlightResultGot, http.StatusOK); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+	if err := assertRequestAudited(inFlightResultGot, fakeAudit); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+
+	// all requests in flight have drained
+	waitForeverUntilSignaled(t, signals.InFlightRequestsDrained)
+	waitForeverUntilSignaled(t, signals.HTTPServerStoppedListening)
+
+	t.Log("Waiting for the apiserver Run method to return")
+	waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")
+
+	if err := recorder.verify([]string{
+		"ShutdownInitiated",
+		"AfterShutdownDelayDuration",
+		"PreShutdownHooksStopped",
+		"InFlightRequestsDrained",
+		"HTTPServerStoppedListening",
+	}); err != nil {
+		t.Errorf("%s", err.Error())
+	}
+}
 
 func TestMuxAndDiscoveryComplete(t *testing.T) {
 	// setup
 	testSignal1 := make(chan struct{})
 	testSignal2 := make(chan struct{})
-	s := newGenericAPIServer(t, true)
+	s := newGenericAPIServer(t, &fakeAudit{}, true)
 	s.muxAndDiscoveryCompleteSignals["TestSignal1"] = testSignal1
 	s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2
 	doer := setupDoer(t, s.SecureServingInfo)
@@ -429,58 +548,244 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
 }
 
 func TestPreShutdownHooks(t *testing.T) {
-	s := newGenericAPIServer(t, true)
-	doer := setupDoer(t, s.SecureServingInfo)
-
-	preShutdownHookErrCh := make(chan error)
-	err := s.AddPreShutdownHook("test-backend", func() error {
-		// this pre-shutdown hook waits for the requests in flight to drain
-		// and then send a series of requests to the apiserver, and
-		// we expect these series of requests to be completed successfully
-		<-s.lifecycleSignals.InFlightRequestsDrained.Signaled()
-
-		// we send 5 requests, once every second
-		var r result
-		client := newClient(true)
-		for i := 0; i < 5; i++ {
-			r = doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 1*time.Second)
-			if r.err != nil {
-				break
-			}
-			time.Sleep(time.Second)
-		}
-		preShutdownHookErrCh <- r.err
-		return nil
-	})
-	if err != nil {
-		t.Fatalf("Failed to add pre-shutdown hook - %v", err)
+	tests := []struct {
+		name   string
+		server func() *GenericAPIServer
+	}{
+		{
"ShutdownSendRetryAfter is disabled", + server: func() *GenericAPIServer { + return newGenericAPIServer(t, &fakeAudit{}, false) + }, + }, + { + name: "ShutdownSendRetryAfter is enabled", + server: func() *GenericAPIServer { + return newGenericAPIServer(t, &fakeAudit{}, true) + }, + }, } - // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := test.server() + doer := setupDoer(t, s.SecureServingInfo) + + // preshutdown hook should not block when sending to the error channel + preShutdownHookErrCh := make(chan error, 1) + err := s.AddPreShutdownHook("test-backend", func() error { + // this pre-shutdown hook waits for the shutdown duration to elapse, + // and then send a series of requests to the apiserver, and + // we expect these series of requests to be completed successfully + <-s.lifecycleSignals.AfterShutdownDelayDuration.Signaled() + + // we send 5 requests, one every second + var err error + client := newClient(true) + for i := 0; i < 5; i++ { + r := doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 1*time.Second) + // TODO: this is broken, we should check the response for a status code of 200 + // https://github.com/kubernetes/kubernetes/pull/110026 fixes this issue. + if r.err != nil { + break + } + time.Sleep(time.Second) + } + preShutdownHookErrCh <- err + return nil + }) + if err != nil { + t.Fatalf("Failed to add pre-shutdown hook - %v", err) + } + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + close(stopCh) + + waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") + + select { + case err := <-preShutdownHookErrCh: + if err != nil { + t.Errorf("PreSHutdown hook can not access the API server - %v", err) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("pre-shutdown hook did not complete as expected") + } + }) + } +} + +type signalRecorder struct { + lock sync.Mutex + order []string +} + +func (r *signalRecorder) before(s lifecycleSignal) { + r.lock.Lock() + defer r.lock.Unlock() + r.order = append(r.order, s.Name()) +} + +func (r *signalRecorder) verify(got []string) error { + r.lock.Lock() + defer r.lock.Unlock() + want := r.order + if !reflect.DeepEqual(want, got) { + return fmt.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(want, got)) + } + return nil +} + +type inFlightRequest struct { + blockedCh, startedCh chan struct{} + resultCh chan result + url string +} + +func setupInFlightReuestHandler(s *GenericAPIServer) *inFlightRequest { + inflight := &inFlightRequest{ + blockedCh: make(chan struct{}), + startedCh: make(chan struct{}), + resultCh: make(chan result), + url: "/in-flight-request-as-designed", + } + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + close(inflight.startedCh) + // this request handler blocks until we deliberately unblock it. 
+
+func (ifr *inFlightRequest) launch(doer doer, client *http.Client) {
 	go func() {
-		defer func() {
-			// this test has an inherent race condition when we wait for two go routines
-			// to finish - the Run method and the pre-shutdown hook, each running in
-			// its own goroutine, give it a second before unblocking the test assert
-			<-time.After(time.Second)
-			close(runCompletedCh)
-		}()
-		s.PrepareRun().Run(stopCh)
+		result := doer.Do(client, func(httptrace.GotConnInfo) {}, ifr.url, 0)
+		ifr.resultCh <- result
 	}()
-	waitForAPIServerStarted(t, doer)
+}
 
-	close(stopCh)
+func (ifr *inFlightRequest) unblockAndWaitForResult(t *testing.T) result {
+	close(ifr.blockedCh)
+	var resultGot result
 	select {
-	case err := <-preShutdownHookErrCh:
-		if err != nil {
-			t.Fatalf("PreSHutdown hook can not access the API server - %v", err)
-		}
-	case <-runCompletedCh:
-		t.Fatalf("API Server exited without running the PreShutdown hooks")
-	case <-time.After(15 * time.Second):
-		t.Fatalf("test timed out after 15 seconds")
+	case resultGot = <-ifr.resultCh:
+		return resultGot
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatal("Expected the server to send a response")
+	}
+	return resultGot
+}
+
+type preShutdownHookHandler struct {
+	blockedCh chan struct{}
+	resultCh  chan result
+}
+
+func setupPreShutdownHookHandler(t *testing.T, s *GenericAPIServer, doer doer, client *http.Client) *preShutdownHookHandler {
+	hook := &preShutdownHookHandler{
+		blockedCh: make(chan struct{}),
+		resultCh:  make(chan result),
+	}
+	if err := s.AddPreShutdownHook("test-preshutdown-hook", func() error {
+		// wait until the test commands this pre-shutdown
+		// hook to invoke an API call.
+		<-hook.blockedCh
+
+		resultGot := doer.Do(client, func(httptrace.GotConnInfo) {}, "/echo?message=request-from-pre-shutdown-hook-should-succeed", time.Second)
+		hook.resultCh <- resultGot
+		return nil
+	}); err != nil {
+		t.Fatalf("Failed to register preshutdown hook - %v", err)
+	}
+
+	return hook
+}
+
+type fakeAudit struct {
+	shutdownCh chan struct{}
+	lock       sync.Mutex
+	audits     map[string]struct{}
+}
+
+func (a *fakeAudit) Run(stopCh <-chan struct{}) error {
+	a.shutdownCh = make(chan struct{})
+	go func() {
+		defer close(a.shutdownCh)
+		<-stopCh
+	}()
+	return nil
+}
+
+func (a *fakeAudit) Shutdown() {
+	// TODO: uncomment it in https://github.com/kubernetes/kubernetes/pull/110026
+	// <-a.shutdownCh
+}
+
+func (a *fakeAudit) String() string {
+	return "fake-audit"
+}
+
+func (a *fakeAudit) ProcessEvents(events ...*auditinternal.Event) bool {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	if len(a.audits) == 0 {
+		a.audits = map[string]struct{}{}
+	}
+	for _, event := range events {
+		a.audits[string(event.AuditID)] = struct{}{}
+	}
+
+	return true
+}
+
+func (a *fakeAudit) requestAudited(auditID string) bool {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	_, exists := a.audits[auditID]
+	return exists
+}
+
+func (a *fakeAudit) EvaluatePolicyRule(attrs authorizer.Attributes) audit.RequestAuditConfigWithLevel {
+	return audit.RequestAuditConfigWithLevel{
+		Level:              auditinternal.LevelMetadata,
+		RequestAuditConfig: audit.RequestAuditConfig{},
+	}
+}
+
+func assertRequestAudited(resultGot result, backend *fakeAudit) error {
+	resp := resultGot.response
+	if resp == nil {
+		return fmt.Errorf("Expected a response, but got nil")
+	}
+	auditIDGot := resp.Header.Get(auditinternal.HeaderAuditID)
+	if len(auditIDGot) == 0 {
+		return fmt.Errorf("Expected non-empty %q response header, but got: %v", auditinternal.HeaderAuditID, resp)
+	}
+	if !backend.requestAudited(auditIDGot) {
+		return fmt.Errorf("Expected the request to be audited: %q", auditIDGot)
+	}
+	return nil
+}
+
+func waitForeverUntilSignaled(t *testing.T, s lifecycleSignal) {
+	waitForeverUntil(t, s.Signaled(), fmt.Sprintf("Expected the server to signal %s event", s.Name()))
+}
+
+func waitForeverUntil(t *testing.T, ch <-chan struct{}, msg string) {
+	select {
+	case <-ch:
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("%s", msg)
+	}
 }
 
@@ -500,28 +805,28 @@ func shouldUseNewConnection(t *testing.T) func(httptrace.GotConnInfo) {
 	}
 }
 
-func assertResponse(t *testing.T, resultGot result, statusCodeExpected int) {
+func assertResponseStatusCode(resultGot result, statusCodeExpected int) error {
 	if resultGot.err != nil {
-		t.Errorf("Expected no error, but got: %v", resultGot.err)
-		return
+		return fmt.Errorf("Expected no error, but got: %v", resultGot.err)
 	}
 	if resultGot.response.StatusCode != statusCodeExpected {
-		t.Errorf("Expected Status Code: %d, but got: %d", statusCodeExpected, resultGot.response.StatusCode)
+		return fmt.Errorf("Expected Status Code: %d, but got: %d", statusCodeExpected, resultGot.response.StatusCode)
 	}
+	return nil
 }
 
-func requestMustFailWithRetryHeader(t *testing.T, resultGot result, statusCodedExpected int) {
+func requestMustFailWithRetryHeader(resultGot result, statusCodedExpected int) error {
 	if resultGot.err != nil {
-		t.Errorf("Expected no error, but got: %v", resultGot.err)
-		return
+		return fmt.Errorf("Expected no error, but got: %v", resultGot.err)
 	}
 	if statusCodedExpected != resultGot.response.StatusCode {
-		t.Errorf("Expected Status Code: %d, but got: %d", statusCodedExpected, resultGot.response.StatusCode)
+		return fmt.Errorf("Expected Status Code: %d, but got: %d", statusCodedExpected, resultGot.response.StatusCode)
 	}
 	retryAfterGot := resultGot.response.Header.Get("Retry-After")
 	if retryAfterGot != "5" {
-		t.Errorf("Expected Retry-After Response Header, but got: %v", resultGot.response)
+		return fmt.Errorf("Expected Retry-After Response Header, but got: %v", resultGot.response)
 	}
+	return nil
 }
 
 func waitForAPIServerStarted(t *testing.T, doer doer) {
@@ -617,18 +922,12 @@ func newClient(useNewConnection bool) *http.Client {
 	}
 }
 
-func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
+func newGenericAPIServer(t *testing.T, fAudit *fakeAudit, keepListening bool) *GenericAPIServer {
 	config, _ := setUp(t)
 	config.ShutdownDelayDuration = 100 * time.Millisecond
 	config.ShutdownSendRetryAfter = keepListening
-	config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
-		handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
-		if c.ShutdownSendRetryAfter {
-			handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
-		}
-		handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
-		return handler
-	}
+	config.AuditPolicyRuleEvaluator = fAudit
+	config.AuditBackend = fAudit
 	s, err := config.Complete(nil).New("test", NewEmptyDelegate())
 	if err != nil {
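A note on the connection-refused assertions used throughout: utilnet.IsConnectionRefused reports whether an error chain contains syscall.ECONNREFUSED, which is what a client sees once the net/http server has stopped listening. A standalone sketch of the same check using only the standard library; on most platforms a dial to a just-closed port is refused, though this is environment-dependent:

package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"syscall"
)

func main() {
	// start a server, then shut it down so the port stops accepting connections
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	addr := srv.Listener.Addr().String()
	srv.Close()

	// the listener is gone, so the dial should fail with ECONNREFUSED
	_, err := net.Dial("tcp", addr)
	fmt.Println("connection refused:", errors.Is(err, syscall.ECONNREFUSED))
}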