From db4f0de28075f34bb4bfa8d821ad25cd3a7eba1f Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Thu, 2 Nov 2017 19:29:31 +0800 Subject: [PATCH] gracefully shutdown apiserver after all non-long running requests finish --- cmd/kube-apiserver/app/server.go | 5 +- pkg/kubeapiserver/server/insecure_handler.go | 12 +-- .../src/k8s.io/apiserver/pkg/server/config.go | 8 ++ .../apiserver/pkg/server/filters/waitgroup.go | 53 +++++++++++++ .../apiserver/pkg/server/genericapiserver.go | 26 ++++++- .../pkg/server/genericapiserver_test.go | 77 +++++++++++++++++++ .../src/k8s.io/apiserver/pkg/server/serve.go | 11 ++- 7 files changed, 179 insertions(+), 13 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index d83b368f5d8..5729463671b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -156,7 +156,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if len(os.Getenv("KUBE_API_VERSIONS")) > 0 { if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { + if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { return nil, err } } @@ -186,7 +186,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { + if err := 
kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { return nil, err } } @@ -488,6 +488,7 @@ func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transp if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) } + return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil } diff --git a/pkg/kubeapiserver/server/insecure_handler.go b/pkg/kubeapiserver/server/insecure_handler.go index 8186a0ac7ab..cc7c0a79de9 100644 --- a/pkg/kubeapiserver/server/insecure_handler.go +++ b/pkg/kubeapiserver/server/insecure_handler.go @@ -19,6 +19,7 @@ package server import ( "net" "net/http" + "time" "github.com/golang/glog" @@ -45,11 +46,12 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H } handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") - handler = genericfilters.WithPanicRecovery(handler) handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) + handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) + handler = genericfilters.WithPanicRecovery(handler) return handler } @@ -84,12 +86,12 @@ func (s *InsecureServingInfo) NewLoopbackClientConfig(token string) (*rest.Confi // NonBlockingRun spawns the insecure http server. 
An error is // returned if the ports cannot be listened on. -func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { +func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) if insecureServingInfo != nil && insecureHandler != nil { - if err := serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh); err != nil { + if err := serveInsecurely(insecureServingInfo, insecureHandler, shutDownTimeout, internalStopCh); err != nil { close(internalStopCh) return err } @@ -109,7 +111,7 @@ func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler ht // serveInsecurely run the insecure http server. It fails only if the initial listen // call fails. The actual server loop (stoppable by closing stopCh) runs in a go // routine, i.e. serveInsecurely does not block. 
-func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { +func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { insecureServer := &http.Server{ Addr: insecureServingInfo.BindAddress, Handler: insecureHandler, @@ -117,7 +119,7 @@ func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler h } glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress) var err error - _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh) + _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, shutDownTimeout, stopCh) return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index e1c81c7d4a3..ac1317889eb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" @@ -128,6 +129,8 @@ type Config struct { // BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler. BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler) + // HandlerChainWaitGroup allows you to wait for all chain handlers to exit after the server shuts down. + HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup // DiscoveryAddresses is used to build the IPs pass to discovery. 
If nil, the ExternalAddress is // always reported DiscoveryAddresses discovery.Addresses @@ -236,6 +239,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { ReadWritePort: 443, RequestContextMapper: apirequest.NewRequestContextMapper(), BuildHandlerChainFunc: DefaultBuildHandlerChain, + HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz}, @@ -446,8 +450,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G Serializer: c.Serializer, AuditBackend: c.AuditBackend, delegationTarget: delegationTarget, + HandlerChainWaitGroup: c.HandlerChainWaitGroup, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, + ShutdownTimeout: c.RequestTimeout, SecureServingInfo: c.SecureServingInfo, ExternalAddress: c.ExternalAddress, @@ -488,6 +494,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G return nil, err } } + for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { @@ -535,6 +542,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) + handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler) diff --git 
a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go new file mode 100644 index 00000000000..be73a2c9d97 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go @@ -0,0 +1,53 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "net/http" + + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown. 
+func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx, ok := requestContextMapper.Get(req) + if !ok { + // if this happens, the handler chain isn't setup correctly because there is no context mapper + handler.ServeHTTP(w, req) + return + } + + requestInfo, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + // if this happens, the handler chain isn't setup correctly because there is no request info + handler.ServeHTTP(w, req) + return + } + + if !longRunning(req, requestInfo) { + if err := wg.Add(1); err != nil { + http.Error(w, "Apisever is shutting down.", http.StatusInternalServerError) + return + } + defer wg.Done() + } + + handler.ServeHTTP(w, req) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 3109bb35667..c824719fe8d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" genericapi "k8s.io/apiserver/pkg/endpoints" @@ -83,6 +84,10 @@ type GenericAPIServer struct { // minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler minRequestTimeout time.Duration + // ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server + // gracefully shutdown returns. 
+ ShutdownTimeout time.Duration + // legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests // to InstallLegacyAPIGroup legacyAPIGroupPrefixes sets.String @@ -146,6 +151,9 @@ type GenericAPIServer struct { // delegationTarget is the next delegate in the chain or nil delegationTarget DelegationTarget + + // HandlerChainWaitGroup allows you to wait for all chain handlers to finish after the server shuts down. + HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -275,16 +283,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { <-stopCh - return s.RunPreShutdownHooks() + err = s.RunPreShutdownHooks() + if err != nil { + return err + } + + // Wait for all requests to finish, which are bounded by the RequestTimeout variable. + s.HandlerChainWaitGroup.Wait() + + return nil } // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { + // Use a stop channel to allow graceful shutdown without dropping audit events + // after the http server shuts down. + auditStopCh := make(chan struct{}) + // Start the audit backend before any request comes in. This means we must call Backend.Run // before http server start serving. Otherwise the Backend.ProcessEvents call might block. 
if s.AuditBackend != nil { - if err := s.AuditBackend.Run(stopCh); err != nil { + if err := s.AuditBackend.Run(auditStopCh); err != nil { return fmt.Errorf("failed to run the audit backend: %v", err) } } @@ -305,6 +325,8 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { go func() { <-stopCh close(internalStopCh) + s.HandlerChainWaitGroup.Wait() + close(auditStopCh) }() s.RunPostStartHooks(stopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 54a6540fdaf..f4293fc48f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -25,6 +25,8 @@ import ( "net/http" "net/http/httptest" goruntime "runtime" + "strconv" + "sync" "testing" "time" @@ -44,8 +46,11 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" + genericfilters "k8s.io/apiserver/pkg/server/filters" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -509,3 +514,75 @@ func fakeVersion() version.Info { Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH), } } + +// TestGracefulShutdown verifies server shutdown after request handler finish. 
+func TestGracefulShutdown(t *testing.T) { + etcdserver, config, _ := setUp(t) + defer etcdserver.Terminate(t) + + var graceShutdown bool + wg := sync.WaitGroup{} + wg.Add(1) + + config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { + handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) + handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) + handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) + return handler + } + + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + wg.Done() + time.Sleep(2 * time.Second) + w.WriteHeader(http.StatusOK) + graceShutdown = true + }) + + s, err := config.Complete(nil).New("test", EmptyDelegate) + if err != nil { + t.Fatalf("Error in bringing up the server: %v", err) + } + + s.Handler.NonGoRestfulMux.Handle("/test", handler) + + insecureServer := &http.Server{ + Addr: "0.0.0.0:0", + Handler: s.Handler, + } + stopCh := make(chan struct{}) + serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh) + if err != nil { + t.Errorf("RunServer err: %v", err) + } + + graceCh := make(chan struct{}) + // mock a client request + go func() { + resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test") + if err != nil { + t.Errorf("Unexpected http error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("Unexpected http status code: %v", resp.StatusCode) + } + close(graceCh) + }() + + // close stopCh after request sent to server to guarantee request handler is running. + wg.Wait() + close(stopCh) + // wait for wait group handler finish + s.HandlerChainWaitGroup.Wait() + + // check server all handlers finished. + if !graceShutdown { + t.Errorf("server shutdown not gracefully.") + } + // check client to make sure receive response. 
+ select { + case <-graceCh: + t.Logf("server shutdown gracefully.") + case <-time.After(30 * time.Second): + t.Errorf("Timed out waiting for response.") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/serve.go b/staging/src/k8s.io/apiserver/pkg/server/serve.go index 90f4078c753..f7d9f902305 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/serve.go +++ b/staging/src/k8s.io/apiserver/pkg/server/serve.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -84,13 +85,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error { glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress) var err error - s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh) + s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.ShutdownTimeout, stopCh) return err } // RunServer listens on the given port, then spawns a go-routine continuously serving // until the stopCh is closed. The port is returned. This function does not block. -func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) { +func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) { if len(server.Addr) == 0 { return 0, errors.New("address cannot be empty") } @@ -111,10 +112,12 @@ func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String()) } - // Stop the server by closing the listener + // Shutdown server gracefully. go func() { <-stopCh - ln.Close() + ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) + server.Shutdown(ctx) + cancel() }() go func() {