diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 04a60eaf17f..02095d30ab8 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -222,11 +222,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) crdClient.ApiextensionsV1(), crdHandler, ) - openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) - var openapiv3Controller *openapiv3controller.Controller - if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { - openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) - } s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error { s.Informers.Start(context.StopCh) @@ -239,10 +234,12 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) // and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller. if s.GenericAPIServer.StaticOpenAPISpec != nil { if s.GenericAPIServer.OpenAPIVersionedService != nil { + openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) } if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh) } } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index af15959a21e..0f1dc2d0643 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -34,6 +34,8 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + + "k8s.io/klog/v2" ) // TearDownFunc is to be called to tear down a test server. @@ -71,11 +73,22 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions { // enough time to remove temporary files. func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { stopCh := make(chan struct{}) + var errCh chan error tearDown := func() { // Closing stopCh is stopping apiextensions apiserver and its // delegates, which itself is cleaning up after itself, // including shutting down its storage layer. close(stopCh) + + // If the apiextensions apiserver was started, let's wait for + // it to shutdown clearly. + if errCh != nil { + err, ok := <-errCh + if ok && err != nil { + klog.Errorf("Failed to shutdown test server clearly: %v", err) + } + } + if len(result.TmpDir) != 0 { os.RemoveAll(result.TmpDir) } @@ -135,8 +148,10 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin return result, fmt.Errorf("failed to create server: %v", err) } - errCh := make(chan error) + errCh = make(chan error) go func(stopCh <-chan struct{}) { + defer close(errCh) + if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil { errCh <- err } diff --git a/staging/src/k8s.io/client-go/transport/cache.go b/staging/src/k8s.io/client-go/transport/cache.go index 5fe768ed5ec..214f0a79cf0 100644 --- a/staging/src/k8s.io/client-go/transport/cache.go +++ b/staging/src/k8s.io/client-go/transport/cache.go @@ -36,6 +36,11 @@ type tlsTransportCache struct { transports map[tlsCacheKey]*http.Transport } +// DialerStopCh is stop channel that is passed down to dynamic cert dialer. +// It's exposed as variable for testing purposes to avoid testing for goroutine +// leakages. +var DialerStopCh = wait.NeverStop + const idleConnsPerHost = 25 var tlsCache = &tlsTransportCache{transports: make(map[tlsCacheKey]*http.Transport)} @@ -101,7 +106,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) { dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial) tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate dial = dynamicCertDialer.connDialer.DialContext - go dynamicCertDialer.Run(wait.NeverStop) + go dynamicCertDialer.Run(DialerStopCh) } proxy := http.ProxyFromEnvironment diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index f98b58d3658..c371ead248f 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -401,10 +401,14 @@ func TestListOptions(t *testing.T) { } // compact some of the revision history in etcd so we can test "too old" resource versions - _, kvClient, err := integration.GetEtcdClients(*storageTransport) + rawClient, kvClient, err := integration.GetEtcdClients(*storageTransport) if err != nil { t.Fatal(err) } + // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to + // close the client (which we can do by closing rawClient). + defer rawClient.Close() + revision, err := strconv.Atoi(oldestUncompactedRv) if err != nil { t.Fatal(err) diff --git a/test/integration/auth/podsecurity_test.go b/test/integration/auth/podsecurity_test.go index fc421b894c8..8927f977fc6 100644 --- a/test/integration/auth/podsecurity_test.go +++ b/test/integration/auth/podsecurity_test.go @@ -199,6 +199,7 @@ func startPodSecurityWebhook(t *testing.T, testServer *kubeapiservertesting.Test if err != nil { return false, err } + defer resp.Body.Close() return resp.StatusCode == 200, nil }); err != nil { return "", err diff --git a/test/integration/auth/svcaccttoken_test.go b/test/integration/auth/svcaccttoken_test.go index c02511b43b2..11869ac7e6b 100644 --- a/test/integration/auth/svcaccttoken_test.go +++ b/test/integration/auth/svcaccttoken_test.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "reflect" @@ -45,12 +46,15 @@ import ( apiserverserviceaccount "k8s.io/apiserver/pkg/authentication/serviceaccount" "k8s.io/apiserver/pkg/authorization/authorizerfactory" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/keyutil" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/apis/core" serviceaccountgetter "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/test/integration/framework" ) @@ -87,49 +91,66 @@ func TestServiceAccountTokenCreate(t *testing.T) { gcs := &clientset.Clientset{} - // Start the server - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() - controlPlaneConfig.GenericConfig.Authentication.APIAudiences = aud - controlPlaneConfig.GenericConfig.Authentication.Authenticator = bearertoken.New( - serviceaccount.JWTTokenAuthenticator( - []string{iss}, - []interface{}{&pk}, - aud, - serviceaccount.NewValidator(serviceaccountgetter.NewGetterFromClient( - gcs, - v1listers.NewSecretLister(newIndexer(func(namespace, name string) (interface{}, error) { - return gcs.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - })), - v1listers.NewServiceAccountLister(newIndexer(func(namespace, name string) (interface{}, error) { - return gcs.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - })), - v1listers.NewPodLister(newIndexer(func(namespace, name string) (interface{}, error) { - return gcs.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - })), - )), - ), - ) - tokenGenerator, err := serviceaccount.JWTTokenGenerator(iss, sk) if err != nil { t.Fatalf("err: %v", err) } - controlPlaneConfig.ExtraConfig.ServiceAccountIssuer = tokenGenerator - controlPlaneConfig.ExtraConfig.ServiceAccountMaxExpiration = maxExpirationDuration - controlPlaneConfig.GenericConfig.Authentication.APIAudiences = aud - controlPlaneConfig.ExtraConfig.ExtendExpiration = true - controlPlaneConfig.ExtraConfig.ServiceAccountIssuerURL = iss - controlPlaneConfig.ExtraConfig.ServiceAccountJWKSURI = "" - controlPlaneConfig.ExtraConfig.ServiceAccountPublicKeys = []interface{}{&pk} + // Start the server + var serverAddress string + kubeClient, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() + config.GenericConfig.Authentication.APIAudiences = aud + config.GenericConfig.Authentication.Authenticator = bearertoken.New( + serviceaccount.JWTTokenAuthenticator( + []string{iss}, + []interface{}{&pk}, + aud, + serviceaccount.NewValidator(serviceaccountgetter.NewGetterFromClient( + gcs, + v1listers.NewSecretLister(newIndexer(func(namespace, name string) (interface{}, error) { + return gcs.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + })), + v1listers.NewServiceAccountLister(newIndexer(func(namespace, name string) (interface{}, error) { + return gcs.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + })), + v1listers.NewPodLister(newIndexer(func(namespace, name string) (interface{}, error) { + return gcs.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + })), + )), + ), + ) - instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + config.ExtraConfig.ServiceAccountIssuer = tokenGenerator + config.ExtraConfig.ServiceAccountMaxExpiration = maxExpirationDuration + config.ExtraConfig.ExtendExpiration = true + + config.ExtraConfig.ServiceAccountIssuerURL = iss + config.ExtraConfig.ServiceAccountJWKSURI = "" + config.ExtraConfig.ServiceAccountPublicKeys = []interface{}{&pk} + + // Compute the serverAddress. + serverAddress = config.GenericConfig.ExternalAddress + _, port, err := config.GenericConfig.SecureServing.HostPort() + if err != nil { + t.Fatalf("Couldn't get server port: %v", err) + } + serverAddress = net.JoinHostPort(serverAddress, strconv.Itoa(port)) + }, + }) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(kubeClient, "myns", t) + defer framework.DeleteNamespaceOrDie(kubeClient, ns, t) warningHandler := &recordingWarningHandler{} - configWithWarningHandler := rest.CopyConfig(instanceConfig.GenericAPIServer.LoopbackClientConfig) + configWithWarningHandler := rest.CopyConfig(kubeConfig) configWithWarningHandler.WarningHandler = warningHandler cs, err := clientset.NewForConfig(configWithWarningHandler) if err != nil { @@ -137,7 +158,8 @@ func TestServiceAccountTokenCreate(t *testing.T) { } *gcs = *cs - rc, err := rest.UnversionedRESTClientFor(instanceConfig.GenericAPIServer.LoopbackClientConfig) + kubeConfig.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + rc, err := rest.UnversionedRESTClientFor(kubeConfig) if err != nil { t.Fatal(err) } @@ -146,7 +168,7 @@ func TestServiceAccountTokenCreate(t *testing.T) { sa = &v1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: "test-svcacct", - Namespace: "myns", + Namespace: ns.Name, }, } pod = &v1.Pod{ @@ -431,7 +453,7 @@ func TestServiceAccountTokenCreate(t *testing.T) { ObjectMeta: sa.ObjectMeta, } _, pc := serviceaccount.Claims(coresa, nil, nil, 0, 0, nil) - tok, err := controlPlaneConfig.ExtraConfig.ServiceAccountIssuer.GenerateToken(sc, pc) + tok, err := tokenGenerator.GenerateToken(sc, pc) if err != nil { t.Fatalf("err signing expired token: %v", err) } @@ -830,14 +852,9 @@ func TestServiceAccountTokenCreate(t *testing.T) { t.Fatalf("invalid issuer in discovery doc: got %s, want %s", discoveryDoc.Issuer, iss) } - // Parse the JWKSURI see if the path is what we expect. Since the - // integration test framework hardcodes 192.168.10.4 as the PublicAddress, - // which results in the same for ExternalAddress, we expect the JWKS URI - // to be 192.168.10.4:443, even if that's not necessarily the external - // IP of the test machine. expectJWKSURI := (&url.URL{ Scheme: "https", - Host: "192.168.10.4:443", + Host: serverAddress, Path: serviceaccount.JWKSPath, }).String() if discoveryDoc.JWKS != expectJWKSURI { diff --git a/test/integration/client/cert_rotation_test.go b/test/integration/client/cert_rotation_test.go index 282ddf17c73..44089b91128 100644 --- a/test/integration/client/cert_rotation_test.go +++ b/test/integration/client/cert_rotation_test.go @@ -46,6 +46,7 @@ func TestCertRotation(t *testing.T) { defer close(stopCh) transport.CertCallbackRefreshDuration = 1 * time.Second + transport.DialerStopCh = stopCh certDir := os.TempDir() clientCAFilename, clientSigningCert, clientSigningKey := writeCACertFiles(t, certDir) @@ -103,6 +104,7 @@ func TestCertRotationContinuousRequests(t *testing.T) { defer close(stopCh) transport.CertCallbackRefreshDuration = 1 * time.Second + transport.DialerStopCh = stopCh certDir := os.TempDir() clientCAFilename, clientSigningCert, clientSigningKey := writeCACertFiles(t, certDir) diff --git a/test/integration/controlplane/transformation/kms_transformation_test.go b/test/integration/controlplane/transformation/kms_transformation_test.go index c86628f1ff6..e4c151f80ed 100644 --- a/test/integration/controlplane/transformation/kms_transformation_test.go +++ b/test/integration/controlplane/transformation/kms_transformation_test.go @@ -120,7 +120,6 @@ resources: cachesize: 1000 endpoint: unix:///@kms-provider.sock ` - providerName := "kms-provider" pluginMock, err := mock.NewBase64Plugin("@kms-provider.sock") if err != nil { diff --git a/test/integration/controlplane/transformation/transformation_testcase.go b/test/integration/controlplane/transformation/transformation_testcase.go index a8028f26cf6..4667fd45d26 100644 --- a/test/integration/controlplane/transformation/transformation_testcase.go +++ b/test/integration/controlplane/transformation/transformation_testcase.go @@ -236,10 +236,14 @@ func (e *transformTest) createSecret(name, namespace string) (*corev1.Secret, er } func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) { - _, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport) + rawClient, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport) if err != nil { return nil, fmt.Errorf("failed to create etcd client: %v", err) } + // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to + // close the client (which we can do by closing rawClient). + defer rawClient.Close() + response, err := etcdClient.Get(context.Background(), path, clientv3.WithPrefix()) if err != nil { return nil, fmt.Errorf("failed to retrieve secret from etcd %v", err) @@ -249,10 +253,14 @@ func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetRespons } func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv3.PutResponse, error) { - _, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport) + rawClient, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport) if err != nil { return nil, fmt.Errorf("failed to create etcd client: %v", err) } + // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to + // close the client (which we can do by closing rawClient). + defer rawClient.Close() + response, err := etcdClient.Put(context.Background(), path, string(data)) if err != nil { return nil, fmt.Errorf("failed to write secret to etcd %v", err)