Clean shutdown of test apiserver

This commit is contained in:
Wojciech Tyczyński 2022-05-12 12:10:02 +02:00
parent e0dbea2443
commit 6f706775bc
10 changed files with 111 additions and 67 deletions

View File

@ -44,6 +44,8 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/klog/v2"
)
// This key is for testing purposes only and is not considered secure.
@ -99,14 +101,27 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
instanceOptions = NewDefaultTestServerOptions()
}
result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
// If the apiserver was started, let's wait for it to
// shutdown clearly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.Errorf("Failed to shutdown test server clearly: %v", err)
}
}
os.RemoveAll(result.TmpDir)
}
defer func() {
if result.TearDownFn == nil {
@ -114,11 +129,6 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
}()
result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
s := options.NewServerRunOptions()
@ -209,8 +219,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager)
}
errCh := make(chan error)
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
@ -302,7 +313,10 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
result.ClientConfig.QPS = 1000
result.ClientConfig.Burst = 10000
result.ServerOpts = s
result.TearDownFn = tearDown
result.TearDownFn = func() {
tearDown()
etcdClient.Close()
}
result.EtcdClient = etcdClient
result.EtcdStoragePrefix = storageConfig.Prefix

View File

@ -135,9 +135,6 @@ func TestClientCARecreate(t *testing.T) {
}
func testClientCA(t *testing.T, recreate bool) {
stopCh := make(chan struct{})
defer close(stopCh)
frontProxyCA, err := newTestCAWithClient(
pkix.Name{
CommonName: "test-front-proxy-ca",
@ -173,7 +170,7 @@ func testClientCA(t *testing.T, recreate bool) {
clientCAFilename := ""
frontProxyCAFilename := ""
kubeClient, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
kubeClient, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
clientCAFilename = opts.Authentication.ClientCert.ClientCA
@ -181,6 +178,7 @@ func testClientCA(t *testing.T, recreate bool) {
opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver")
},
})
defer tearDownFn()
// wait for request header info
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, waitForConfigMapCAContent(t, kubeClient, "requestheader-client-ca-file", "-----BEGIN CERTIFICATE-----", 1))
@ -470,17 +468,15 @@ func TestServingCertRecreate(t *testing.T) {
}
func testServingCert(t *testing.T, recreate bool) {
stopCh := make(chan struct{})
defer close(stopCh)
var servingCertPath string
_, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
_, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
},
})
defer tearDownFn()
if recreate {
if err := os.Remove(path.Join(servingCertPath, "apiserver.key")); err != nil {
@ -511,12 +507,9 @@ func testServingCert(t *testing.T, recreate bool) {
}
func TestSNICert(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
var servingCertPath string
_, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
_, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
@ -535,6 +528,7 @@ func TestSNICert(t *testing.T) {
}}
},
})
defer tearDownFn()
// When we run this the second time, we know which one we are expecting.
_, actualCerts, err := cert.GetServingCertificatesForURL(kubeconfig.Host, "foo")

View File

@ -32,13 +32,12 @@ import (
// Tests that the apiserver limits the number of operations in a json patch.
func TestMaxJSONPatchOperations(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
},
})
defer tearDownFn()
p := `{"op":"add","path":"/x","value":"y"}`
// maxJSONPatchOperations = 10000

View File

@ -30,9 +30,8 @@ import (
// Tests that the apiserver limits the resource size in write operations.
func TestMaxResourceSize(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{})
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{})
defer tearDownFn()
hugeData := []byte(strings.Repeat("x", 3*1024*1024+1))

View File

