Merge pull request #103042 from tkashem/refactor-gt

apiserver: refactor graceful termination logic
This commit is contained in:
Kubernetes Prow Robot 2021-06-25 12:12:47 -07:00 committed by GitHub
commit a1d9479d20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 650 additions and 26 deletions

View File

@ -225,6 +225,11 @@ type Config struct {
// RequestWidthEstimator is used to estimate the "width" of the incoming request(s).
RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc
// terminationSignals provides access to the various shutdown signals
// that happen during the graceful termination of the apiserver.
// it's intentionally marked private as it should never be overridden.
terminationSignals terminationSignals
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -349,6 +354,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
terminationSignals: newTerminationSignals(),
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
@ -595,7 +601,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
healthzChecks: c.HealthzChecks,
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
readinessStopCh: make(chan struct{}),
livezGracePeriod: c.LivezGracePeriod,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
@ -603,6 +608,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
terminationSignals: c.terminationSignals,
APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager,

View File

@ -52,7 +52,7 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime
} else {
klog.Infof("Serving insecurely on %s", s.Listener.Addr())
}
_, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
_, _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
// NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here
return err
}

View File

@ -174,9 +174,6 @@ type GenericAPIServer struct {
readyzChecksInstalled bool
livezGracePeriod time.Duration
livezClock clock.Clock
// the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this
// will cause readyz to return unhealthy.
readinessStopCh chan struct{}
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
@ -213,6 +210,10 @@ type GenericAPIServer struct {
// Version will enable the /version endpoint if non-nil
Version *version.Info
// terminationSignals provides access to the various termination
// signals that happen during the shutdown period of the apiserver.
terminationSignals terminationSignals
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -307,7 +308,10 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.installHealthz()
s.installLivez()
err := s.addReadyzShutdownCheck(s.readinessStopCh)
// as soon as shutdown is initiated, readiness should start failing
readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled()
err := s.addReadyzShutdownCheck(readinessStopCh)
if err != nil {
klog.Errorf("Failed to install readyz shutdown check %s", err)
}
@ -330,38 +334,45 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := make(chan struct{})
delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated
go func() {
defer close(delayedStopCh)
defer delayedStopCh.Signal()
<-stopCh
// As soon as shutdown is initiated, /readyz should start returning failure.
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
// and stop sending traffic to this server.
close(s.readinessStopCh)
shutdownInitiatedCh.Signal()
time.Sleep(s.ShutdownDelayDuration)
}()
// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
if err != nil {
return err
}
drainedCh := make(chan struct{})
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening
go func() {
defer close(drainedCh)
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
}()
drainedCh := s.terminationSignals.InFlightRequestsDrained
go func() {
defer drainedCh.Signal()
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-delayedStopCh
<-delayedStopCh.Signaled()
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
}()
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh
// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
@ -369,19 +380,21 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if err != nil {
return err
}
klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed")
// Wait for all requests in flight to drain, bounded by the RequestTimeout variable.
<-drainedCh
<-drainedCh.Signaled()
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
<-stoppedCh
klog.V(1).Info("[graceful-termination] apiserver is exiting")
return nil
}
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
// Use an stop channel to allow graceful shutdown without dropping audit events
// after http server shutdown.
auditStopCh := make(chan struct{})
@ -390,20 +403,22 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return nil, fmt.Errorf("failed to run the audit backend: %v", err)
return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout)
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return nil, err
return nil, nil, err
}
}
@ -426,7 +441,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return stoppedCh, nil
return stoppedCh, listenerStoppedCh, nil
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource

View File

