Merge pull request #111242 from wojtek-t/fix_leaking_goroutines_11

Fix leaking goroutines in multiple integration tests
This commit is contained in:
Kubernetes Prow Robot 2022-07-19 05:34:30 -07:00 committed by GitHub
commit 0cde1b7446
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 102 additions and 54 deletions

View File

@ -222,11 +222,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
crdClient.ApiextensionsV1(),
crdHandler,
)
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
var openapiv3Controller *openapiv3controller.Controller
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
}
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
@ -239,10 +234,12 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if s.GenericAPIServer.StaticOpenAPISpec != nil {
if s.GenericAPIServer.OpenAPIVersionedService != nil {
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}
if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}

View File

@ -34,6 +34,8 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
)
// TearDownFunc is to be called to tear down a test server.
@ -71,11 +73,22 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// enough time to remove temporary files.
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiextensions apiserver and its
// delegates, which itself is cleaning up after itself,
// including shutting down its storage layer.
close(stopCh)
// If the apiextensions apiserver was started, let's wait for
// it to shutdown clearly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.Errorf("Failed to shutdown test server clearly: %v", err)
}
}
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
@ -135,8 +148,10 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
return result, fmt.Errorf("failed to create server: %v", err)
}
errCh := make(chan error)
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
defer close(errCh)
if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
errCh <- err
}

View File

@ -36,6 +36,11 @@ type tlsTransportCache struct {
transports map[tlsCacheKey]*http.Transport
}
// DialerStopCh is stop channel that is passed down to dynamic cert dialer.
// It's exposed as variable for testing purposes to avoid testing for goroutine
// leakages.
var DialerStopCh = wait.NeverStop
const idleConnsPerHost = 25
var tlsCache = &tlsTransportCache{transports: make(map[tlsCacheKey]*http.Transport)}
@ -101,7 +106,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
dial = dynamicCertDialer.connDialer.DialContext
go dynamicCertDialer.Run(wait.NeverStop)
go dynamicCertDialer.Run(DialerStopCh)
}
proxy := http.ProxyFromEnvironment

View File

