mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #54849 from hzxuzhonghu/audit-graceful-shutdown
Automatic merge from submit-queue (batch tested with PRs 46581, 55426, 54849). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. apiserver shutdown gracefully **What this PR does / why we need it**: apiserver shutdown gracefully and wait all non-long running requests finish before process exit. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #54793 **Special notes for your reviewer**: remove waitGroup, use atomic to count. **Release note**: ```release-note NONE ```
This commit is contained in:
commit
d12d711ba6
@ -156,7 +156,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
|
||||
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
|
||||
if insecureServingOptions != nil {
|
||||
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
|
||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -186,7 +186,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
|
||||
|
||||
if insecureServingOptions != nil {
|
||||
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
|
||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -481,6 +481,7 @@ func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transp
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
|
||||
}
|
||||
|
||||
return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ package server
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
@ -45,11 +46,12 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
|
||||
}
|
||||
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil)
|
||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||
handler = genericfilters.WithPanicRecovery(handler)
|
||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
||||
handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||
handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper)
|
||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||
handler = genericfilters.WithPanicRecovery(handler)
|
||||
|
||||
return handler
|
||||
}
|
||||
@ -84,12 +86,12 @@ func (s *InsecureServingInfo) NewLoopbackClientConfig(token string) (*rest.Confi
|
||||
|
||||
// NonBlockingRun spawns the insecure http server. An error is
|
||||
// returned if the ports cannot be listened on.
|
||||
func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error {
|
||||
func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error {
|
||||
// Use an internal stop channel to allow cleanup of the listeners on error.
|
||||
internalStopCh := make(chan struct{})
|
||||
|
||||
if insecureServingInfo != nil && insecureHandler != nil {
|
||||
if err := serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh); err != nil {
|
||||
if err := serveInsecurely(insecureServingInfo, insecureHandler, shutDownTimeout, internalStopCh); err != nil {
|
||||
close(internalStopCh)
|
||||
return err
|
||||
}
|
||||
@ -109,7 +111,7 @@ func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler ht
|
||||
// serveInsecurely run the insecure http server. It fails only if the initial listen
|
||||
// call fails. The actual server loop (stoppable by closing stopCh) runs in a go
|
||||
// routine, i.e. serveInsecurely does not block.
|
||||
func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error {
|
||||
func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error {
|
||||
insecureServer := &http.Server{
|
||||
Addr: insecureServingInfo.BindAddress,
|
||||
Handler: insecureHandler,
|
||||
@ -117,7 +119,7 @@ func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler h
|
||||
}
|
||||
glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress)
|
||||
var err error
|
||||
_, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh)
|
||||
_, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, shutDownTimeout, stopCh)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -85,6 +85,7 @@ filegroup(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/validation:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/version:all-srcs",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:all-srcs",
|
||||
|
@ -826,6 +826,10 @@
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
32
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD
Normal file
32
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD
Normal file
@ -0,0 +1,32 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"waitgroup.go",
|
||||
],
|
||||
importpath = "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["waitgroup_test.go"],
|
||||
importpath = "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
library = ":go_default_library",
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
19
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go
Normal file
19
staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go
Normal file
@ -0,0 +1,19 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package waitgroup implements SafeWaitGroup wrap of sync.WaitGroup.
|
||||
// Add with positive delta when waiting will fail, to prevent sync.WaitGroup race issue.
|
||||
package waitgroup // import "k8s.io/apimachinery/pkg/util/waitgroup"
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package waitgroup
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// SafeWaitGroup must not be copied after first use.
|
||||
type SafeWaitGroup struct {
|
||||
wg sync.WaitGroup
|
||||
mu sync.RWMutex
|
||||
// wait indicate whether Wait is called, if true,
|
||||
// then any Add with positive delta will return error.
|
||||
wait bool
|
||||
}
|
||||
|
||||
// Add adds delta, which may be negative, similar to sync.WaitGroup.
|
||||
// If Add with a positive delta happens after Wait, it will return error,
|
||||
// which prevent unsafe Add.
|
||||
func (wg *SafeWaitGroup) Add(delta int) error {
|
||||
wg.mu.RLock()
|
||||
if wg.wait && delta > 0 {
|
||||
return fmt.Errorf("Add with postive delta after Wait is forbidden")
|
||||
}
|
||||
wg.mu.RUnlock()
|
||||
wg.wg.Add(delta)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Done decrements the WaitGroup counter.
|
||||
func (wg *SafeWaitGroup) Done() {
|
||||
wg.wg.Done()
|
||||
}
|
||||
|
||||
// Wait blocks until the WaitGroup counter is zero.
|
||||
func (wg *SafeWaitGroup) Wait() {
|
||||
wg.mu.Lock()
|
||||
wg.wait = true
|
||||
wg.mu.Unlock()
|
||||
wg.wg.Wait()
|
||||
}
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package waitgroup test cases reference golang sync.WaitGroup https://golang.org/src/sync/waitgroup_test.go.
|
||||
package waitgroup
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWaitGroup(t *testing.T) {
|
||||
wg1 := &SafeWaitGroup{}
|
||||
wg2 := &SafeWaitGroup{}
|
||||
n := 16
|
||||
wg1.Add(n)
|
||||
wg2.Add(n)
|
||||
exited := make(chan bool, n)
|
||||
for i := 0; i != n; i++ {
|
||||
go func(i int) {
|
||||
wg1.Done()
|
||||
wg2.Wait()
|
||||
exited <- true
|
||||
}(i)
|
||||
}
|
||||
wg1.Wait()
|
||||
for i := 0; i != n; i++ {
|
||||
select {
|
||||
case <-exited:
|
||||
t.Fatal("SafeWaitGroup released group too soon")
|
||||
default:
|
||||
}
|
||||
wg2.Done()
|
||||
}
|
||||
for i := 0; i != n; i++ {
|
||||
<-exited // Will block if barrier fails to unlock someone.
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitGroupNegativeCounter(t *testing.T) {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err != "sync: negative WaitGroup counter" {
|
||||
t.Fatalf("Unexpected panic: %#v", err)
|
||||
}
|
||||
}()
|
||||
wg := &SafeWaitGroup{}
|
||||
wg.Add(1)
|
||||
wg.Done()
|
||||
wg.Done()
|
||||
t.Fatal("Should panic")
|
||||
}
|
||||
|
||||
func TestWaitGroupAddFail(t *testing.T) {
|
||||
wg := &SafeWaitGroup{}
|
||||
wg.Add(1)
|
||||
wg.Done()
|
||||
wg.Wait()
|
||||
if err := wg.Add(1); err == nil {
|
||||
t.Errorf("Should return error when add positive after Wait")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWaitGroupUncontended(b *testing.B) {
|
||||
type PaddedWaitGroup struct {
|
||||
SafeWaitGroup
|
||||
pad [128]uint8
|
||||
}
|
||||
const CallsPerSched = 1000
|
||||
procs := runtime.GOMAXPROCS(-1)
|
||||
N := int32(b.N / CallsPerSched)
|
||||
c := make(chan bool, procs)
|
||||
for p := 0; p < procs; p++ {
|
||||
go func() {
|
||||
var wg PaddedWaitGroup
|
||||
for atomic.AddInt32(&N, -1) >= 0 {
|
||||
runtime.Gosched()
|
||||
for g := 0; g < CallsPerSched; g++ {
|
||||
wg.Add(1)
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
c <- true
|
||||
}()
|
||||
}
|
||||
for p := 0; p < procs; p++ {
|
||||
<-c
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
|
||||
const CallsPerSched = 1000
|
||||
procs := runtime.GOMAXPROCS(-1)
|
||||
N := int32(b.N / CallsPerSched)
|
||||
c := make(chan bool, procs)
|
||||
var wg SafeWaitGroup
|
||||
for p := 0; p < procs; p++ {
|
||||
go func() {
|
||||
foo := 0
|
||||
for atomic.AddInt32(&N, -1) >= 0 {
|
||||
runtime.Gosched()
|
||||
for g := 0; g < CallsPerSched; g++ {
|
||||
wg.Add(1)
|
||||
for i := 0; i < localWork; i++ {
|
||||
foo *= 2
|
||||
foo /= 2
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
c <- foo == 42
|
||||
}()
|
||||
}
|
||||
for p := 0; p < procs; p++ {
|
||||
<-c
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWaitGroupAddDone(b *testing.B) {
|
||||
benchmarkWaitGroupAddDone(b, 0)
|
||||
}
|
||||
|
||||
func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
|
||||
benchmarkWaitGroupAddDone(b, 100)
|
||||
}
|
||||
|
||||
func benchmarkWaitGroupWait(b *testing.B, localWork int) {
|
||||
const CallsPerSched = 1000
|
||||
procs := runtime.GOMAXPROCS(-1)
|
||||
N := int32(b.N / CallsPerSched)
|
||||
c := make(chan bool, procs)
|
||||
var wg SafeWaitGroup
|
||||
wg.Add(procs)
|
||||
for p := 0; p < procs; p++ {
|
||||
go wg.Done()
|
||||
}
|
||||
for p := 0; p < procs; p++ {
|
||||
go func() {
|
||||
foo := 0
|
||||
for atomic.AddInt32(&N, -1) >= 0 {
|
||||
runtime.Gosched()
|
||||
for g := 0; g < CallsPerSched; g++ {
|
||||
wg.Wait()
|
||||
for i := 0; i < localWork; i++ {
|
||||
foo *= 2
|
||||
foo /= 2
|
||||
}
|
||||
}
|
||||
}
|
||||
c <- foo == 42
|
||||
}()
|
||||
}
|
||||
for p := 0; p < procs; p++ {
|
||||
<-c
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWaitGroupWait(b *testing.B) {
|
||||
benchmarkWaitGroupWait(b, 0)
|
||||
}
|
||||
|
||||
func BenchmarkWaitGroupWaitWork(b *testing.B) {
|
||||
benchmarkWaitGroupWait(b, 100)
|
||||
}
|
4
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
4
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
@ -1078,6 +1078,10 @@
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
@ -30,8 +30,10 @@ go_test(
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
@ -81,6 +83,7 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/admission/plugin/initialization:go_default_library",
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
||||
"k8s.io/apimachinery/pkg/version"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
@ -128,6 +129,8 @@ type Config struct {
|
||||
|
||||
// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
|
||||
BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
|
||||
// HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown.
|
||||
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
|
||||
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
|
||||
// always reported
|
||||
DiscoveryAddresses discovery.Addresses
|
||||
@ -236,6 +239,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
||||
ReadWritePort: 443,
|
||||
RequestContextMapper: apirequest.NewRequestContextMapper(),
|
||||
BuildHandlerChainFunc: DefaultBuildHandlerChain,
|
||||
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
|
||||
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
||||
DisabledPostStartHooks: sets.NewString(),
|
||||
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
||||
@ -446,8 +450,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
Serializer: c.Serializer,
|
||||
AuditBackend: c.AuditBackend,
|
||||
delegationTarget: delegationTarget,
|
||||
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
||||
|
||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||
ShutdownTimeout: c.RequestTimeout,
|
||||
|
||||
SecureServingInfo: c.SecureServingInfo,
|
||||
ExternalAddress: c.ExternalAddress,
|
||||
@ -488,6 +494,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||
skip := false
|
||||
for _, existingCheck := range c.HealthzChecks {
|
||||
@ -535,6 +542,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
|
||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
||||
handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
|
||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||
handler = genericfilters.WithPanicRecovery(handler)
|
||||
|
@ -37,6 +37,7 @@ go_library(
|
||||
"longrunning.go",
|
||||
"maxinflight.go",
|
||||
"timeout.go",
|
||||
"waitgroup.go",
|
||||
"wrap.go",
|
||||
],
|
||||
importpath = "k8s.io/apiserver/pkg/server/filters",
|
||||
@ -46,6 +47,7 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
|
53
staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go
Normal file
53
staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
|
||||
func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
ctx, ok := requestContextMapper.Get(req)
|
||||
if !ok {
|
||||
// if this happens, the handler chain isn't setup correctly because there is no context mapper
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||
if !ok {
|
||||
// if this happens, the handler chain isn't setup correctly because there is no request info
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
if !longRunning(req, requestInfo) {
|
||||
if err := wg.Add(1); err != nil {
|
||||
http.Error(w, "Apisever is shutting down.", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer wg.Done()
|
||||
}
|
||||
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
genericapi "k8s.io/apiserver/pkg/endpoints"
|
||||
@ -83,6 +84,10 @@ type GenericAPIServer struct {
|
||||
// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
|
||||
minRequestTimeout time.Duration
|
||||
|
||||
// ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
|
||||
// gracefully shutdown returns.
|
||||
ShutdownTimeout time.Duration
|
||||
|
||||
// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
|
||||
// to InstallLegacyAPIGroup
|
||||
legacyAPIGroupPrefixes sets.String
|
||||
@ -146,6 +151,9 @@ type GenericAPIServer struct {
|
||||
|
||||
// delegationTarget is the next delegate in the chain or nil
|
||||
delegationTarget DelegationTarget
|
||||
|
||||
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
|
||||
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
|
||||
}
|
||||
|
||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||
@ -275,16 +283,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
||||
|
||||
<-stopCh
|
||||
|
||||
return s.RunPreShutdownHooks()
|
||||
err = s.RunPreShutdownHooks()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
|
||||
s.HandlerChainWaitGroup.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NonBlockingRun spawns the secure http server. An error is
|
||||
// returned if the secure port cannot be listened on.
|
||||
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
|
||||
// Use an stop channel to allow graceful shutdown without dropping audit events
|
||||
// after http server shutdown.
|
||||
auditStopCh := make(chan struct{})
|
||||
|
||||
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
||||
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
||||
if s.AuditBackend != nil {
|
||||
if err := s.AuditBackend.Run(stopCh); err != nil {
|
||||
if err := s.AuditBackend.Run(auditStopCh); err != nil {
|
||||
return fmt.Errorf("failed to run the audit backend: %v", err)
|
||||
}
|
||||
}
|
||||
@ -305,6 +325,8 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
|
||||
go func() {
|
||||
<-stopCh
|
||||
close(internalStopCh)
|
||||
s.HandlerChainWaitGroup.Wait()
|
||||
close(auditStopCh)
|
||||
}()
|
||||
|
||||
s.RunPostStartHooks(stopCh)
|
||||
|
@ -25,6 +25,8 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -44,8 +46,11 @@ import (
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/apiserver/pkg/endpoints/discovery"
|
||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@ -509,3 +514,75 @@ func fakeVersion() version.Info {
|
||||
Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
|
||||
}
|
||||
}
|
||||
|
||||
// TestGracefulShutdown verifies server shutdown after request handler finish.
|
||||
func TestGracefulShutdown(t *testing.T) {
|
||||
etcdserver, config, _ := setUp(t)
|
||||
defer etcdserver.Terminate(t)
|
||||
|
||||
var graceShutdown bool
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
||||
handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
|
||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||
return handler
|
||||
}
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
wg.Done()
|
||||
time.Sleep(2 * time.Second)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
graceShutdown = true
|
||||
})
|
||||
|
||||
s, err := config.Complete(nil).New("test", EmptyDelegate)
|
||||
if err != nil {
|
||||
t.Fatalf("Error in bringing up the server: %v", err)
|
||||
}
|
||||
|
||||
s.Handler.NonGoRestfulMux.Handle("/test", handler)
|
||||
|
||||
insecureServer := &http.Server{
|
||||
Addr: "0.0.0.0:0",
|
||||
Handler: s.Handler,
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh)
|
||||
if err != nil {
|
||||
t.Errorf("RunServer err: %v", err)
|
||||
}
|
||||
|
||||
graceCh := make(chan struct{})
|
||||
// mock a client request
|
||||
go func() {
|
||||
resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected http error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("Unexpected http status code: %v", resp.StatusCode)
|
||||
}
|
||||
close(graceCh)
|
||||
}()
|
||||
|
||||
// close stopCh after request sent to server to guarantee request handler is running.
|
||||
wg.Wait()
|
||||
close(stopCh)
|
||||
// wait for wait group handler finish
|
||||
s.HandlerChainWaitGroup.Wait()
|
||||
|
||||
// check server all handlers finished.
|
||||
if !graceShutdown {
|
||||
t.Errorf("server shutdown not gracefully.")
|
||||
}
|
||||
// check client to make sure receive response.
|
||||
select {
|
||||
case <-graceCh:
|
||||
t.Logf("server shutdown gracefully.")
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Errorf("Timed out waiting for response.")
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
@ -84,13 +85,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
|
||||
|
||||
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
|
||||
var err error
|
||||
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
|
||||
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.ShutdownTimeout, stopCh)
|
||||
return err
|
||||
}
|
||||
|
||||
// RunServer listens on the given port, then spawns a go-routine continuously serving
|
||||
// until the stopCh is closed. The port is returned. This function does not block.
|
||||
func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
|
||||
func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) {
|
||||
if len(server.Addr) == 0 {
|
||||
return 0, errors.New("address cannot be empty")
|
||||
}
|
||||
@ -111,10 +112,12 @@ func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int
|
||||
return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
|
||||
}
|
||||
|
||||
// Stop the server by closing the listener
|
||||
// Shutdown server gracefully.
|
||||
go func() {
|
||||
<-stopCh
|
||||
ln.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
|
||||
server.Shutdown(ctx)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
|
@ -794,6 +794,10 @@
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
@ -790,6 +790,10 @@
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/wait",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/waitgroup",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery/pkg/util/yaml",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
Loading…
Reference in New Issue
Block a user