@ -0,0 +1,404 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptrace"
"reflect"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
)
// doer sends a request to the server
type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result
func (d doer) Do(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result {
return d(client, gci, path, timeout)
}
type result struct {
err error
response *http.Response
}
// wrap a terminationSignal so the test can inject its own callback
type wrappedTerminationSignal struct {
terminationSignal
callback func(bool, string, terminationSignal)
}
func (w *wrappedTerminationSignal) Signal() {
var name string
if ncw, ok := w.terminationSignal.(*namedChannelWrapper); ok {
name = ncw.name
}
// the callback is invoked before and after the termination event is signaled
if w.callback != nil {
w.callback(true, name, w.terminationSignal)
}
w.terminationSignal.Signal()
if w.callback != nil {
w.callback(false, name, w.terminationSignal)
}
}
func wrapTerminationSignals(t *testing.T, ts *terminationSignals, callback func(bool, string, terminationSignal)) {
newWrappedTerminationSignal := func(delegated terminationSignal) terminationSignal {
return &wrappedTerminationSignal{
terminationSignal: delegated,
callback: callback,
}
}
ts.AfterShutdownDelayDuration = newWrappedTerminationSignal(ts.AfterShutdownDelayDuration)
ts.HTTPServerStoppedListening = newWrappedTerminationSignal(ts.HTTPServerStoppedListening)
ts.InFlightRequestsDrained = newWrappedTerminationSignal(ts.InFlightRequestsDrained)
ts.ShutdownInitiated = newWrappedTerminationSignal(ts.ShutdownInitiated)
}
type step struct {
waitCh, doneCh chan struct{}
fn func()
}
func (s step) done() <-chan struct{} {
close(s.waitCh)
return s.doneCh
}
func (s step) execute() {
defer close(s.doneCh)
<-s.waitCh
s.fn()
}
func newStep(fn func()) *step {
return &step{
fn: fn,
waitCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t *testing.T) {
s := newGenericAPIServer(t)
// record the termination events in the order they are signaled
var signalOrderLock sync.Mutex
signalOrderGot := make([]string, 0)
recordOrderFn := func(before bool, name string, e terminationSignal) {
if !before {
return
}
signalOrderLock.Lock()
defer signalOrderLock.Unlock()
signalOrderGot = append(signalOrderGot, name)
}
// handler for a request that we want to keep in flight through to the end
inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{})
inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
close(inFlightStartedCh)
// this request handler blocks until we deliberately unblock it.
<-inFlightRequestBlockedCh
w.WriteHeader(http.StatusOK)
})
s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest)
connReusingClient := newClient(false)
doer := setupDoer(t, s.SecureServingInfo)
var delayedStopVerificationStepExecuted bool
delayedStopVerificationStep := newStep(func() {
delayedStopVerificationStepExecuted = true
t.Log("Before ShutdownDelayDuration elapses new request(s) should be served")
resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second)
requestMustSucceed(t, resultGot)
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
requestMustSucceed(t, resultGot)
})
steps := func(before bool, name string, e terminationSignal) {
// Before AfterShutdownDelayDuration event is signaled, the test
// will send request(s) to assert on expected behavior.
if name == "AfterShutdownDelayDuration" && before {
// it unblocks the verification step and waits for it to complete
<-delayedStopVerificationStep.done()
}
}
// wrap the termination signals of the GenericAPIServer so the test can inject its own callback
wrapTerminationSignals(t, &s.terminationSignals, func(before bool, name string, e terminationSignal) {
recordOrderFn(before, name, e)
steps(before, name, e)
})
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
}()
waitForAPIServerStarted(t, doer)
// step 1: fire a request that we want to keep in-flight through to the end
inFlightResultCh := make(chan result)
go func() {
resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0)
inFlightResultCh <- resultGot
}()
select {
case <-inFlightStartedCh:
case <-time.After(5 * time.Second):
t.Fatalf("Waited for 5s for the in-flight request to reach the server")
}
// step 2: signal termination event: initiate a shutdown
close(stopCh)
// step 3: before ShutdownDelayDuration elapses new request(s) should be served successfully.
delayedStopVerificationStep.execute()
if !delayedStopVerificationStepExecuted {
t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute")
}
// step 4: wait for the HTTP Server listener to have stopped
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening
select {
case <-httpServerStoppedListeningCh.Signaled():
case <-time.After(5 * time.Second):
t.Fatal("Expected the server to signal HTTPServerStoppedListening event")
}
// step 5: the server has stopped listening but we still have a request
// in flight, let it unblock and we expect the request to succeed.
close(inFlightRequestBlockedCh)
var inFlightResultGot result
select {
case inFlightResultGot = <-inFlightResultCh:
case <-time.After(5 * time.Second):
t.Fatal("Expected the server to send a response")
}
requestMustSucceed(t, inFlightResultGot)
t.Log("Waiting for the apiserver Run method to return")
select {
case <-runCompletedCh:
case <-time.After(5 * time.Second):
t.Fatal("Expected the apiserver Run method to return")
}
terminationSignalOrderExpected := []string{
string("ShutdownInitiated"),
string("AfterShutdownDelayDuration"),
string("HTTPServerStoppedListening"),
string("InFlightRequestsDrained"),
}
func() {
signalOrderLock.Lock()
defer signalOrderLock.Unlock()
if !reflect.DeepEqual(terminationSignalOrderExpected, signalOrderGot) {
t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(terminationSignalOrderExpected, signalOrderGot))
}
}()
}
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
return func(ci httptrace.GotConnInfo) {
if !ci.Reused {
t.Errorf("Expected the request to use an existing TCP connection, but got: %+v", ci)
}
}
}
func shouldUseNewConnection(t *testing.T) func(httptrace.GotConnInfo) {
return func(ci httptrace.GotConnInfo) {
if ci.Reused {
t.Errorf("Expected the request to use a new TCP connection, but got: %+v", ci)
}
}
}
func requestMustSucceed(t *testing.T, resultGot result) {
if resultGot.err != nil {
t.Errorf("Expected no error, but got: %v", resultGot.err)
return
}
if resultGot.response.StatusCode != http.StatusOK {
t.Errorf("Expected Status Code: %d, but got: %d", http.StatusOK, resultGot.response.StatusCode)
}
}
func waitForAPIServerStarted(t *testing.T, doer doer) {
client := newClient(true)
i := 1
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
result := doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 100*time.Millisecond)
i++
if result.err != nil {
t.Logf("Still waiting for the server to start - err: %v", err)
return false, nil
}
if result.response.StatusCode != http.StatusOK {
t.Logf("Still waiting for the server to start - expecting: %d, but got: %v", http.StatusOK, result.response)
return false, nil
}
t.Log("The API server has started")
return true, nil
})
if err != nil {
t.Fatalf("The server has failed to start - err: %v", err)
}
}
func setupDoer(t *testing.T, info *SecureServingInfo) doer {
_, port, err := info.HostPort()
if err != nil {
t.Fatalf("Expected host, port from SecureServingInfo, but got: %v", err)
}
return func(client *http.Client, callback func(httptrace.GotConnInfo), path string, timeout time.Duration) result {
url := fmt.Sprintf("https://%s:%d%s", "127.0.0.1", port, path)
t.Logf("Sending request - timeout: %s, url: %s", timeout, url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return result{response: nil, err: err}
}
// setup request timeout
var ctx context.Context
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(req.Context(), timeout)
defer cancel()
req = req.WithContext(ctx)
}
// setup trace
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
callback(connInfo)
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
response, err := client.Do(req)
// in this test, we don't depend on the body of the response, so we can
// close the Body here to ensure the underlying transport can be reused
if response != nil {
ioutil.ReadAll(response.Body)
response.Body.Close()
}
return result{
err: err,
response: response,
}
}
}
func newClient(useNewConnection bool) *http.Client {
clientCACertPool := x509.NewCertPool()
clientCACertPool.AppendCertsFromPEM(backendCrt)
tlsConfig := &tls.Config{
RootCAs: clientCACertPool,
NextProtos: []string{http2.NextProtoTLS},
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
DisableKeepAlives: useNewConnection,
}
if err := http2.ConfigureTransport(tr); err != nil {
log.Fatalf("Failed to configure HTTP2 transport: %v", err)
}
return &http.Client{
Timeout: 0,
Transport: tr,
}
}
func newGenericAPIServer(t *testing.T) *GenericAPIServer {
config, _ := setUp(t)
config.ShutdownDelayDuration = 100 * time.Millisecond
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler
}
s, err := config.Complete(nil).New("test", NewEmptyDelegate())
if err != nil {
t.Fatalf("Error in bringing up the server: %v", err)
}
ln, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
t.Fatalf("failed to listen on %v: %v", "0.0.0.0:0", err)
}
s.SecureServingInfo = &SecureServingInfo{}
s.SecureServingInfo.Listener = &wrappedListener{ln, t}
cert, err := dynamiccertificates.NewStaticCertKeyContent("serving-cert", backendCrt, backendKey)
if err != nil {
t.Fatalf("failed to load cert - %v", err)
}
s.SecureServingInfo.Cert = cert
// we use this handler to send a test request to the server.
s.Handler.NonGoRestfulMux.Handle("/echo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
t.Logf("[server] received a request, proto: %s, url: %s", req.Proto, req.RequestURI)
w.Header().Add("echo", req.URL.Query().Get("message"))
w.WriteHeader(http.StatusOK)
}))
return s
}
type wrappedListener struct {
net.Listener
t *testing.T
}
func (ln wrappedListener) Accept() (net.Conn, error) {
c, err := ln.Listener.Accept()
if tc, ok := c.(*net.TCPConn); ok {
ln.t.Logf("[server] seen new connection: %#v", tc)
}
return c, err
}

