From 6f706775bcb0007082ca940527a154e728b4399f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 12 May 2022 12:10:02 +0200 Subject: [PATCH] Clean shutdown of test apiserver --- cmd/kube-apiserver/app/testing/testserver.go | 32 +++++++++---- .../apiserver/certreload/certreload_test.go | 18 +++----- .../max_json_patch_operations_test.go | 5 +- .../apiserver/max_request_body_bytes_test.go | 5 +- .../apiserver/podlogs/podlogs_test.go | 5 +- test/integration/auth/dynamic_client_test.go | 6 +-- .../controlplane/graceful_shutdown_test.go | 11 ++++- test/integration/etcd/server.go | 44 ++++++++++-------- test/integration/examples/webhook_test.go | 6 +-- test/integration/framework/test_server.go | 46 +++++++++++++++---- 10 files changed, 111 insertions(+), 67 deletions(-) diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 2c6a8ee402b..e92fff9b7f0 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -44,6 +44,8 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" testutil "k8s.io/kubernetes/test/utils" + + "k8s.io/klog/v2" ) // This key is for testing purposes only and is not considered secure. @@ -99,14 +101,27 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo instanceOptions = NewDefaultTestServerOptions() } + result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver") + if err != nil { + return result, fmt.Errorf("failed to create temp dir: %v", err) + } + stopCh := make(chan struct{}) + var errCh chan error tearDown := func() { // Closing stopCh is stopping apiserver and cleaning up // after itself, including shutting down its storage layer. close(stopCh) - if len(result.TmpDir) != 0 { - os.RemoveAll(result.TmpDir) + + // If the apiserver was started, let's wait for it to + // shutdown clearly. + if errCh != nil { + err, ok := <-errCh + if ok && err != nil { + klog.Errorf("Failed to shutdown test server clearly: %v", err) + } } + os.RemoveAll(result.TmpDir) } defer func() { if result.TearDownFn == nil { @@ -114,11 +129,6 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } }() - result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver") - if err != nil { - return result, fmt.Errorf("failed to create temp dir: %v", err) - } - fs := pflag.NewFlagSet("test", pflag.PanicOnError) s := options.NewServerRunOptions() @@ -209,8 +219,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager) } - errCh := make(chan error) + errCh = make(chan error) go func(stopCh <-chan struct{}) { + defer close(errCh) prepared, err := server.PrepareRun() if err != nil { errCh <- err @@ -302,7 +313,10 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo result.ClientConfig.QPS = 1000 result.ClientConfig.Burst = 10000 result.ServerOpts = s - result.TearDownFn = tearDown + result.TearDownFn = func() { + tearDown() + etcdClient.Close() + } result.EtcdClient = etcdClient result.EtcdStoragePrefix = storageConfig.Prefix diff --git a/test/integration/apiserver/certreload/certreload_test.go b/test/integration/apiserver/certreload/certreload_test.go index 2d169218d47..7559c4dfab0 100644 --- a/test/integration/apiserver/certreload/certreload_test.go +++ b/test/integration/apiserver/certreload/certreload_test.go @@ -135,9 +135,6 @@ func TestClientCARecreate(t *testing.T) { } func testClientCA(t *testing.T, recreate bool) { - stopCh := make(chan struct{}) - defer close(stopCh) - frontProxyCA, err := newTestCAWithClient( pkix.Name{ CommonName: "test-front-proxy-ca", @@ -173,7 +170,7 @@ func testClientCA(t *testing.T, recreate bool) { clientCAFilename := "" frontProxyCAFilename := "" - kubeClient, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + kubeClient, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 clientCAFilename = opts.Authentication.ClientCert.ClientCA @@ -181,6 +178,7 @@ func testClientCA(t *testing.T, recreate bool) { opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver") }, }) + defer tearDownFn() // wait for request header info err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, waitForConfigMapCAContent(t, kubeClient, "requestheader-client-ca-file", "-----BEGIN CERTIFICATE-----", 1)) @@ -470,17 +468,15 @@ func TestServingCertRecreate(t *testing.T) { } func testServingCert(t *testing.T, recreate bool) { - stopCh := make(chan struct{}) - defer close(stopCh) - var servingCertPath string - _, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + _, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 servingCertPath = opts.SecureServing.ServerCert.CertDirectory }, }) + defer tearDownFn() if recreate { if err := os.Remove(path.Join(servingCertPath, "apiserver.key")); err != nil { @@ -511,12 +507,9 @@ func testServingCert(t *testing.T, recreate bool) { } func TestSNICert(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - var servingCertPath string - _, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + _, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 servingCertPath = opts.SecureServing.ServerCert.CertDirectory @@ -535,6 +528,7 @@ func TestSNICert(t *testing.T) { }} }, }) + defer tearDownFn() // When we run this the second time, we know which one we are expecting. _, actualCerts, err := cert.GetServingCertificatesForURL(kubeconfig.Host, "foo") diff --git a/test/integration/apiserver/max_json_patch_operations_test.go b/test/integration/apiserver/max_json_patch_operations_test.go index 005ad090213..357d7092955 100644 --- a/test/integration/apiserver/max_json_patch_operations_test.go +++ b/test/integration/apiserver/max_json_patch_operations_test.go @@ -32,13 +32,12 @@ import ( // Tests that the apiserver limits the number of operations in a json patch. func TestMaxJSONPatchOperations(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 }, }) + defer tearDownFn() p := `{"op":"add","path":"/x","value":"y"}` // maxJSONPatchOperations = 10000 diff --git a/test/integration/apiserver/max_request_body_bytes_test.go b/test/integration/apiserver/max_request_body_bytes_test.go index 1d53a16e11a..9b493058b12 100644 --- a/test/integration/apiserver/max_request_body_bytes_test.go +++ b/test/integration/apiserver/max_request_body_bytes_test.go @@ -30,9 +30,8 @@ import ( // Tests that the apiserver limits the resource size in write operations. func TestMaxResourceSize(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{}) + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{}) + defer tearDownFn() hugeData := []byte(strings.Repeat("x", 3*1024*1024+1)) diff --git a/test/integration/apiserver/podlogs/podlogs_test.go b/test/integration/apiserver/podlogs/podlogs_test.go index d423f014b0e..0a349395cce 100644 --- a/test/integration/apiserver/podlogs/podlogs_test.go +++ b/test/integration/apiserver/podlogs/podlogs_test.go @@ -33,9 +33,7 @@ import ( ) func TestInsecurePodLogs(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024 // I have no idea what this cert is, but it doesn't matter, we just want something that always fails validation @@ -63,6 +61,7 @@ Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq `) }, }) + defer tearDownFn() fakeKubeletServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("fake-log")) diff --git a/test/integration/auth/dynamic_client_test.go b/test/integration/auth/dynamic_client_test.go index 9cce2dd1dcb..823a7d0dd41 100644 --- a/test/integration/auth/dynamic_client_test.go +++ b/test/integration/auth/dynamic_client_test.go @@ -53,10 +53,7 @@ func TestDynamicClientBuilder(t *testing.T) { t.Fatalf("parse duration failed: %v", err) } - stopCh := make(chan struct{}) - defer close(stopCh) - - baseClient, baseConfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + baseClient, baseConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { opts.ServiceAccountSigningKeyFile = tmpfile.Name() opts.ServiceAccountTokenMaxExpiration = maxExpirationDuration @@ -75,6 +72,7 @@ func TestDynamicClientBuilder(t *testing.T) { config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() }, }) + defer tearDownFn() // We want to test if the token rotation works fine here. // To minimize the time this test would consume, we use the minimial token expiration. diff --git a/test/integration/controlplane/graceful_shutdown_test.go b/test/integration/controlplane/graceful_shutdown_test.go index 3d2bd617478..36ae8fa0a84 100644 --- a/test/integration/controlplane/graceful_shutdown_test.go +++ b/test/integration/controlplane/graceful_shutdown_test.go @@ -70,7 +70,14 @@ func TestGracefulShutdown(t *testing.T) { resp.Body.Close() t.Logf("shutting down server") - tearDownOnce.Do(server.TearDownFn) + // We tear it down in the background to ensure that + // pending requests should work fine. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + tearDownOnce.Do(server.TearDownFn) + }() t.Logf("server should fail new requests") if err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) { @@ -100,6 +107,8 @@ func TestGracefulShutdown(t *testing.T) { t.Fatal(err) } t.Logf("response: code %d, body: %s", respErr.resp.StatusCode, string(bs)) + + wg.Wait() } type responseErrorPair struct { diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index 9d7514cd464..f8c9455eb10 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -27,7 +27,6 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -118,24 +117,11 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu t.Fatal(err) } - // get a leased session - session, err := concurrency.NewSession(rawClient) - if err != nil { - t.Fatal(err) - } - - // then build and use an etcd lock - // this prevents more than one of these api servers from running at the same time - lock := concurrency.NewLocker(session, "kube_integration_etcd_raw") - lock.Lock() - // make sure we start with a clean slate if _, err := kvClient.Delete(context.Background(), "/registry/", clientv3.WithPrefix()); err != nil { t.Fatal(err) } - stopCh := make(chan struct{}) - kubeAPIServer, err := app.CreateServerChain(completedOptions) if err != nil { t.Fatal(err) @@ -152,6 +138,8 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu kubeClient := clientset.NewForConfigOrDie(kubeClientConfig) + stopCh := make(chan struct{}) + errCh := make(chan error) go func() { // Catch panics that occur in this go routine so we get a comprehensible failure defer func() { @@ -159,19 +147,29 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu t.Errorf("Unexpected panic trying to start API server: %#v", err) } }() + defer close(errCh) prepared, err := kubeAPIServer.PrepareRun() if err != nil { - t.Error(err) + errCh <- err + return } if err := prepared.Run(stopCh); err != nil { + errCh <- err t.Error(err) + return } }() lastHealth := "" attempt := 0 if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) { + select { + case err := <-errCh: + return false, err + default: + } + // wait for the server to be healthy result := kubeClient.RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) content, _ := result.Raw() @@ -207,12 +205,18 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu } cleanup := func() { - if err := os.RemoveAll(certDir); err != nil { - t.Log(err) - } + // Closing stopCh is stopping apiserver and cleaning up + // after itself, including shutting down its storage layer. close(stopCh) - lock.Unlock() - if err := session.Close(); err != nil { + + // If the apiserver was started, let's wait for it to + // shutdown clearly. + err, ok := <-errCh + if ok && err != nil { + t.Error(err) + } + rawClient.Close() + if err := os.RemoveAll(certDir); err != nil { t.Log(err) } } diff --git a/test/integration/examples/webhook_test.go b/test/integration/examples/webhook_test.go index 1e265986a16..d89cdd0be38 100644 --- a/test/integration/examples/webhook_test.go +++ b/test/integration/examples/webhook_test.go @@ -37,14 +37,11 @@ import ( ) func TestWebhookLoopback(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - webhookPath := "/webhook-test" called := int32(0) - client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { }, ModifyServerConfig: func(config *controlplane.Config) { @@ -66,6 +63,7 @@ func TestWebhookLoopback(t *testing.T) { }) }, }) + defer tearDownFn() fail := admissionregistrationv1.Fail noSideEffects := admissionregistrationv1.SideEffectClassNone diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index c007ef5dfa0..b77ebf05729 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -55,13 +55,34 @@ type TestServerSetup struct { ModifyServerConfig func(*controlplane.Config) } +type TearDownFunc func() + // StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions -func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) { - certDir, _ := os.MkdirTemp("", "test-integration-"+t.Name()) - go func() { - <-stopCh - os.RemoveAll(certDir) - }() +func StartTestServer(t *testing.T, setup TestServerSetup) (client.Interface, *rest.Config, TearDownFunc) { + certDir, err := os.MkdirTemp("", "test-integration-"+t.Name()) + if err != nil { + t.Fatalf("Couldn't create temp dir: %v", err) + } + + stopCh := make(chan struct{}) + var errCh chan error + tearDownFn := func() { + // Closing stopCh is stopping apiserver and cleaning up + // after itself, including shutting down its storage layer. + close(stopCh) + + // If the apiserver was started, let's wait for it to + // shutdown clearly. + if errCh != nil { + err, ok := <-errCh + if ok && err != nil { + t.Error(err) + } + } + if err := os.RemoveAll(certDir); err != nil { + t.Log(err) + } + } _, defaultServiceClusterIPRange, _ := netutils.ParseCIDRSloppy("10.0.0.0/24") proxySigningKey, err := utils.NewPrivateKey() @@ -147,9 +168,12 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup if err != nil { t.Fatal(err) } + + errCh = make(chan error) go func() { + defer close(errCh) if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh); err != nil { - t.Error(err) + errCh <- err } }() @@ -161,6 +185,12 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup // wait for health err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + select { + case err := <-errCh: + return false, err + default: + } + healthzConfig := rest.CopyConfig(kubeAPIServerClientConfig) healthzConfig.ContentType = "" healthzConfig.AcceptContentTypes = "" @@ -195,5 +225,5 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup t.Fatal(err) } - return kubeAPIServerClient, kubeAPIServerClientConfig + return kubeAPIServerClient, kubeAPIServerClientConfig, tearDownFn }