@ -401,10 +401,14 @@ func TestListOptions(t *testing.T) {
}
// compact some of the revision history in etcd so we can test "too old" resource versions
_, kvClient, err := integration.GetEtcdClients(*storageTransport)
rawClient, kvClient, err := integration.GetEtcdClients(*storageTransport)
if err != nil {
t.Fatal(err)
}
// kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to
// close the client (which we can do by closing rawClient).
defer rawClient.Close()
revision, err := strconv.Atoi(oldestUncompactedRv)
if err != nil {
t.Fatal(err)

View File

@ -199,6 +199,7 @@ func startPodSecurityWebhook(t *testing.T, testServer *kubeapiservertesting.Test
if err != nil {
return false, err
}
defer resp.Body.Close()
return resp.StatusCode == 200, nil
}); err != nil {
return "", err

View File

@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"reflect"
@ -45,12 +46,15 @@ import (
apiserverserviceaccount "k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/keyutil"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/apis/core"
serviceaccountgetter "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/test/integration/framework"
)
@ -87,49 +91,66 @@ func TestServiceAccountTokenCreate(t *testing.T) {
gcs := &clientset.Clientset{}
// Start the server
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
controlPlaneConfig.GenericConfig.Authentication.APIAudiences = aud
controlPlaneConfig.GenericConfig.Authentication.Authenticator = bearertoken.New(
serviceaccount.JWTTokenAuthenticator(
[]string{iss},
[]interface{}{&pk},
aud,
serviceaccount.NewValidator(serviceaccountgetter.NewGetterFromClient(
gcs,
v1listers.NewSecretLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
v1listers.NewServiceAccountLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
v1listers.NewPodLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
)),
),
)
tokenGenerator, err := serviceaccount.JWTTokenGenerator(iss, sk)
if err != nil {
t.Fatalf("err: %v", err)
}
controlPlaneConfig.ExtraConfig.ServiceAccountIssuer = tokenGenerator
controlPlaneConfig.ExtraConfig.ServiceAccountMaxExpiration = maxExpirationDuration
controlPlaneConfig.GenericConfig.Authentication.APIAudiences = aud
controlPlaneConfig.ExtraConfig.ExtendExpiration = true
controlPlaneConfig.ExtraConfig.ServiceAccountIssuerURL = iss
controlPlaneConfig.ExtraConfig.ServiceAccountJWKSURI = ""
controlPlaneConfig.ExtraConfig.ServiceAccountPublicKeys = []interface{}{&pk}
// Start the server
var serverAddress string
kubeClient, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
},
ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
config.GenericConfig.Authentication.APIAudiences = aud
config.GenericConfig.Authentication.Authenticator = bearertoken.New(
serviceaccount.JWTTokenAuthenticator(
[]string{iss},
[]interface{}{&pk},
aud,
serviceaccount.NewValidator(serviceaccountgetter.NewGetterFromClient(
gcs,
v1listers.NewSecretLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
v1listers.NewServiceAccountLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
v1listers.NewPodLister(newIndexer(func(namespace, name string) (interface{}, error) {
return gcs.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
})),
)),
),
)
instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
config.ExtraConfig.ServiceAccountIssuer = tokenGenerator
config.ExtraConfig.ServiceAccountMaxExpiration = maxExpirationDuration
config.ExtraConfig.ExtendExpiration = true
config.ExtraConfig.ServiceAccountIssuerURL = iss
config.ExtraConfig.ServiceAccountJWKSURI = ""
config.ExtraConfig.ServiceAccountPublicKeys = []interface{}{&pk}
// Compute the serverAddress.
serverAddress = config.GenericConfig.ExternalAddress
_, port, err := config.GenericConfig.SecureServing.HostPort()
if err != nil {
t.Fatalf("Couldn't get server port: %v", err)
}
serverAddress = net.JoinHostPort(serverAddress, strconv.Itoa(port))
},
})
defer tearDownFn()
ns := framework.CreateNamespaceOrDie(kubeClient, "myns", t)
defer framework.DeleteNamespaceOrDie(kubeClient, ns, t)
warningHandler := &recordingWarningHandler{}
configWithWarningHandler := rest.CopyConfig(instanceConfig.GenericAPIServer.LoopbackClientConfig)
configWithWarningHandler := rest.CopyConfig(kubeConfig)
configWithWarningHandler.WarningHandler = warningHandler
cs, err := clientset.NewForConfig(configWithWarningHandler)
if err != nil {
@ -137,7 +158,8 @@ func TestServiceAccountTokenCreate(t *testing.T) {
}
*gcs = *cs
rc, err := rest.UnversionedRESTClientFor(instanceConfig.GenericAPIServer.LoopbackClientConfig)
kubeConfig.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
rc, err := rest.UnversionedRESTClientFor(kubeConfig)
if err != nil {
t.Fatal(err)
}
@ -146,7 +168,7 @@ func TestServiceAccountTokenCreate(t *testing.T) {
sa = &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svcacct",
Namespace: "myns",
Namespace: ns.Name,
},
}
pod = &v1.Pod{
@ -431,7 +453,7 @@ func TestServiceAccountTokenCreate(t *testing.T) {
ObjectMeta: sa.ObjectMeta,
}
_, pc := serviceaccount.Claims(coresa, nil, nil, 0, 0, nil)
tok, err := controlPlaneConfig.ExtraConfig.ServiceAccountIssuer.GenerateToken(sc, pc)
tok, err := tokenGenerator.GenerateToken(sc, pc)
if err != nil {
t.Fatalf("err signing expired token: %v", err)
}
@ -830,14 +852,9 @@ func TestServiceAccountTokenCreate(t *testing.T) {
t.Fatalf("invalid issuer in discovery doc: got %s, want %s",
discoveryDoc.Issuer, iss)
}
// Parse the JWKSURI see if the path is what we expect. Since the
// integration test framework hardcodes 192.168.10.4 as the PublicAddress,
// which results in the same for ExternalAddress, we expect the JWKS URI
// to be 192.168.10.4:443, even if that's not necessarily the external
// IP of the test machine.
expectJWKSURI := (&url.URL{
Scheme: "https",
Host: "192.168.10.4:443",
Host: serverAddress,
Path: serviceaccount.JWKSPath,
}).String()
if discoveryDoc.JWKS != expectJWKSURI {

View File

@ -46,6 +46,7 @@ func TestCertRotation(t *testing.T) {
defer close(stopCh)
transport.CertCallbackRefreshDuration = 1 * time.Second
transport.DialerStopCh = stopCh
certDir := os.TempDir()
clientCAFilename, clientSigningCert, clientSigningKey := writeCACertFiles(t, certDir)
@ -103,6 +104,7 @@ func TestCertRotationContinuousRequests(t *testing.T) {
defer close(stopCh)
transport.CertCallbackRefreshDuration = 1 * time.Second
transport.DialerStopCh = stopCh
certDir := os.TempDir()
clientCAFilename, clientSigningCert, clientSigningKey := writeCACertFiles(t, certDir)

View File

@ -120,7 +120,6 @@ resources:
cachesize: 1000
endpoint: unix:///@kms-provider.sock
`
providerName := "kms-provider"
pluginMock, err := mock.NewBase64Plugin("@kms-provider.sock")
if err != nil {

View File

@ -236,10 +236,14 @@ func (e *transformTest) createSecret(name, namespace string) (*corev1.Secret, er
}
func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) {
_, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
rawClient, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %v", err)
}
// kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to
// close the client (which we can do by closing rawClient).
defer rawClient.Close()
response, err := etcdClient.Get(context.Background(), path, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to retrieve secret from etcd %v", err)
@ -249,10 +253,14 @@ func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetRespons
}
func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv3.PutResponse, error) {
_, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
rawClient, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %v", err)
}
// kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to
// close the client (which we can do by closing rawClient).
defer rawClient.Close()
response, err := etcdClient.Put(context.Background(), path, string(data))
if err != nil {
return nil, fmt.Errorf("failed to write secret to etcd %v", err)