View File

@ -590,7 +590,7 @@ func TestGracefulShutdown(t *testing.T) {
// get port
serverPort := ln.Addr().(*net.TCPAddr).Port
stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
stoppedCh, _, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
if err != nil {
t.Fatalf("RunServer err: %v", err)
}

View File

@ -0,0 +1,136 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"k8s.io/klog/v2"
)
/*
We make an attempt here to identify the events that take place during
the graceful shutdown of the apiserver.
We also identify each event with a name so we can refer to it.
Events:
- ShutdownInitiated: KILL signal received
- AfterShutdownDelayDuration: shutdown delay duration has passed
- InFlightRequestsDrained: all in flight request(s) have been drained
The following is a sequence of shutdown events that we expect to see during termination:
T0: ShutdownInitiated: KILL signal received
- /readyz starts returning red
- run pre shutdown hooks
T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed
- the default value of 'ShutdownDelayDuration' is '70s'
- it's time to initiate shutdown of the HTTP Server, server.Shutdown is invoked
- as a consequene, the Close function has is called for all listeners
- the HTTP Server stops listening immediately
- any new request arriving on a new TCP socket is denied with
a network error similar to 'connection refused'
- the HTTP Server waits gracefully for existing requests to complete
up to '60s' (dictated by ShutdownTimeout)
- active long running requests will receive a GOAWAY.
T0+70s: HTTPServerStoppedListening:
- this event is signaled when the HTTP Server has stopped listening
which is immediately after server.Shutdown has been invoked
T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained
- long running requests are outside of this scope
- up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that
any request in flight has a hard timeout of 60s.
- it's time to call 'Shutdown' on the audit events since all
in flight request(s) have drained.
*/
// terminationSignal encapsulates a named apiserver termination event
type terminationSignal interface {
// Signal signals the event, indicating that the event has occurred.
// Signal is idempotent, once signaled the event stays signaled and
// it immediately unblocks any goroutine waiting for this event.
Signal()
// Signaled returns a channel that is closed when the underlying termination
// event has been signaled. Successive calls to Signaled return the same value.
Signaled() <-chan struct{}
}
// terminationSignals provides an abstraction of the termination events that
// transpire during the shutdown period of the apiserver. This abstraction makes it easy
// for us to write unit tests that can verify expected graceful termination behavior.
//
// GenericAPIServer can use these to either:
// - signal that a particular termination event has transpired
// - wait for a designated termination event to transpire and do some action.
type terminationSignals struct {
// ShutdownInitiated event is signaled when an apiserver shutdown has been initiated.
// It is signaled when the `stopCh` provided by the main goroutine
// receives a KILL signal and is closed as a consequence.
ShutdownInitiated terminationSignal
// AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration
// has elapsed since the ShutdownInitiated event.
// ShutdownDelayDuration allows the apiserver to delay shutdown for some time.
AfterShutdownDelayDuration terminationSignal
// InFlightRequestsDrained event is signaled when the existing requests
// in flight have completed. This is used as signal to shut down the audit backends
InFlightRequestsDrained terminationSignal
// HTTPServerStoppedListening termination event is signaled when the
// HTTP Server has stopped listening to the underlying socket.
HTTPServerStoppedListening terminationSignal
}
// newTerminationSignals returns an instance of terminationSignals interface to be used
// to coordinate graceful termination of the apiserver
func newTerminationSignals() terminationSignals {
return terminationSignals{
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
}
}
func newNamedChannelWrapper(name string) terminationSignal {
return &namedChannelWrapper{
name: name,
ch: make(chan struct{}),
}
}
type namedChannelWrapper struct {
name string
ch chan struct{}
}
func (e *namedChannelWrapper) Signal() {
select {
case <-e.ch:
// already closed, don't close again.
default:
close(e.ch)
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name)
}
}
func (e *namedChannelWrapper) Signaled() <-chan struct{} {
return e.ch
}

