diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index e459e75de0b..8491188e5b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -225,6 +225,11 @@ type Config struct { // RequestWidthEstimator is used to estimate the "width" of the incoming request(s). RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc + // terminationSignals provides access to the various shutdown signals + // that happen during the graceful termination of the apiserver. + // it's intentionally marked private as it should never be overridden. + terminationSignals terminationSignals + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -349,6 +354,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, + terminationSignals: newTerminationSignals(), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), @@ -595,7 +601,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G healthzChecks: c.HealthzChecks, livezChecks: c.LivezChecks, readyzChecks: c.ReadyzChecks, - readinessStopCh: make(chan struct{}), livezGracePeriod: c.LivezGracePeriod, DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), @@ -603,6 +608,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, + terminationSignals: c.terminationSignals, + APIServerID: c.APIServerID, StorageVersionManager: c.StorageVersionManager, 
diff --git a/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go index 655543a2513..1de20682afa 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go @@ -52,7 +52,7 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime } else { klog.Infof("Serving insecurely on %s", s.Listener.Addr()) } - _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) + _, _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) // NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 20ff9363c9a..5c4cbd729e3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -174,9 +174,6 @@ type GenericAPIServer struct { readyzChecksInstalled bool livezGracePeriod time.Duration livezClock clock.Clock - // the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this - // will cause readyz to return unhealthy. - readinessStopCh chan struct{} // auditing. The backend is started after the server starts listening. AuditBackend audit.Backend @@ -213,6 +210,10 @@ type GenericAPIServer struct { // Version will enable the /version endpoint if non-nil Version *version.Info + + // terminationSignals provides access to the various termination + // signals that happen during the shutdown period of the apiserver. 
+ terminationSignals terminationSignals } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -307,7 +308,10 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { s.installHealthz() s.installLivez() - err := s.addReadyzShutdownCheck(s.readinessStopCh) + + // as soon as shutdown is initiated, readiness should start failing + readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled() + err := s.addReadyzShutdownCheck(readinessStopCh) if err != nil { klog.Errorf("Failed to install readyz shutdown check %s", err) } @@ -330,38 +334,45 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { - delayedStopCh := make(chan struct{}) + delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration + shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated go func() { - defer close(delayedStopCh) + defer delayedStopCh.Signal() <-stopCh // As soon as shutdown is initiated, /readyz should start returning failure. // This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red // and stop sending traffic to this server. 
- close(s.readinessStopCh) + shutdownInitiatedCh.Signal() time.Sleep(s.ShutdownDelayDuration) }() // close socket after delayed stopCh - stoppedCh, err := s.NonBlockingRun(delayedStopCh) + stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled()) if err != nil { return err } - - drainedCh := make(chan struct{}) + httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening go func() { - defer close(drainedCh) + <-listenerStoppedCh + httpServerStoppedListeningCh.Signal() + }() + + drainedCh := s.terminationSignals.InFlightRequestsDrained + go func() { + defer drainedCh.Signal() // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called). - <-delayedStopCh + <-delayedStopCh.Signaled() // Wait for all requests to finish, which are bounded by the RequestTimeout variable. s.HandlerChainWaitGroup.Wait() }() + klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated") <-stopCh // run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver. @@ -369,19 +380,21 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { if err != nil { return err } + klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed") // Wait for all requests in flight to drain, bounded by the RequestTimeout variable. - <-drainedCh + <-drainedCh.Signaled() // wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished. <-stoppedCh + klog.V(1).Info("[graceful-termination] apiserver is exiting") return nil } // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. 
-func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) { +func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { // Use an stop channel to allow graceful shutdown without dropping audit events // after http server shutdown. auditStopCh := make(chan struct{}) @@ -390,20 +403,22 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan // before http server start serving. Otherwise the Backend.ProcessEvents call might block. if s.AuditBackend != nil { if err := s.AuditBackend.Run(auditStopCh); err != nil { - return nil, fmt.Errorf("failed to run the audit backend: %v", err) + return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err) } } // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} + var listenerStoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { var err error - stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh) + klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout) + stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) close(auditStopCh) - return nil, err + return nil, nil, err } } @@ -426,7 +441,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) } - return stoppedCh, nil + return stoppedCh, listenerStoppedCh, nil } // installAPIResources is a private method for installing the REST storage backing each api groupversionresource diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go new 
file mode 100644 index 00000000000..7c30add2203 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -0,0 +1,404 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + "net" + "net/http" + "net/http/httptrace" + "reflect" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" + "k8s.io/apiserver/pkg/server/dynamiccertificates" + genericfilters "k8s.io/apiserver/pkg/server/filters" + + "github.com/google/go-cmp/cmp" + "golang.org/x/net/http2" +) + +// doer sends a request to the server +type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result + +func (d doer) Do(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result { + return d(client, gci, path, timeout) +} + +type result struct { + err error + response *http.Response +} + +// wrap a terminationSignal so the test can inject its own callback +type wrappedTerminationSignal struct { + terminationSignal + callback func(bool, string, terminationSignal) +} + +func (w *wrappedTerminationSignal) Signal() { + var name string + if ncw, ok := w.terminationSignal.(*namedChannelWrapper); ok { + name = ncw.name + } + + // the callback is invoked before and after the termination 
event is signaled + if w.callback != nil { + w.callback(true, name, w.terminationSignal) + } + w.terminationSignal.Signal() + if w.callback != nil { + w.callback(false, name, w.terminationSignal) + } +} + +func wrapTerminationSignals(t *testing.T, ts *terminationSignals, callback func(bool, string, terminationSignal)) { + newWrappedTerminationSignal := func(delegated terminationSignal) terminationSignal { + return &wrappedTerminationSignal{ + terminationSignal: delegated, + callback: callback, + } + } + + ts.AfterShutdownDelayDuration = newWrappedTerminationSignal(ts.AfterShutdownDelayDuration) + ts.HTTPServerStoppedListening = newWrappedTerminationSignal(ts.HTTPServerStoppedListening) + ts.InFlightRequestsDrained = newWrappedTerminationSignal(ts.InFlightRequestsDrained) + ts.ShutdownInitiated = newWrappedTerminationSignal(ts.ShutdownInitiated) +} + +type step struct { + waitCh, doneCh chan struct{} + fn func() +} + +func (s step) done() <-chan struct{} { + close(s.waitCh) + return s.doneCh +} +func (s step) execute() { + defer close(s.doneCh) + <-s.waitCh + s.fn() +} +func newStep(fn func()) *step { + return &step{ + fn: fn, + waitCh: make(chan struct{}), + doneCh: make(chan struct{}), + } +} + +func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t *testing.T) { + s := newGenericAPIServer(t) + + // record the termination events in the order they are signaled + var signalOrderLock sync.Mutex + signalOrderGot := make([]string, 0) + recordOrderFn := func(before bool, name string, e terminationSignal) { + if !before { + return + } + signalOrderLock.Lock() + defer signalOrderLock.Unlock() + signalOrderGot = append(signalOrderGot, name) + } + + // handler for a request that we want to keep in flight through to the end + inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{}) + inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + close(inFlightStartedCh) + // this request handler 
blocks until we deliberately unblock it. + <-inFlightRequestBlockedCh + w.WriteHeader(http.StatusOK) + }) + s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest) + + connReusingClient := newClient(false) + doer := setupDoer(t, s.SecureServingInfo) + + var delayedStopVerificationStepExecuted bool + delayedStopVerificationStep := newStep(func() { + delayedStopVerificationStepExecuted = true + t.Log("Before ShutdownDelayDuration elapses new request(s) should be served") + resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second) + requestMustSucceed(t, resultGot) + resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) + requestMustSucceed(t, resultGot) + }) + steps := func(before bool, name string, e terminationSignal) { + // Before AfterShutdownDelayDuration event is signaled, the test + // will send request(s) to assert on expected behavior. 
+ if name == "AfterShutdownDelayDuration" && before { + // it unblocks the verification step and waits for it to complete + <-delayedStopVerificationStep.done() + } + } + + // wrap the termination signals of the GenericAPIServer so the test can inject its own callback + wrapTerminationSignals(t, &s.terminationSignals, func(before bool, name string, e terminationSignal) { + recordOrderFn(before, name, e) + steps(before, name, e) + }) + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + // step 1: fire a request that we want to keep in-flight through to the end + inFlightResultCh := make(chan result) + go func() { + resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0) + inFlightResultCh <- resultGot + }() + select { + case <-inFlightStartedCh: + case <-time.After(5 * time.Second): + t.Fatalf("Waited for 5s for the in-flight request to reach the server") + } + + // step 2: signal termination event: initiate a shutdown + close(stopCh) + + // step 3: before ShutdownDelayDuration elapses new request(s) should be served successfully. + delayedStopVerificationStep.execute() + if !delayedStopVerificationStepExecuted { + t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute") + } + + // step 4: wait for the HTTP Server listener to have stopped + httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening + select { + case <-httpServerStoppedListeningCh.Signaled(): + case <-time.After(5 * time.Second): + t.Fatal("Expected the server to signal HTTPServerStoppedListening event") + } + + // step 5: the server has stopped listening but we still have a request + // in flight, let it unblock and we expect the request to succeed. 
+ close(inFlightRequestBlockedCh) + var inFlightResultGot result + select { + case inFlightResultGot = <-inFlightResultCh: + case <-time.After(5 * time.Second): + t.Fatal("Expected the server to send a response") + } + requestMustSucceed(t, inFlightResultGot) + + t.Log("Waiting for the apiserver Run method to return") + select { + case <-runCompletedCh: + case <-time.After(5 * time.Second): + t.Fatal("Expected the apiserver Run method to return") + } + + terminationSignalOrderExpected := []string{ + string("ShutdownInitiated"), + string("AfterShutdownDelayDuration"), + string("HTTPServerStoppedListening"), + string("InFlightRequestsDrained"), + } + func() { + signalOrderLock.Lock() + defer signalOrderLock.Unlock() + if !reflect.DeepEqual(terminationSignalOrderExpected, signalOrderGot) { + t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(terminationSignalOrderExpected, signalOrderGot)) + } + }() +} + +func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { + return func(ci httptrace.GotConnInfo) { + if !ci.Reused { + t.Errorf("Expected the request to use an existing TCP connection, but got: %+v", ci) + } + } +} + +func shouldUseNewConnection(t *testing.T) func(httptrace.GotConnInfo) { + return func(ci httptrace.GotConnInfo) { + if ci.Reused { + t.Errorf("Expected the request to use a new TCP connection, but got: %+v", ci) + } + } +} + +func requestMustSucceed(t *testing.T, resultGot result) { + if resultGot.err != nil { + t.Errorf("Expected no error, but got: %v", resultGot.err) + return + } + if resultGot.response.StatusCode != http.StatusOK { + t.Errorf("Expected Status Code: %d, but got: %d", http.StatusOK, resultGot.response.StatusCode) + } +} + +func waitForAPIServerStarted(t *testing.T, doer doer) { + client := newClient(true) + i := 1 + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + result := doer.Do(client, func(httptrace.GotConnInfo) {}, 
fmt.Sprintf("/echo?message=attempt-%d", i), 100*time.Millisecond) + i++ + + if result.err != nil { + t.Logf("Still waiting for the server to start - err: %v", result.err) + return false, nil + } + if result.response.StatusCode != http.StatusOK { + t.Logf("Still waiting for the server to start - expecting: %d, but got: %v", http.StatusOK, result.response) + return false, nil + } + + t.Log("The API server has started") + return true, nil + }) + + if err != nil { + t.Fatalf("The server has failed to start - err: %v", err) + } +} + +func setupDoer(t *testing.T, info *SecureServingInfo) doer { + _, port, err := info.HostPort() + if err != nil { + t.Fatalf("Expected host, port from SecureServingInfo, but got: %v", err) + } + + return func(client *http.Client, callback func(httptrace.GotConnInfo), path string, timeout time.Duration) result { + url := fmt.Sprintf("https://%s:%d%s", "127.0.0.1", port, path) + t.Logf("Sending request - timeout: %s, url: %s", timeout, url) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return result{response: nil, err: err} + } + + // setup request timeout + var ctx context.Context + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(req.Context(), timeout) + defer cancel() + + req = req.WithContext(ctx) + } + + // setup trace + trace := &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + callback(connInfo) + }, + } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + + response, err := client.Do(req) + // in this test, we don't depend on the body of the response, so we can + // close the Body here to ensure the underlying transport can be reused + if response != nil { + ioutil.ReadAll(response.Body) + response.Body.Close() + } + return result{ + err: err, + response: response, + } + } +} + +func newClient(useNewConnection bool) *http.Client { + clientCACertPool := x509.NewCertPool() + clientCACertPool.AppendCertsFromPEM(backendCrt) + tlsConfig := 
&tls.Config{ + RootCAs: clientCACertPool, + NextProtos: []string{http2.NextProtoTLS}, + } + + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + DisableKeepAlives: useNewConnection, + } + if err := http2.ConfigureTransport(tr); err != nil { + log.Fatalf("Failed to configure HTTP2 transport: %v", err) + } + return &http.Client{ + Timeout: 0, + Transport: tr, + } +} + +func newGenericAPIServer(t *testing.T) *GenericAPIServer { + config, _ := setUp(t) + config.ShutdownDelayDuration = 100 * time.Millisecond + config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { + handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) + handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) + return handler + } + + s, err := config.Complete(nil).New("test", NewEmptyDelegate()) + if err != nil { + t.Fatalf("Error in bringing up the server: %v", err) + } + + ln, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + t.Fatalf("failed to listen on %v: %v", "0.0.0.0:0", err) + } + s.SecureServingInfo = &SecureServingInfo{} + s.SecureServingInfo.Listener = &wrappedListener{ln, t} + + cert, err := dynamiccertificates.NewStaticCertKeyContent("serving-cert", backendCrt, backendKey) + if err != nil { + t.Fatalf("failed to load cert - %v", err) + } + s.SecureServingInfo.Cert = cert + + // we use this handler to send a test request to the server. 
+ s.Handler.NonGoRestfulMux.Handle("/echo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + t.Logf("[server] received a request, proto: %s, url: %s", req.Proto, req.RequestURI) + + w.Header().Add("echo", req.URL.Query().Get("message")) + w.WriteHeader(http.StatusOK) + })) + + return s +} + +type wrappedListener struct { + net.Listener + t *testing.T +} + +func (ln wrappedListener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + + if tc, ok := c.(*net.TCPConn); ok { + ln.t.Logf("[server] seen new connection: %#v", tc) + } + return c, err +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 3990358e4e3..7a8f2a7f90b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -590,7 +590,7 @@ func TestGracefulShutdown(t *testing.T) { // get port serverPort := ln.Addr().(*net.TCPAddr).Port - stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh) + stoppedCh, _, err := RunServer(insecureServer, ln, 10*time.Second, stopCh) if err != nil { t.Fatalf("RunServer err: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go new file mode 100644 index 00000000000..817ba074ebd --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go @@ -0,0 +1,136 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "k8s.io/klog/v2" +) + +/* +We make an attempt here to identify the events that take place during +the graceful shutdown of the apiserver. + +We also identify each event with a name so we can refer to it. + +Events: +- ShutdownInitiated: KILL signal received +- AfterShutdownDelayDuration: shutdown delay duration has passed +- InFlightRequestsDrained: all in flight request(s) have been drained + +The following is a sequence of shutdown events that we expect to see during termination: +T0: ShutdownInitiated: KILL signal received + - /readyz starts returning red + - run pre shutdown hooks + +T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed + - the default value of 'ShutdownDelayDuration' is '70s' + - it's time to initiate shutdown of the HTTP Server, server.Shutdown is invoked + - as a consequence, the Close function is called for all listeners + - the HTTP Server stops listening immediately + - any new request arriving on a new TCP socket is denied with + a network error similar to 'connection refused' + - the HTTP Server waits gracefully for existing requests to complete + up to '60s' (dictated by ShutdownTimeout) + - active long running requests will receive a GOAWAY. 
+ +T0+70s: HTTPServerStoppedListening: + - this event is signaled when the HTTP Server has stopped listening + which is immediately after server.Shutdown has been invoked + +T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained + - long running requests are outside of this scope + - up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that + any request in flight has a hard timeout of 60s. + - it's time to call 'Shutdown' on the audit events since all + in flight request(s) have drained. +*/ + +// terminationSignal encapsulates a named apiserver termination event +type terminationSignal interface { + // Signal signals the event, indicating that the event has occurred. + // Signal is idempotent, once signaled the event stays signaled and + // it immediately unblocks any goroutine waiting for this event. + Signal() + + // Signaled returns a channel that is closed when the underlying termination + // event has been signaled. Successive calls to Signaled return the same value. + Signaled() <-chan struct{} +} + +// terminationSignals provides an abstraction of the termination events that +// transpire during the shutdown period of the apiserver. This abstraction makes it easy +// for us to write unit tests that can verify expected graceful termination behavior. +// +// GenericAPIServer can use these to either: +// - signal that a particular termination event has transpired +// - wait for a designated termination event to transpire and do some action. +type terminationSignals struct { + // ShutdownInitiated event is signaled when an apiserver shutdown has been initiated. + // It is signaled when the `stopCh` provided by the main goroutine + // receives a KILL signal and is closed as a consequence. + ShutdownInitiated terminationSignal + + // AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration + // has elapsed since the ShutdownInitiated event. 
+ // ShutdownDelayDuration allows the apiserver to delay shutdown for some time. + AfterShutdownDelayDuration terminationSignal + + // InFlightRequestsDrained event is signaled when the existing requests + // in flight have completed. This is used as a signal to shut down the audit backends + InFlightRequestsDrained terminationSignal + + // HTTPServerStoppedListening termination event is signaled when the + // HTTP Server has stopped listening to the underlying socket. + HTTPServerStoppedListening terminationSignal +} + +// newTerminationSignals returns a terminationSignals struct, with one named signal per event, to be used +// to coordinate graceful termination of the apiserver +func newTerminationSignals() terminationSignals { + return terminationSignals{ + ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), + AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), + InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), + HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), + } +} + +func newNamedChannelWrapper(name string) terminationSignal { + return &namedChannelWrapper{ + name: name, + ch: make(chan struct{}), + } +} + +type namedChannelWrapper struct { + name string + ch chan struct{} +} + +func (e *namedChannelWrapper) Signal() { + select { + case <-e.ch: + // already closed, don't close again. NOTE(review): this check-then-close is not atomic, so concurrent Signal calls could double-close; confirm each signal has a single caller. 
+ default: + close(e.ch) + klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name) + } +} + +func (e *namedChannelWrapper) Signaled() <-chan struct{} { + return e.ch +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index 356bd3a2fe8..c706afb5f51 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -192,6 +192,67 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur tlsErrorLogger := log.New(tlsErrorWriter, "", 0) secureServer.ErrorLog = tlsErrorLogger + klog.Infof("Serving securely on %s", secureServer.Addr) + stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh) + return stoppedCh, err +} + +// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails. +// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. ServeWithListenerStopped does not block. +// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed. +// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening. 
+// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh +// and update all components that call 'Serve' +func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { + if s.Listener == nil { + return nil, nil, fmt.Errorf("listener must not be nil") + } + + tlsConfig, err := s.tlsConfig(stopCh) + if err != nil { + return nil, nil, err + } + + secureServer := &http.Server{ + Addr: s.Listener.Addr().String(), + Handler: handler, + MaxHeaderBytes: 1 << 20, + TLSConfig: tlsConfig, + } + + // At least 99% of serialized resources in surveyed clusters were smaller than 256kb. + // This should be big enough to accommodate most API POST requests in a single frame, + // and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`. + const resourceBody99Percentile = 256 * 1024 + + http2Options := &http2.Server{} + + // shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame + http2Options.MaxUploadBufferPerStream = resourceBody99Percentile + http2Options.MaxReadFrameSize = resourceBody99Percentile + + // use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately + if s.HTTP2MaxStreamsPerConnection > 0 { + http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection) + } else { + http2Options.MaxConcurrentStreams = 250 + } + + // increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams + http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams) + + if !s.DisableHTTP2 { + // apply settings to the server + if err := http2.ConfigureServer(secureServer, http2Options); err != nil { + return nil, nil, 
fmt.Errorf("error configuring http2: %v", err) + } + } + + // use tlsHandshakeErrorWriter to handle messages of tls handshake error + tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr} + tlsErrorLogger := log.New(tlsErrorWriter, "", 0) + secureServer.ErrorLog = tlsErrorLogger + klog.Infof("Serving securely on %s", secureServer.Addr) return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh) } @@ -207,15 +268,15 @@ func RunServer( ln net.Listener, shutDownTimeout time.Duration, stopCh <-chan struct{}, -) (<-chan struct{}, error) { +) (<-chan struct{}, <-chan struct{}, error) { if ln == nil { - return nil, fmt.Errorf("listener must not be nil") + return nil, nil, fmt.Errorf("listener must not be nil") } // Shutdown server gracefully. - stoppedCh := make(chan struct{}) + serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{}) go func() { - defer close(stoppedCh) + defer close(serverShutdownCh) <-stopCh ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) server.Shutdown(ctx) @@ -224,6 +285,7 @@ func RunServer( go func() { defer utilruntime.HandleCrash() + defer close(listenerStoppedCh) var listener net.Listener listener = tcpKeepAliveListener{ln} @@ -242,7 +304,7 @@ func RunServer( } }() - return stoppedCh, nil + return serverShutdownCh, listenerStoppedCh, nil } // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted