diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2b262d94237..c60331ffa30 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -156,7 +156,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if len(os.Getenv("KUBE_API_VERSIONS")) > 0 { if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { + if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { return nil, err } } @@ -186,7 +186,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { + if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { return nil, err } } @@ -481,6 +481,7 @@ func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transp if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) } + return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil } diff --git a/pkg/kubeapiserver/server/insecure_handler.go b/pkg/kubeapiserver/server/insecure_handler.go index 8186a0ac7ab..cc7c0a79de9 100644 --- a/pkg/kubeapiserver/server/insecure_handler.go +++ b/pkg/kubeapiserver/server/insecure_handler.go @@ -19,6 +19,7 @@ package server import ( "net" "net/http" + "time" "github.com/golang/glog" @@ -45,11 +46,12 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H } handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") - handler = genericfilters.WithPanicRecovery(handler) handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) + handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) + handler = genericfilters.WithPanicRecovery(handler) return handler } @@ -84,12 +86,12 @@ func (s *InsecureServingInfo) NewLoopbackClientConfig(token string) (*rest.Confi // NonBlockingRun spawns the insecure http server. An error is // returned if the ports cannot be listened on. -func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { +func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) if insecureServingInfo != nil && insecureHandler != nil { - if err := serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh); err != nil { + if err := serveInsecurely(insecureServingInfo, insecureHandler, shutDownTimeout, internalStopCh); err != nil { close(internalStopCh) return err } @@ -109,7 +111,7 @@ func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler ht // serveInsecurely run the insecure http server. It fails only if the initial listen // call fails. The actual server loop (stoppable by closing stopCh) runs in a go // routine, i.e. serveInsecurely does not block. -func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { +func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { insecureServer := &http.Server{ Addr: insecureServingInfo.BindAddress, Handler: insecureHandler, @@ -117,7 +119,7 @@ func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler h } glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress) var err error - _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh) + _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, shutDownTimeout, stopCh) return err } diff --git a/staging/BUILD b/staging/BUILD index 356f6d566a3..14506a8831d 100644 --- a/staging/BUILD +++ b/staging/BUILD @@ -85,6 +85,7 @@ filegroup( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:all-srcs", "//staging/src/k8s.io/apimachinery/pkg/util/validation:all-srcs", "//staging/src/k8s.io/apimachinery/pkg/util/wait:all-srcs", + "//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:all-srcs", "//staging/src/k8s.io/apimachinery/pkg/util/yaml:all-srcs", "//staging/src/k8s.io/apimachinery/pkg/version:all-srcs", "//staging/src/k8s.io/apimachinery/pkg/watch:all-srcs", diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 0a9d82a04c2..d235d3aa14b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -826,6 +826,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/wait", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD new file mode 100644 index 00000000000..a35c520103b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD @@ -0,0 +1,32 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "waitgroup.go", + ], + importpath = "k8s.io/apimachinery/pkg/util/waitgroup", + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["waitgroup_test.go"], + importpath = "k8s.io/apimachinery/pkg/util/waitgroup", + library = ":go_default_library", +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go new file mode 100644 index 00000000000..a6f29cd7c4d --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package waitgroup implements SafeWaitGroup wrap of sync.WaitGroup. +// Add with positive delta when waiting will fail, to prevent sync.WaitGroup race issue. +package waitgroup // import "k8s.io/apimachinery/pkg/util/waitgroup" diff --git a/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup.go b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup.go new file mode 100644 index 00000000000..afe92fa83e1 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup.go @@ -0,0 +1,57 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package waitgroup + +import ( + "fmt" + "sync" +) + +// SafeWaitGroup must not be copied after first use. +type SafeWaitGroup struct { + wg sync.WaitGroup + mu sync.RWMutex + // wait indicate whether Wait is called, if true, + // then any Add with positive delta will return error. + wait bool +} + +// Add adds delta, which may be negative, similar to sync.WaitGroup. +// If Add with a positive delta happens after Wait, it will return error, +// which prevent unsafe Add. +func (wg *SafeWaitGroup) Add(delta int) error { + wg.mu.RLock() + if wg.wait && delta > 0 { + return fmt.Errorf("Add with postive delta after Wait is forbidden") + } + wg.mu.RUnlock() + wg.wg.Add(delta) + return nil +} + +// Done decrements the WaitGroup counter. +func (wg *SafeWaitGroup) Done() { + wg.wg.Done() +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *SafeWaitGroup) Wait() { + wg.mu.Lock() + wg.wait = true + wg.mu.Unlock() + wg.wg.Wait() +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup_test.go b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup_test.go new file mode 100644 index 00000000000..e1bbddd482f --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/waitgroup/waitgroup_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package waitgroup test cases reference golang sync.WaitGroup https://golang.org/src/sync/waitgroup_test.go. +package waitgroup + +import ( + "runtime" + "sync/atomic" + "testing" +) + +func TestWaitGroup(t *testing.T) { + wg1 := &SafeWaitGroup{} + wg2 := &SafeWaitGroup{} + n := 16 + wg1.Add(n) + wg2.Add(n) + exited := make(chan bool, n) + for i := 0; i != n; i++ { + go func(i int) { + wg1.Done() + wg2.Wait() + exited <- true + }(i) + } + wg1.Wait() + for i := 0; i != n; i++ { + select { + case <-exited: + t.Fatal("SafeWaitGroup released group too soon") + default: + } + wg2.Done() + } + for i := 0; i != n; i++ { + <-exited // Will block if barrier fails to unlock someone. + } +} + +func TestWaitGroupNegativeCounter(t *testing.T) { + defer func() { + err := recover() + if err != "sync: negative WaitGroup counter" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + wg := &SafeWaitGroup{} + wg.Add(1) + wg.Done() + wg.Done() + t.Fatal("Should panic") +} + +func TestWaitGroupAddFail(t *testing.T) { + wg := &SafeWaitGroup{} + wg.Add(1) + wg.Done() + wg.Wait() + if err := wg.Add(1); err == nil { + t.Errorf("Should return error when add positive after Wait") + } +} + +func BenchmarkWaitGroupUncontended(b *testing.B) { + type PaddedWaitGroup struct { + SafeWaitGroup + pad [128]uint8 + } + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + var wg PaddedWaitGroup + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Add(1) + wg.Done() + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func benchmarkWaitGroupAddDone(b *testing.B, localWork int) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var wg SafeWaitGroup + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Add(1) + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + wg.Done() + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkWaitGroupAddDone(b *testing.B) { + benchmarkWaitGroupAddDone(b, 0) +} + +func BenchmarkWaitGroupAddDoneWork(b *testing.B) { + benchmarkWaitGroupAddDone(b, 100) +} + +func benchmarkWaitGroupWait(b *testing.B, localWork int) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var wg SafeWaitGroup + wg.Add(procs) + for p := 0; p < procs; p++ { + go wg.Done() + } + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Wait() + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkWaitGroupWait(b *testing.B) { + benchmarkWaitGroupWait(b, 0) +} + +func BenchmarkWaitGroupWaitWork(b *testing.B) { + benchmarkWaitGroupWait(b, 100) +} diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index 958a270bc06..90a1cb3fb2a 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1078,6 +1078,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/wait", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 3d56d3a2664..68c2d540802 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -30,8 +30,10 @@ go_test( "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", + "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", @@ -81,6 +83,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library", "//vendor/k8s.io/apimachinery/pkg/version:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission/plugin/initialization:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index e1c81c7d4a3..ac1317889eb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" @@ -128,6 +129,8 @@ type Config struct { // BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler. BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler) + // HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown. + HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup // DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is // always reported DiscoveryAddresses discovery.Addresses @@ -236,6 +239,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { ReadWritePort: 443, RequestContextMapper: apirequest.NewRequestContextMapper(), BuildHandlerChainFunc: DefaultBuildHandlerChain, + HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz}, @@ -446,8 +450,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G Serializer: c.Serializer, AuditBackend: c.AuditBackend, delegationTarget: delegationTarget, + HandlerChainWaitGroup: c.HandlerChainWaitGroup, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, + ShutdownTimeout: c.RequestTimeout, SecureServingInfo: c.SecureServingInfo, ExternalAddress: c.ExternalAddress, @@ -488,6 +494,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G return nil, err } } + for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { @@ -535,6 +542,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) + handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index a7e94dca6dc..bf72edc372f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -37,6 +37,7 @@ go_library( "longrunning.go", "maxinflight.go", "timeout.go", + "waitgroup.go", "wrap.go", ], importpath = "k8s.io/apiserver/pkg/server/filters", @@ -46,6 +47,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go new file mode 100644 index 00000000000..be73a2c9d97 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go @@ -0,0 +1,53 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "net/http" + + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown. +func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx, ok := requestContextMapper.Get(req) + if !ok { + // if this happens, the handler chain isn't setup correctly because there is no context mapper + handler.ServeHTTP(w, req) + return + } + + requestInfo, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + // if this happens, the handler chain isn't setup correctly because there is no request info + handler.ServeHTTP(w, req) + return + } + + if !longRunning(req, requestInfo) { + if err := wg.Add(1); err != nil { + http.Error(w, "Apisever is shutting down.", http.StatusInternalServerError) + return + } + defer wg.Done() + } + + handler.ServeHTTP(w, req) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 3109bb35667..c824719fe8d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" genericapi "k8s.io/apiserver/pkg/endpoints" @@ -83,6 +84,10 @@ type GenericAPIServer struct { // minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler minRequestTimeout time.Duration + // ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server + // gracefully shutdown returns. + ShutdownTimeout time.Duration + // legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests // to InstallLegacyAPIGroup legacyAPIGroupPrefixes sets.String @@ -146,6 +151,9 @@ type GenericAPIServer struct { // delegationTarget is the next delegate in the chain or nil delegationTarget DelegationTarget + + // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown. + HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -275,16 +283,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { <-stopCh - return s.RunPreShutdownHooks() + err = s.RunPreShutdownHooks() + if err != nil { + return err + } + + // Wait for all requests to finish, which are bounded by the RequestTimeout variable. + s.HandlerChainWaitGroup.Wait() + + return nil } // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { + // Use an stop channel to allow graceful shutdown without dropping audit events + // after http server shutdown. + auditStopCh := make(chan struct{}) + // Start the audit backend before any request comes in. This means we must call Backend.Run // before http server start serving. Otherwise the Backend.ProcessEvents call might block. if s.AuditBackend != nil { - if err := s.AuditBackend.Run(stopCh); err != nil { + if err := s.AuditBackend.Run(auditStopCh); err != nil { return fmt.Errorf("failed to run the audit backend: %v", err) } } @@ -305,6 +325,8 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { go func() { <-stopCh close(internalStopCh) + s.HandlerChainWaitGroup.Wait() + close(auditStopCh) }() s.RunPostStartHooks(stopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 16fee408c7b..64bfa7b232a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -25,6 +25,8 @@ import ( "net/http" "net/http/httptest" goruntime "runtime" + "strconv" + "sync" "testing" "time" @@ -44,8 +46,11 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" + genericfilters "k8s.io/apiserver/pkg/server/filters" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -509,3 +514,75 @@ func fakeVersion() version.Info { Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH), } } + +// TestGracefulShutdown verifies server shutdown after request handler finish. +func TestGracefulShutdown(t *testing.T) { + etcdserver, config, _ := setUp(t) + defer etcdserver.Terminate(t) + + var graceShutdown bool + wg := sync.WaitGroup{} + wg.Add(1) + + config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { + handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) + handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) + handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) + return handler + } + + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + wg.Done() + time.Sleep(2 * time.Second) + w.WriteHeader(http.StatusOK) + graceShutdown = true + }) + + s, err := config.Complete(nil).New("test", EmptyDelegate) + if err != nil { + t.Fatalf("Error in bringing up the server: %v", err) + } + + s.Handler.NonGoRestfulMux.Handle("/test", handler) + + insecureServer := &http.Server{ + Addr: "0.0.0.0:0", + Handler: s.Handler, + } + stopCh := make(chan struct{}) + serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh) + if err != nil { + t.Errorf("RunServer err: %v", err) + } + + graceCh := make(chan struct{}) + // mock a client request + go func() { + resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test") + if err != nil { + t.Errorf("Unexpected http error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("Unexpected http status code: %v", resp.StatusCode) + } + close(graceCh) + }() + + // close stopCh after request sent to server to guarantee request handler is running. + wg.Wait() + close(stopCh) + // wait for wait group handler finish + s.HandlerChainWaitGroup.Wait() + + // check server all handlers finished. + if !graceShutdown { + t.Errorf("server shutdown not gracefully.") + } + // check client to make sure receive response. + select { + case <-graceCh: + t.Logf("server shutdown gracefully.") + case <-time.After(30 * time.Second): + t.Errorf("Timed out waiting for response.") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/serve.go b/staging/src/k8s.io/apiserver/pkg/server/serve.go index 90f4078c753..f7d9f902305 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/serve.go +++ b/staging/src/k8s.io/apiserver/pkg/server/serve.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -84,13 +85,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error { glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress) var err error - s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh) + s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.ShutdownTimeout, stopCh) return err } // RunServer listens on the given port, then spawns a go-routine continuously serving // until the stopCh is closed. The port is returned. This function does not block. -func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) { +func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) { if len(server.Addr) == 0 { return 0, errors.New("address cannot be empty") } @@ -111,10 +112,12 @@ func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String()) } - // Stop the server by closing the listener + // Shutdown server gracefully. go func() { <-stopCh - ln.Close() + ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) + server.Shutdown(ctx) + cancel() }() go func() { diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index 9c3b3a5090a..b15737ea7d9 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -794,6 +794,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/wait", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index 4f502b92882..c610fbbaf3c 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -790,6 +790,10 @@ "ImportPath": "k8s.io/apimachinery/pkg/util/wait", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"