@ -33,9 +33,7 @@ import (
)
func TestInsecurePodLogs(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
// I have no idea what this cert is, but it doesn't matter, we just want something that always fails validation
@ -63,6 +61,7 @@ Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq
`)
},
})
defer tearDownFn()
fakeKubeletServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("fake-log"))

View File

@ -53,10 +53,7 @@ func TestDynamicClientBuilder(t *testing.T) {
t.Fatalf("parse duration failed: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
baseClient, baseConfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
baseClient, baseConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceAccountSigningKeyFile = tmpfile.Name()
opts.ServiceAccountTokenMaxExpiration = maxExpirationDuration
@ -75,6 +72,7 @@ func TestDynamicClientBuilder(t *testing.T) {
config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
},
})
defer tearDownFn()
// We want to test if the token rotation works fine here.
// To minimize the time this test would consume, we use the minimial token expiration.

View File

@ -70,7 +70,14 @@ func TestGracefulShutdown(t *testing.T) {
resp.Body.Close()
t.Logf("shutting down server")
tearDownOnce.Do(server.TearDownFn)
// We tear it down in the background to ensure that
// pending requests should work fine.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
tearDownOnce.Do(server.TearDownFn)
}()
t.Logf("server should fail new requests")
if err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
@ -100,6 +107,8 @@ func TestGracefulShutdown(t *testing.T) {
t.Fatal(err)
}
t.Logf("response: code %d, body: %s", respErr.resp.StatusCode, string(bs))
wg.Wait()
}
type responseErrorPair struct {

View File

@ -27,7 +27,6 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@ -118,24 +117,11 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
t.Fatal(err)
}
// get a leased session
session, err := concurrency.NewSession(rawClient)
if err != nil {
t.Fatal(err)
}
// then build and use an etcd lock
// this prevents more than one of these api servers from running at the same time
lock := concurrency.NewLocker(session, "kube_integration_etcd_raw")
lock.Lock()
// make sure we start with a clean slate
if _, err := kvClient.Delete(context.Background(), "/registry/", clientv3.WithPrefix()); err != nil {
t.Fatal(err)
}
stopCh := make(chan struct{})
kubeAPIServer, err := app.CreateServerChain(completedOptions)
if err != nil {
t.Fatal(err)
@ -152,6 +138,8 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
stopCh := make(chan struct{})
errCh := make(chan error)
go func() {
// Catch panics that occur in this go routine so we get a comprehensible failure
defer func() {
@ -159,19 +147,29 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
t.Errorf("Unexpected panic trying to start API server: %#v", err)
}
}()
defer close(errCh)
prepared, err := kubeAPIServer.PrepareRun()
if err != nil {
t.Error(err)
errCh <- err
return
}
if err := prepared.Run(stopCh); err != nil {
errCh <- err
t.Error(err)
return
}
}()
lastHealth := ""
attempt := 0
if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
select {
case err := <-errCh:
return false, err
default:
}
// wait for the server to be healthy
result := kubeClient.RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
content, _ := result.Raw()
@ -207,12 +205,18 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
}
cleanup := func() {
if err := os.RemoveAll(certDir); err != nil {
t.Log(err)
}
// Closing stopCh is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
lock.Unlock()
if err := session.Close(); err != nil {
// If the apiserver was started, let's wait for it to
// shutdown clearly.
err, ok := <-errCh
if ok && err != nil {
t.Error(err)
}
rawClient.Close()
if err := os.RemoveAll(certDir); err != nil {
t.Log(err)
}
}

View File

@ -37,14 +37,11 @@ import (
)
func TestWebhookLoopback(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
webhookPath := "/webhook-test"
called := int32(0)
client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
},
ModifyServerConfig: func(config *controlplane.Config) {
@ -66,6 +63,7 @@ func TestWebhookLoopback(t *testing.T) {
})
},
})
defer tearDownFn()
fail := admissionregistrationv1.Fail
noSideEffects := admissionregistrationv1.SideEffectClassNone

View File

@ -55,13 +55,34 @@ type TestServerSetup struct {
ModifyServerConfig func(*controlplane.Config)
}
type TearDownFunc func()
// StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
certDir, _ := os.MkdirTemp("", "test-integration-"+t.Name())
go func() {
<-stopCh
os.RemoveAll(certDir)
}()
func StartTestServer(t *testing.T, setup TestServerSetup) (client.Interface, *rest.Config, TearDownFunc) {
certDir, err := os.MkdirTemp("", "test-integration-"+t.Name())
if err != nil {
t.Fatalf("Couldn't create temp dir: %v", err)
}
stopCh := make(chan struct{})
var errCh chan error
tearDownFn := func() {
// Closing stopCh is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
// If the apiserver was started, let's wait for it to
// shutdown clearly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
t.Error(err)
}
}
if err := os.RemoveAll(certDir); err != nil {
t.Log(err)
}
}
_, defaultServiceClusterIPRange, _ := netutils.ParseCIDRSloppy("10.0.0.0/24")
proxySigningKey, err := utils.NewPrivateKey()
@ -147,9 +168,12 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
if err != nil {
t.Fatal(err)
}
errCh = make(chan error)
go func() {
defer close(errCh)
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
t.Error(err)
errCh <- err
}
}()
@ -161,6 +185,12 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
// wait for health
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
select {
case err := <-errCh:
return false, err
default:
}
healthzConfig := rest.CopyConfig(kubeAPIServerClientConfig)
healthzConfig.ContentType = ""
healthzConfig.AcceptContentTypes = ""
@ -195,5 +225,5 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
t.Fatal(err)
}
return kubeAPIServerClient, kubeAPIServerClientConfig
return kubeAPIServerClient, kubeAPIServerClientConfig, tearDownFn
}