View File

@ -192,6 +192,67 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
return stoppedCh, err
}
// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. ServeWithListenerStopped does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening.
// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh
// and update all components that call 'Serve'
func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
if s.Listener == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}
tlsConfig, err := s.tlsConfig(stopCh)
if err != nil {
return nil, nil, err
}
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
// At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
// This should be big enough to accommodate most API POST requests in a single frame,
// and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
const resourceBody99Percentile = 256 * 1024
http2Options := &http2.Server{}
// shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
http2Options.MaxReadFrameSize = resourceBody99Percentile
// use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
if s.HTTP2MaxStreamsPerConnection > 0 {
http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
} else {
http2Options.MaxConcurrentStreams = 250
}
// increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
if !s.DisableHTTP2 {
// apply settings to the server
if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
return nil, nil, fmt.Errorf("error configuring http2: %v", err)
}
}
// use tlsHandshakeErrorWriter to handle messages of tls handshake error
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
@ -207,15 +268,15 @@ func RunServer(
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
) (<-chan struct{}, <-chan struct{}, error) {
if ln == nil {
return nil, fmt.Errorf("listener must not be nil")
return nil, nil, fmt.Errorf("listener must not be nil")
}
// Shutdown server gracefully.
stoppedCh := make(chan struct{})
serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(stoppedCh)
defer close(serverShutdownCh)
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx)
@ -224,6 +285,7 @@ func RunServer(
go func() {
defer utilruntime.HandleCrash()
defer close(listenerStoppedCh)
var listener net.Listener
listener = tcpKeepAliveListener{ln}
@ -242,7 +304,7 @@ func RunServer(
}
}()
return stoppedCh, nil
return serverShutdownCh, listenerStoppedCh, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted