Merge pull request #111325 from wojtek-t/fix_leaking_goroutines_13

Clean shutdown of serviceaccount integration tests
This commit is contained in:
Kubernetes Prow Robot 2022-07-22 08:31:49 -07:00 committed by GitHub
commit 9495b2fb9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -25,15 +25,12 @@ import (
"crypto/rand" "crypto/rand"
"crypto/rsa" "crypto/rsa"
"fmt" "fmt"
"net/http"
"net/http/httptest"
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
@ -44,12 +41,13 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" clientinformers "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount" serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
@ -275,7 +273,7 @@ func TestServiceAccountTokenAuthentication(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Secret not created: %v", err) t.Fatalf("Secret not created: %v", err)
} }
roClientConfig := config roClientConfig := *config
roClientConfig.BearerToken = string(secret.Data[v1.ServiceAccountTokenKey]) roClientConfig.BearerToken = string(secret.Data[v1.ServiceAccountTokenKey])
roClient := clientset.NewForConfigOrDie(&roClientConfig) roClient := clientset.NewForConfigOrDie(&roClientConfig)
doServiceAccountAPIRequests(t, roClient, myns, true, true, false) doServiceAccountAPIRequests(t, roClient, myns, true, true, false)
@ -312,7 +310,7 @@ func TestServiceAccountTokenAuthentication(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Secret not created: %v", err) t.Fatalf("Secret not created: %v", err)
} }
rwClientConfig := config rwClientConfig := *config
rwClientConfig.BearerToken = string(secret.Data[v1.ServiceAccountTokenKey]) rwClientConfig.BearerToken = string(secret.Data[v1.ServiceAccountTokenKey])
rwClient := clientset.NewForConfigOrDie(&rwClientConfig) rwClient := clientset.NewForConfigOrDie(&rwClientConfig)
doServiceAccountAPIRequests(t, rwClient, myns, true, true, true) doServiceAccountAPIRequests(t, rwClient, myns, true, true, true)
@ -321,27 +319,7 @@ func TestServiceAccountTokenAuthentication(t *testing.T) {
// startServiceAccountTestServerAndWaitForCaches returns a started server // startServiceAccountTestServerAndWaitForCaches returns a started server
// It is the responsibility of the caller to ensure the returned stopFunc is called // It is the responsibility of the caller to ensure the returned stopFunc is called
func startServiceAccountTestServerAndWaitForCaches(t *testing.T) (*clientset.Clientset, restclient.Config, func(), error) { func startServiceAccountTestServerAndWaitForCaches(t *testing.T) (*clientset.Clientset, *restclient.Config, func(), error) {
// Listener
h := &framework.APIServerHolder{Initialized: make(chan struct{})}
apiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-h.Initialized
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
}))
// Anonymous client config
clientConfig := restclient.Config{Host: apiServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}
// Root client
// TODO: remove rootClient after we refactor pkg/admission to use the clientset.
rootClientset := clientset.NewForConfigOrDie(&restclient.Config{Host: apiServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, BearerToken: rootToken})
externalRootClientset := clientset.NewForConfigOrDie(&restclient.Config{Host: apiServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, BearerToken: rootToken})
externalInformers := informers.NewSharedInformerFactory(externalRootClientset, controller.NoResyncPeriodFunc())
informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc())
// Set up two authenticators:
// 1. A token authenticator that maps the rootToken to the "root" user
// 2. A ServiceAccountToken authenticator that validates ServiceAccount tokens
rootTokenAuth := authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { rootTokenAuth := authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) {
if token == rootToken { if token == rootToken {
return &authenticator.Response{User: &user.DefaultInfo{Name: rootUserName}}, true, nil return &authenticator.Response{User: &user.DefaultInfo{Name: rootUserName}}, true, nil
@ -349,13 +327,37 @@ func startServiceAccountTestServerAndWaitForCaches(t *testing.T) (*clientset.Cli
return nil, false, nil return nil, false, nil
}) })
serviceAccountKey, _ := rsa.GenerateKey(rand.Reader, 2048) serviceAccountKey, _ := rsa.GenerateKey(rand.Reader, 2048)
var informers clientinformers.SharedInformerFactory
var externalInformers clientinformers.SharedInformerFactory
var rootClientset *clientset.Clientset
// Set up a API server
_, clientConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerConfig: func(config *controlplane.Config) {
rootConfig := restclient.CopyConfig(config.GenericConfig.LoopbackClientConfig)
rootConfig.BearerToken = rootToken
rootClientset = clientset.NewForConfigOrDie(rootConfig)
externalRootClientset := clientset.NewForConfigOrDie(rootConfig)
externalInformers = clientinformers.NewSharedInformerFactory(externalRootClientset, controller.NoResyncPeriodFunc())
informers = clientinformers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc())
// Set up two authenticators:
// 1. A token authenticator that maps the rootToken to the "root" user
// 2. A ServiceAccountToken authenticator that validates ServiceAccount tokens
serviceAccountTokenGetter := serviceaccountcontroller.NewGetterFromClient( serviceAccountTokenGetter := serviceaccountcontroller.NewGetterFromClient(
rootClientset, rootClientset,
externalInformers.Core().V1().Secrets().Lister(), externalInformers.Core().V1().Secrets().Lister(),
externalInformers.Core().V1().ServiceAccounts().Lister(), externalInformers.Core().V1().ServiceAccounts().Lister(),
externalInformers.Core().V1().Pods().Lister(), externalInformers.Core().V1().Pods().Lister(),
) )
serviceAccountTokenAuth := serviceaccount.JWTTokenAuthenticator([]string{serviceaccount.LegacyIssuer}, []interface{}{&serviceAccountKey.PublicKey}, nil, serviceaccount.NewLegacyValidator(true, serviceAccountTokenGetter)) serviceAccountTokenAuth := serviceaccount.JWTTokenAuthenticator(
[]string{serviceaccount.LegacyIssuer},
[]interface{}{&serviceAccountKey.PublicKey},
nil,
serviceaccount.NewLegacyValidator(true, serviceAccountTokenGetter),
)
authenticator := group.NewAuthenticatedGroupAdder(union.New( authenticator := group.NewAuthenticatedGroupAdder(union.New(
bearertoken.New(rootTokenAuth), bearertoken.New(rootTokenAuth),
bearertoken.New(serviceAccountTokenAuth), bearertoken.New(serviceAccountTokenAuth),
@ -401,21 +403,20 @@ func startServiceAccountTestServerAndWaitForCaches(t *testing.T) (*clientset.Cli
serviceAccountAdmission.SetExternalKubeClientSet(externalRootClientset) serviceAccountAdmission.SetExternalKubeClientSet(externalRootClientset)
serviceAccountAdmission.SetExternalKubeInformerFactory(externalInformers) serviceAccountAdmission.SetExternalKubeInformerFactory(externalInformers)
controlPlaneConfig := framework.NewControlPlaneConfig() config.GenericConfig.EnableIndex = true
controlPlaneConfig.GenericConfig.EnableIndex = true config.GenericConfig.Authentication.Authenticator = authenticator
controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticator config.GenericConfig.Authorization.Authorizer = authorizer
controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizer config.GenericConfig.AdmissionControl = serviceAccountAdmission
controlPlaneConfig.GenericConfig.AdmissionControl = serviceAccountAdmission },
_, _, kubeAPIServerCloseFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, apiServer, h) })
// Start the service account and service account token controllers
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
stop := func() { stop := func() {
cancel() cancel()
kubeAPIServerCloseFn() tearDownFn()
apiServer.Close()
} }
// Start the service account and service account token controllers
tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, serviceAccountKey) tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, serviceAccountKey)
if err != nil { if err != nil {
return rootClientset, clientConfig, stop, err return rootClientset, clientConfig, stop, err