mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #108033 from tkashem/pre-shutdown-hook
apiserver: stop http server after pre shutdown hooks
This commit is contained in:
commit
33f56203e3
@ -423,7 +423,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
// close socket after delayed stopCh
|
// close socket after delayed stopCh
|
||||||
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
|
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
|
||||||
stopHttpServerCh := delayedStopCh.Signaled()
|
delayedStopOrDrainedCh := delayedStopCh.Signaled()
|
||||||
shutdownTimeout := s.ShutdownTimeout
|
shutdownTimeout := s.ShutdownTimeout
|
||||||
if s.ShutdownSendRetryAfter {
|
if s.ShutdownSendRetryAfter {
|
||||||
// when this mode is enabled, we do the following:
|
// when this mode is enabled, we do the following:
|
||||||
@ -432,11 +432,20 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
|
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
|
||||||
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
|
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
|
||||||
// we should wait for a minimum of 2s
|
// we should wait for a minimum of 2s
|
||||||
stopHttpServerCh = drainedCh.Signaled()
|
delayedStopOrDrainedCh = drainedCh.Signaled()
|
||||||
shutdownTimeout = 2 * time.Second
|
shutdownTimeout = 2 * time.Second
|
||||||
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
|
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pre-shutdown hooks need to finish before we stop the http server
|
||||||
|
preShutdownHooksHasStoppedCh, stopHttpServerCh := make(chan struct{}), make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(stopHttpServerCh)
|
||||||
|
|
||||||
|
<-delayedStopOrDrainedCh
|
||||||
|
<-preShutdownHooksHasStoppedCh
|
||||||
|
}()
|
||||||
|
|
||||||
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
|
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -462,8 +471,12 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
|
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
|
||||||
<-stopCh
|
<-stopCh
|
||||||
|
|
||||||
// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
|
// run shutdown hooks directly. This includes deregistering from
|
||||||
err = s.RunPreShutdownHooks()
|
// the kubernetes endpoint in case of kube-apiserver.
|
||||||
|
func() {
|
||||||
|
defer close(preShutdownHooksHasStoppedCh)
|
||||||
|
err = s.RunPreShutdownHooks()
|
||||||
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptrace"
|
"net/http/httptrace"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -35,11 +36,17 @@ import (
|
|||||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
klog.InitFlags(nil)
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
// doer sends a request to the server
|
// doer sends a request to the server
|
||||||
type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result
|
type doer func(client *http.Client, gci func(httptrace.GotConnInfo), path string, timeout time.Duration) result
|
||||||
|
|
||||||
@ -420,6 +427,63 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
|
|||||||
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPreShutdownHooks(t *testing.T) {
|
||||||
|
s := newGenericAPIServer(t, true)
|
||||||
|
doer := setupDoer(t, s.SecureServingInfo)
|
||||||
|
|
||||||
|
preShutdownHookErrCh := make(chan error)
|
||||||
|
err := s.AddPreShutdownHook("test-backend", func() error {
|
||||||
|
// this pre-shutdown hook waits for the requests in flight to drain
|
||||||
|
// and then send a series of requests to the apiserver, and
|
||||||
|
// we expect these series of requests to be completed successfully
|
||||||
|
<-s.lifecycleSignals.InFlightRequestsDrained.Signaled()
|
||||||
|
|
||||||
|
// we send 5 requests, once every second
|
||||||
|
var r result
|
||||||
|
client := newClient(true)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
r = doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 100*time.Millisecond)
|
||||||
|
if r.err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
preShutdownHookErrCh <- r.err
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to add pre-shutdown hook - %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the API server
|
||||||
|
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
// this test has an inherent race condition when we wait for two go routines
|
||||||
|
// to finish - the Run method and the pre-shutdown hook, each running in
|
||||||
|
// its own goroutine, give it a second before unblocking the test assert
|
||||||
|
<-time.After(time.Second)
|
||||||
|
close(runCompletedCh)
|
||||||
|
}()
|
||||||
|
s.PrepareRun().Run(stopCh)
|
||||||
|
}()
|
||||||
|
waitForAPIServerStarted(t, doer)
|
||||||
|
|
||||||
|
close(stopCh)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-preShutdownHookErrCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("PreSHutdown hook can not access the API server - %v", err)
|
||||||
|
}
|
||||||
|
case <-runCompletedCh:
|
||||||
|
t.Fatalf("API Server exited without running the PreShutdown hooks")
|
||||||
|
case <-time.After(15 * time.Second):
|
||||||
|
t.Fatalf("test timed out after 15 seconds")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
||||||
return func(ci httptrace.GotConnInfo) {
|
return func(ci httptrace.GotConnInfo) {
|
||||||
if !ci.Reused {
|
if !ci.Reused {
|
||||||
|
Loading…
Reference in New Issue
Block a user