From 32cbd77121befa630018e064c7890c5ec0086998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Sun, 12 Jun 2022 19:29:25 +0200 Subject: [PATCH 1/5] Clean shutdown of garbagecollector integration tests --- pkg/controller/garbagecollector/garbagecollector.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 3fc0fda7ae2..74df517668a 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -77,6 +77,9 @@ type GarbageCollector struct { // GC caches the owners that do not exist according to the API server. absentOwnerCache *ReferenceCache + kubeClient clientset.Interface + eventBroadcaster record.EventBroadcaster + workerLock sync.RWMutex } @@ -94,8 +97,6 @@ func NewGarbageCollector( ) (*GarbageCollector, error) { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}) attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") @@ -107,6 +108,8 @@ func NewGarbageCollector( attemptToDelete: attemptToDelete, attemptToOrphan: attemptToOrphan, absentOwnerCache: absentOwnerCache, + kubeClient: kubeClient, + eventBroadcaster: eventBroadcaster, } gc.dependencyGraphBuilder = &GraphBuilder{ eventRecorder: eventRecorder, @@ -146,6 +149,11 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) { defer gc.attemptToOrphan.ShutDown() defer gc.dependencyGraphBuilder.graphChanges.ShutDown() + // Start events processing pipeline. + gc.eventBroadcaster.StartStructuredLogging(0) + gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")}) + defer gc.eventBroadcaster.Shutdown() + klog.Infof("Starting garbage collector controller") defer klog.Infof("Shutting down garbage collector controller") From 8a87681a399f3a5d6f37bccdc150e53ba83414a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Sun, 12 Jun 2022 17:32:05 +0200 Subject: [PATCH 2/5] Clean shutdown of resourcequota integration tests --- .../resource_quota_controller.go | 1 + .../resourcequota/resource_quota_monitor.go | 11 + test/integration/quota/quota_test.go | 198 +++++++----------- 3 files changed, 89 insertions(+), 121 deletions(-) diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index a89a16a58b3..8441a3c4032 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -269,6 +269,7 @@ func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingIn func (rq *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer rq.queue.ShutDown() + defer rq.missingUsageQueue.ShutDown() klog.Infof("Starting resource quota controller") defer klog.Infof("Shutting down resource quota controller") diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index d37a1a0f60f..e2defa9d009 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -305,6 +305,8 @@ func (qm *QuotaMonitor) IsSynced() bool { // Run sets the stop channel and starts monitor execution until stopCh is // closed. Any running monitors will be stopped before Run returns. func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + klog.Infof("QuotaMonitor running") defer klog.Infof("QuotaMonitor stopping") @@ -317,6 +319,15 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { // Start monitors and begin change processing until the stop channel is // closed. qm.StartMonitors() + + // The following workers are hanging forever until the queue is + // shutted down, so we need to shut it down in a separate goroutine. + go func() { + defer utilruntime.HandleCrash() + defer qm.resourceChanges.ShutDown() + + <-stopCh + }() wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh) // Stop any running monitors. diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 382e3d2c643..f8885d84da3 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -20,8 +20,7 @@ import ( "context" "errors" "fmt" - "net/http" - "net/http/httptest" + "os" "testing" "time" @@ -31,23 +30,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/admission" - genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" - "k8s.io/apiserver/pkg/admission/plugin/resourcequota" - resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota" "k8s.io/apiserver/pkg/quota/v1/generic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" watchtools "k8s.io/client-go/tools/watch" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/controller" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" - kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" quotainstall "k8s.io/kubernetes/pkg/quota/v1/install" "k8s.io/kubernetes/test/integration/framework" ) @@ -64,41 +57,20 @@ const ( // quota_test.go:115: Took 12.021640372s to scale up with quota func TestQuota(t *testing.T) { // Set up a API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + }, + }) + defer tearDownFn() - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - config := &resourcequotaapi.Configuration{} - admissionControl, err := resourcequota.NewResourceQuota(config, 5) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - internalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - qca := quotainstall.NewQuotaConfigurationForAdmission() + clientset := clientset.NewForConfigOrDie(kubeConfig) - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, internalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) - } - - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() - - ns := framework.CreateTestingNamespace("quotaed", t) - defer framework.DeleteTestingNamespace(ns, t) - ns2 := framework.CreateTestingNamespace("non-quotaed", t) - defer framework.DeleteTestingNamespace(ns2, t) + ns := framework.CreateNamespaceOrDie(clientset, "quotaed", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) + ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t) + defer framework.DeleteNamespaceOrDie(clientset, ns2, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -136,7 +108,6 @@ func TestQuota(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - internalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted) @@ -292,49 +263,43 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { } func TestQuotaLimitedResourceDenial(t *testing.T) { - // Set up an API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) - - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - - // stop creation of a pod resource unless there is a quota - config := &resourcequotaapi.Configuration{ - LimitedResources: []resourcequotaapi.LimitedResource{ - { - Resource: "pods", - MatchContains: []string{"pods"}, - }, - }, - } - qca := quotainstall.NewQuotaConfigurationForAdmission() - admissionControl, err := resourcequota.NewResourceQuota(config, 5) + // Create admission configuration with ResourceQuota configuration. + admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml") if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatal(err) } - externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) + defer os.Remove(admissionConfigFile.Name()) + if err := os.WriteFile(admissionConfigFile.Name(), []byte(` +apiVersion: apiserver.k8s.io/v1alpha1 +kind: AdmissionConfiguration +plugins: +- name: ResourceQuota + configuration: + apiVersion: apiserver.config.k8s.io/v1 + kind: ResourceQuotaConfiguration + limitedResources: + - resource: pods + matchContains: + - pods +`), os.FileMode(0644)); err != nil { + t.Fatal(err) } - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() + // Set up an API server + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name() - ns := framework.CreateTestingNamespace("quota", t) - defer framework.DeleteTestingNamespace(ns, t) + }, + }) + defer tearDownFn() + + clientset := clientset.NewForConfigOrDie(kubeConfig) + + ns := framework.CreateNamespaceOrDie(clientset, "quota", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -372,7 +337,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - externalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted) @@ -425,50 +389,43 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { } func TestQuotaLimitService(t *testing.T) { + // Create admission configuration with ResourceQuota configuration. + admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(admissionConfigFile.Name()) + if err := os.WriteFile(admissionConfigFile.Name(), []byte(` +apiVersion: apiserver.k8s.io/v1alpha1 +kind: AdmissionConfiguration +plugins: +- name: ResourceQuota + configuration: + apiVersion: apiserver.config.k8s.io/v1 + kind: ResourceQuotaConfiguration + limitedResources: + - resource: pods + matchContains: + - pods +`), os.FileMode(0644)); err != nil { + t.Fatal(err) + } // Set up an API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name() - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - - // stop creation of a pod resource unless there is a quota - config := &resourcequotaapi.Configuration{ - LimitedResources: []resourcequotaapi.LimitedResource{ - { - Resource: "pods", - MatchContains: []string{"pods"}, - }, }, - } - qca := quotainstall.NewQuotaConfigurationForAdmission() - admissionControl, err := resourcequota.NewResourceQuota(config, 5) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + }) + defer tearDownFn() - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) - } + clientset := clientset.NewForConfigOrDie(kubeConfig) - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() - - ns := framework.CreateTestingNamespace("quota", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientset, "quota", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -506,7 +463,6 @@ func TestQuotaLimitService(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - externalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted) From 41d7ddee1afe14eca4ad202567ebd14186082960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Sun, 12 Jun 2022 23:27:52 +0200 Subject: [PATCH 3/5] Clean shutdown of apimachinery integration tests --- .../apimachinery/watch_restart_test.go | 21 ++- .../apimachinery/watch_timeout_test.go | 122 +++++++++++++----- 2 files changed, 101 insertions(+), 42 deletions(-) diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go index 3b73c0f6189..d6380b1b2d6 100644 --- a/test/integration/apimachinery/watch_restart_test.go +++ b/test/integration/apimachinery/watch_restart_test.go @@ -31,9 +31,9 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -66,19 +66,16 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { // Has to be longer than 5 seconds timeout := 30 * time.Second - // Set up an API server - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - // Timeout is set random between MinRequestTimeout and 2x - controlPlaneConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4 - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--min-request-timeout=7"}, framework.SharedEtcd()) + defer server.TearDownFn() - config := &restclient.Config{ - Host: s.URL, + clientset, err := kubernetes.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatal(err) } - namespaceObject := framework.CreateTestingNamespace("retry-watch", t) - defer framework.DeleteTestingNamespace(namespaceObject, t) + namespaceObject := framework.CreateNamespaceOrDie(clientset, "retry-watch", t) + defer framework.DeleteNamespaceOrDie(clientset, namespaceObject, t) getListFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) *corev1.SecretList { return func(options metav1.ListOptions) *corev1.SecretList { @@ -215,7 +212,7 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { tc := tmptc // we need to copy it for parallel runs t.Run(tc.name, func(t *testing.T) { t.Parallel() - c, err := kubernetes.NewForConfig(config) + c, err := kubernetes.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Failed to create clientset: %v", err) } diff --git a/test/integration/apimachinery/watch_timeout_test.go b/test/integration/apimachinery/watch_timeout_test.go index b7ba18cf9fc..529aaaa278d 100644 --- a/test/integration/apimachinery/watch_timeout_test.go +++ b/test/integration/apimachinery/watch_timeout_test.go @@ -19,8 +19,9 @@ package apimachinery import ( "bytes" "context" + "fmt" "io" - "log" + "net/http" "net/http/httptest" "net/http/httputil" "net/url" @@ -38,14 +39,67 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" kubectlproxy "k8s.io/kubectl/pkg/proxy" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) +type extractRT struct { + http.Header +} + +func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) { + rt.Header = req.Header + return &http.Response{}, nil +} + +// headersForConfig extracts any http client logic necessary for the provided +// config. +func headersForConfig(c *restclient.Config, url *url.URL) (http.Header, error) { + extract := &extractRT{} + rt, err := restclient.HTTPWrappersForConfig(c, extract) + if err != nil { + return nil, err + } + request, err := http.NewRequest("GET", url.String(), nil) + if err != nil { + return nil, err + } + if _, err := rt.RoundTrip(request); err != nil { + return nil, err + } + return extract.Header, nil +} + +// websocketConfig constructs a websocket config to the provided URL, using the client +// config, with the specified protocols. +func websocketConfig(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Config, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, fmt.Errorf("Failed to create tls config: %v", err) + } + if url.Scheme == "https" { + url.Scheme = "wss" + } else { + url.Scheme = "ws" + } + headers, err := headersForConfig(config, url) + if err != nil { + return nil, fmt.Errorf("Failed to load http headers: %v", err) + } + cfg, err := websocket.NewConfig(url.String(), "http://localhost") + if err != nil { + return nil, fmt.Errorf("Failed to create websocket config: %v", err) + } + cfg.Header = headers + cfg.TlsConfig = tlsConfig + cfg.Protocol = protocols + return cfg, err +} + func TestWebsocketWatchClientTimeout(t *testing.T) { // server setup - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - instance, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() // object setup service := &corev1.Service{ @@ -57,7 +111,7 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { configmap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{Name: "test"}, } - clientset, err := kubernetes.NewForConfig(instance.GenericAPIServer.LoopbackClientConfig) + clientset, err := kubernetes.NewForConfig(server.ClientConfig) if err != nil { t.Fatal(err) } @@ -90,12 +144,13 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - - u, _ := url.Parse(s.URL) - apiURL := "ws://" + u.Host + tc.path - wsc, err := websocket.NewConfig(apiURL, apiURL) + url, err := url.Parse(server.ClientConfig.Host + tc.path) if err != nil { - log.Fatal(err) + t.Fatal(err) + } + wsc, err := websocketConfig(url, server.ClientConfig, nil) + if err != nil { + t.Fatal(err) } wsConn, err := websocket.DialConfig(wsc) @@ -142,29 +197,36 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { } } -func TestWatchClientTimeout(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() +func TestWatchClientTimeoutXXX(t *testing.T) { + // server setup + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() t.Run("direct", func(t *testing.T) { - t.Logf("client at %s", s.URL) - testWatchClientTimeouts(t, s.URL) + t.Logf("client at %s", server.ClientConfig.Host) + testWatchClientTimeouts(t, restclient.CopyConfig(server.ClientConfig)) }) t.Run("reverse proxy", func(t *testing.T) { - u, _ := url.Parse(s.URL) + u, _ := url.Parse(server.ClientConfig.Host) proxy := httputil.NewSingleHostReverseProxy(u) proxy.FlushInterval = -1 - proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u)) + + transport, err := restclient.TransportFor(server.ClientConfig) + if err != nil { + t.Fatal(err) + } + proxy.Transport = transport + + proxyServer := httptest.NewServer(proxy) defer proxyServer.Close() - t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL) - testWatchClientTimeouts(t, proxyServer.URL) + t.Logf("client to %s, backend at %s", proxyServer.URL, server.ClientConfig.Host) + testWatchClientTimeouts(t, &restclient.Config{Host: proxyServer.URL}) }) t.Run("kubectl proxy", func(t *testing.T) { - kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0, false) + kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, server.ClientConfig, 0, false) if err != nil { t.Fatal(err) } @@ -175,26 +237,26 @@ func TestWatchClientTimeout(t *testing.T) { defer kubectlProxyListener.Close() go kubectlProxyServer.ServeOnListener(kubectlProxyListener) - t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL) - testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String()) + t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), server.ClientConfig.Host) + testWatchClientTimeouts(t, &restclient.Config{Host: "http://" + kubectlProxyListener.Addr().String()}) }) } -func testWatchClientTimeouts(t *testing.T, url string) { +func testWatchClientTimeouts(t *testing.T, config *restclient.Config) { t.Run("timeout", func(t *testing.T) { - testWatchClientTimeout(t, url, time.Second, 0) + testWatchClientTimeout(t, config, time.Second, 0) }) t.Run("timeoutSeconds", func(t *testing.T) { - testWatchClientTimeout(t, url, 0, time.Second) + testWatchClientTimeout(t, config, 0, time.Second) }) t.Run("timeout+timeoutSeconds", func(t *testing.T) { - testWatchClientTimeout(t, url, time.Second, time.Second) + testWatchClientTimeout(t, config, time.Second, time.Second) }) } -func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) { - // client - client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout}) +func testWatchClientTimeout(t *testing.T, config *restclient.Config, timeout, timeoutSeconds time.Duration) { + config.Timeout = timeout + client, err := kubernetes.NewForConfig(config) if err != nil { t.Fatal(err) } From 3930362ad40301de070f0be5e3cd669a38a35062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 10 Jun 2022 16:35:42 +0200 Subject: [PATCH 4/5] Clean(er) shutdown of apiserver integration tests --- test/integration/apiserver/apiserver_test.go | 179 +++++++++--------- .../apiserver/cve_2021_29923_test.go | 22 ++- test/integration/apiserver/export_test.go | 4 +- .../apiserver/no_new_betas_test.go | 4 +- test/integration/apiserver/patch_test.go | 13 +- test/integration/apiserver/print_test.go | 10 +- test/integration/apiserver/watchcache_test.go | 29 +-- 7 files changed, 137 insertions(+), 124 deletions(-) diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index 5553e6a5b17..c5ed11ac5b2 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -24,7 +24,6 @@ import ( "io" "net" "net/http" - "net/http/httptest" "path" "reflect" "strconv" @@ -55,6 +54,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" @@ -64,52 +64,38 @@ import ( "k8s.io/client-go/tools/pager" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" - "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" ) -func setup(t testing.TB, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) { +func setup(t *testing.T, groupVersions ...schema.GroupVersion) (clientset.Interface, *restclient.Config, framework.TearDownFunc) { return setupWithResources(t, groupVersions, nil) } -func setupWithOptions(t testing.TB, opts *framework.ControlPlaneConfigOptions, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) { - return setupWithResourcesWithOptions(t, opts, groupVersions, nil) +func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (clientset.Interface, *restclient.Config, framework.TearDownFunc) { + return framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + if len(groupVersions) > 0 || len(resources) > 0 { + resourceConfig := controlplane.DefaultAPIResourceConfigSource() + resourceConfig.EnableVersions(groupVersions...) + resourceConfig.EnableResources(resources...) + config.ExtraConfig.APIResourceConfigSource = resourceConfig + } + }, + }) } -func setupWithResources(t testing.TB, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { - return setupWithResourcesWithOptions(t, &framework.ControlPlaneConfigOptions{}, groupVersions, resources) -} - -func setupWithResourcesWithOptions(t testing.TB, opts *framework.ControlPlaneConfigOptions, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(opts) - if len(groupVersions) > 0 || len(resources) > 0 { - resourceConfig := controlplane.DefaultAPIResourceConfigSource() - resourceConfig.EnableVersions(groupVersions...) - resourceConfig.EnableResources(resources...) - controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig - } - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - - clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1}) - if err != nil { - t.Fatalf("Error in create clientset: %v", err) - } - return s, clientSet, closeFn -} - -func verifyStatusCode(t *testing.T, verb, URL, body string, expectedStatusCode int) { +func verifyStatusCode(t *testing.T, transport http.RoundTripper, verb, URL, body string, expectedStatusCode int) { // We don't use the typed Go client to send this request to be able to verify the response status code. bodyBytes := bytes.NewReader([]byte(body)) req, err := http.NewRequest(verb, URL, bodyBytes) if err != nil { t.Fatalf("unexpected error: %v in sending req with verb: %s, URL: %s and body: %s", err, verb, URL, body) } - transport := http.DefaultTransport klog.Infof("Sending request: %v", req) resp, err := transport.RoundTrip(req) if err != nil { @@ -161,8 +147,8 @@ var cascDel = ` ` func Test4xxStatusCodeInvalidPatch(t *testing.T) { - _, client, closeFn := setup(t) - defer closeFn() + client, _, tearDownFn := setup(t) + defer tearDownFn() obj := []byte(`{ "apiVersion": "apps/v1", @@ -225,12 +211,10 @@ func Test4xxStatusCodeInvalidPatch(t *testing.T) { } func TestCacheControl(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{}) - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() - instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() - rt, err := restclient.TransportFor(instanceConfig.GenericAPIServer.LoopbackClientConfig) + rt, err := restclient.TransportFor(server.ClientConfig) if err != nil { t.Fatal(err) } @@ -254,7 +238,7 @@ func TestCacheControl(t *testing.T) { } for _, path := range paths { t.Run(path, func(t *testing.T) { - req, err := http.NewRequest("GET", instanceConfig.GenericAPIServer.LoopbackClientConfig.Host+path, nil) + req, err := http.NewRequest("GET", server.ClientConfig.Host+path, nil) if err != nil { t.Fatal(err) } @@ -272,13 +256,10 @@ func TestCacheControl(t *testing.T) { // Tests that the apiserver returns HSTS headers as expected. func TestHSTS(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{}) - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() - controlPlaneConfig.GenericConfig.HSTSDirectives = []string{"max-age=31536000", "includeSubDomains"} - instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--strict-transport-security-directives=max-age=31536000,includeSubDomains"}, framework.SharedEtcd()) + defer server.TearDownFn() - rt, err := restclient.TransportFor(instanceConfig.GenericAPIServer.LoopbackClientConfig) + rt, err := restclient.TransportFor(server.ClientConfig) if err != nil { t.Fatal(err) } @@ -302,7 +283,7 @@ func TestHSTS(t *testing.T) { } for _, path := range paths { t.Run(path, func(t *testing.T) { - req, err := http.NewRequest("GET", instanceConfig.GenericAPIServer.LoopbackClientConfig.Host+path, nil) + req, err := http.NewRequest("GET", server.ClientConfig.Host+path, nil) if err != nil { t.Fatal(err) } @@ -320,11 +301,16 @@ func TestHSTS(t *testing.T) { // Tests that the apiserver returns 202 status code as expected. func Test202StatusCode(t *testing.T) { - s, clientSet, closeFn := setup(t) - defer closeFn() + clientSet, kubeConfig, tearDownFn := setup(t) + defer tearDownFn() - ns := framework.CreateTestingNamespace("status-code", t) - defer framework.DeleteTestingNamespace(ns, t) + transport, err := restclient.TransportFor(kubeConfig) + if err != nil { + t.Fatal(err) + } + + ns := framework.CreateNamespaceOrDie(clientSet, "status-code", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) @@ -334,7 +320,7 @@ func Test202StatusCode(t *testing.T) { if err != nil { t.Fatalf("Failed to create rs: %v", err) } - verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), "", 200) + verifyStatusCode(t, transport, "DELETE", kubeConfig.Host+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), "", 200) // 2. Create the resource with a finalizer so that the resource is not immediately deleted and then delete it without setting DeleteOptions. // Verify that the apiserver still returns 200 since DeleteOptions.OrphanDependents is not set. @@ -344,7 +330,7 @@ func Test202StatusCode(t *testing.T) { if err != nil { t.Fatalf("Failed to create rs: %v", err) } - verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), "", 200) + verifyStatusCode(t, transport, "DELETE", kubeConfig.Host+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), "", 200) // 3. Create the resource and then delete it with DeleteOptions.OrphanDependents=false. // Verify that the server still returns 200 since the resource is immediately deleted. @@ -353,7 +339,7 @@ func Test202StatusCode(t *testing.T) { if err != nil { t.Fatalf("Failed to create rs: %v", err) } - verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 200) + verifyStatusCode(t, transport, "DELETE", kubeConfig.Host+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 200) // 4. Create the resource with a finalizer so that the resource is not immediately deleted and then delete it with DeleteOptions.OrphanDependents=false. // Verify that the server returns 202 in this case. @@ -363,7 +349,7 @@ func Test202StatusCode(t *testing.T) { if err != nil { t.Fatalf("Failed to create rs: %v", err) } - verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202) + verifyStatusCode(t, transport, "DELETE", kubeConfig.Host+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202) } var ( @@ -378,13 +364,18 @@ func TestListOptions(t *testing.T) { for _, watchCacheEnabled := range []bool{true, false} { t.Run(fmt.Sprintf("watchCacheEnabled=%t", watchCacheEnabled), func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() - etcdOptions := framework.DefaultEtcdOptions() - etcdOptions.EnableWatchCache = watchCacheEnabled - _, clientSet, closeFn := setupWithOptions(t, &framework.ControlPlaneConfigOptions{EtcdOptions: etcdOptions}) - defer closeFn() - ns := framework.CreateTestingNamespace("list-options", t) - defer framework.DeleteTestingNamespace(ns, t) + var storageTransport *storagebackend.TransportConfig + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.EnableWatchCache = watchCacheEnabled + storageTransport = &opts.Etcd.StorageConfig.Transport + }, + }) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientSet, "list-options", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) @@ -410,7 +401,7 @@ func TestListOptions(t *testing.T) { } // compact some of the revision history in etcd so we can test "too old" resource versions - _, kvClient, err := integration.GetEtcdClients(etcdOptions.StorageConfig.Transport) + _, kvClient, err := integration.GetEtcdClients(*storageTransport) if err != nil { t.Fatal(err) } @@ -610,13 +601,16 @@ func TestListResourceVersion0(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() - etcdOptions := framework.DefaultEtcdOptions() - etcdOptions.EnableWatchCache = tc.watchCacheEnabled - _, clientSet, closeFn := setupWithOptions(t, &framework.ControlPlaneConfigOptions{EtcdOptions: etcdOptions}) - defer closeFn() - ns := framework.CreateTestingNamespace("list-paging", t) - defer framework.DeleteTestingNamespace(ns, t) + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.EnableWatchCache = tc.watchCacheEnabled + }, + }) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientSet, "list-paging", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) @@ -665,11 +659,11 @@ func TestListResourceVersion0(t *testing.T) { func TestAPIListChunking(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() - _, clientSet, closeFn := setup(t) - defer closeFn() + clientSet, _, tearDownFn := setup(t) + defer tearDownFn() - ns := framework.CreateTestingNamespace("list-paging", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientSet, "list-paging", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) @@ -743,13 +737,13 @@ func makeSecret(name string) *v1.Secret { } func TestNameInFieldSelector(t *testing.T) { - _, clientSet, closeFn := setup(t) - defer closeFn() + clientSet, _, tearDownFn := setup(t) + defer tearDownFn() numNamespaces := 3 for i := 0; i < 3; i++ { - ns := framework.CreateTestingNamespace(fmt.Sprintf("ns%d", i), t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientSet, fmt.Sprintf("ns%d", i), t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) _, err := clientSet.CoreV1().Secrets(ns.Name).Create(context.TODO(), makeSecret("foo"), metav1.CreateOptions{}) if err != nil { @@ -834,8 +828,8 @@ func TestMetadataClient(t *testing.T) { } defer tearDown() - s, clientset, closeFn := setup(t) - defer closeFn() + clientset, kubeConfig, tearDownFn := setup(t) + defer tearDownFn() apiExtensionClient, err := apiextensionsclient.NewForConfig(config) if err != nil { @@ -885,12 +879,15 @@ func TestMetadataClient(t *testing.T) { name: "list, get, patch, and delete via metadata client", want: func(t *testing.T) { ns := "metadata-builtin" + namespace := framework.CreateNamespaceOrDie(clientset, ns, t) + defer framework.DeleteNamespaceOrDie(clientset, namespace, t) + svc, err := clientset.CoreV1().Services(ns).Create(context.TODO(), &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-1", Annotations: map[string]string{"foo": "bar"}}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}}, metav1.CreateOptions{}) if err != nil { t.Fatalf("unable to create service: %v", err) } - cfg := metadata.ConfigFor(&restclient.Config{Host: s.URL}) + cfg := metadata.ConfigFor(kubeConfig) wrapper := &callWrapper{} cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { wrapper.nested = rt @@ -1025,6 +1022,9 @@ func TestMetadataClient(t *testing.T) { name: "watch via metadata client", want: func(t *testing.T) { ns := "metadata-watch" + namespace := framework.CreateNamespaceOrDie(clientset, ns, t) + defer framework.DeleteNamespaceOrDie(clientset, namespace, t) + svc, err := clientset.CoreV1().Services(ns).Create(context.TODO(), &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-2", Annotations: map[string]string{"foo": "bar"}}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}}, metav1.CreateOptions{}) if err != nil { t.Fatalf("unable to create service: %v", err) @@ -1033,7 +1033,7 @@ func TestMetadataClient(t *testing.T) { t.Fatalf("unable to patch cr: %v", err) } - cfg := metadata.ConfigFor(&restclient.Config{Host: s.URL}) + cfg := metadata.ConfigFor(kubeConfig) wrapper := &callWrapper{} cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { wrapper.nested = rt @@ -1162,8 +1162,8 @@ func TestAPICRDProtobuf(t *testing.T) { } defer tearDown() - s, _, closeFn := setup(t) - defer closeFn() + _, kubeConfig, tearDownFn := setup(t) + defer tearDownFn() apiExtensionClient, err := apiextensionsclient.NewForConfig(config) if err != nil { @@ -1337,7 +1337,7 @@ func TestAPICRDProtobuf(t *testing.T) { cfg := dynamic.ConfigFor(config) if len(group) == 0 { - cfg = dynamic.ConfigFor(&restclient.Config{Host: s.URL}) + cfg = dynamic.ConfigFor(kubeConfig) cfg.APIPath = "/api" } else { cfg.APIPath = "/apis" @@ -1376,9 +1376,11 @@ func TestGetSubresourcesAsTables(t *testing.T) { } defer tearDown() - s, clientset, closeFn := setup(t) - defer closeFn() - fmt.Printf("%#v\n", clientset) + clientset, kubeConfig, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientset, testNamespace, t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) apiExtensionClient, err := apiextensionsclient.NewForConfig(config) if err != nil { @@ -1557,7 +1559,7 @@ func TestGetSubresourcesAsTables(t *testing.T) { cfg := dynamic.ConfigFor(config) if len(group) == 0 { - cfg = dynamic.ConfigFor(&restclient.Config{Host: s.URL}) + cfg = dynamic.ConfigFor(kubeConfig) cfg.APIPath = "/api" } else { cfg.APIPath = "/apis" @@ -1596,8 +1598,11 @@ func TestTransform(t *testing.T) { } defer tearDown() - s, clientset, closeFn := setup(t) - defer closeFn() + clientset, kubeConfig, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientset, testNamespace, t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) apiExtensionClient, err := apiextensionsclient.NewForConfig(config) if err != nil { @@ -2171,7 +2176,7 @@ func TestTransform(t *testing.T) { cfg := dynamic.ConfigFor(config) if len(group) == 0 { - cfg = dynamic.ConfigFor(&restclient.Config{Host: s.URL}) + cfg = dynamic.ConfigFor(kubeConfig) cfg.APIPath = "/api" } else { cfg.APIPath = "/apis" diff --git a/test/integration/apiserver/cve_2021_29923_test.go b/test/integration/apiserver/cve_2021_29923_test.go index 89bdd0aad43..03ae3cd1ed6 100644 --- a/test/integration/apiserver/cve_2021_29923_test.go +++ b/test/integration/apiserver/cve_2021_29923_test.go @@ -25,7 +25,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" - restclient "k8s.io/client-go/rest" + clientset "k8s.io/client-go/kubernetes" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -37,20 +38,23 @@ func gvr(g, v, r string) schema.GroupVersionResource { // Is it possible that exist more fields that can contain IPs, the test consider the most significative. // xref: https://issues.k8s.io/100895 func TestCanaryCVE_2021_29923(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, server, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) + defer server.TearDownFn() - config := restclient.Config{Host: server.URL} + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("unexpected error creating client: %v", err) + } - dynamicClient, err := dynamic.NewForConfig(&config) + ns := framework.CreateNamespaceOrDie(client, "test-cve-2021-29923", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + dynamicClient, err := dynamic.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("unexpected error creating dynamic client: %v", err) } - ns := framework.CreateTestingNamespace("test-cve-2021-29923", t) - defer framework.DeleteTestingNamespace(ns, t) - objects := map[schema.GroupVersionResource]string{ // k8s.io/kubernetes/pkg/api/v1 gvr("", "v1", "nodes"): `{"kind": "Node", "apiVersion": "v1", "metadata": {"name": "node1"}, "spec": {"unschedulable": true}, "status": {"addresses":[{"address":"172.18.0.012","type":"InternalIP"}]}}`, diff --git a/test/integration/apiserver/export_test.go b/test/integration/apiserver/export_test.go index 5c03355c840..a2ecbea8d2f 100644 --- a/test/integration/apiserver/export_test.go +++ b/test/integration/apiserver/export_test.go @@ -27,8 +27,8 @@ import ( // Tests that the apiserver rejects the export param func TestExportRejection(t *testing.T) { - _, clientSet, closeFn := setup(t) - defer closeFn() + clientSet, _, tearDownFn := setup(t) + defer tearDownFn() _, err := clientSet.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: "export-fail"}, diff --git a/test/integration/apiserver/no_new_betas_test.go b/test/integration/apiserver/no_new_betas_test.go index af7a6e5196b..fc7fc9706c9 100644 --- a/test/integration/apiserver/no_new_betas_test.go +++ b/test/integration/apiserver/no_new_betas_test.go @@ -57,11 +57,11 @@ func TestNoNewBetaAPIsByDefault(t *testing.T) { // if you found this because you want to create an integration test for your new beta API, the method you're looking for // is this setupWithResources method and you need to pass the resource you want to enable into it. - _, kubeClient, closeFn := setupWithResources(t, + kubeClient, _, tearDownFn := setupWithResources(t, []schema.GroupVersion{}, []schema.GroupVersionResource{}, ) - defer closeFn() + defer tearDownFn() _, allResourceLists, err := kubeClient.Discovery().ServerGroupsAndResources() if err != nil { diff --git a/test/integration/apiserver/patch_test.go b/test/integration/apiserver/patch_test.go index e619f6ce5de..7a1c09d8285 100644 --- a/test/integration/apiserver/patch_test.go +++ b/test/integration/apiserver/patch_test.go @@ -35,11 +35,11 @@ import ( // Tests that the apiserver retries patches func TestPatchConflicts(t *testing.T) { - _, clientSet, closeFn := setup(t) - defer closeFn() + clientSet, _, tearDownFn := setup(t) + defer tearDownFn() - ns := framework.CreateTestingNamespace("status-code", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientSet, "status-code", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) numOfConcurrentPatches := 100 @@ -64,7 +64,10 @@ func TestPatchConflicts(t *testing.T) { } // Create the object we're going to conflict on - clientSet.CoreV1().Secrets(ns.Name).Create(context.TODO(), secret, metav1.CreateOptions{}) + _, err := clientSet.CoreV1().Secrets(ns.Name).Create(context.TODO(), secret, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } client := clientSet.CoreV1().RESTClient() successes := int32(0) diff --git a/test/integration/apiserver/print_test.go b/test/integration/apiserver/print_test.go index 0babb85d8ff..53aefe239a9 100644 --- a/test/integration/apiserver/print_test.go +++ b/test/integration/apiserver/print_test.go @@ -120,7 +120,7 @@ var missingHanlders = sets.NewString( ) func TestServerSidePrint(t *testing.T) { - s, _, closeFn := setupWithResources(t, + clientSet, kubeConfig, tearDownFn := setupWithResources(t, // additional groupversions needed for the test to run []schema.GroupVersion{ {Group: "discovery.k8s.io", Version: "v1"}, @@ -140,16 +140,16 @@ func TestServerSidePrint(t *testing.T) { }, []schema.GroupVersionResource{}, ) - defer closeFn() + defer tearDownFn() - ns := framework.CreateTestingNamespace("server-print", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientSet, "server-print", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) tableParam := fmt.Sprintf("application/json;as=Table;g=%s;v=%s, application/json", metav1beta1.GroupName, metav1beta1.SchemeGroupVersion.Version) printer := newFakePrinter(printersinternal.AddHandlers) configFlags := genericclioptions.NewTestConfigFlags(). - WithClientConfig(clientcmd.NewDefaultClientConfig(*createKubeConfig(s.URL), &clientcmd.ConfigOverrides{})) + WithClientConfig(clientcmd.NewDefaultClientConfig(*createKubeConfig(kubeConfig.Host), &clientcmd.ConfigOverrides{})) restConfig, err := configFlags.ToRESTConfig() if err != nil { diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go index 57786d62642..70a6b05d973 100644 --- a/test/integration/apiserver/watchcache_test.go +++ b/test/integration/apiserver/watchcache_test.go @@ -26,14 +26,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/controlplane/reconcilers" "k8s.io/kubernetes/test/integration/framework" ) // setup create kube-apiserver backed up by two separate etcds, // with one of them containing events and the other all other objects. -func multiEtcdSetup(t testing.TB) (clientset.Interface, framework.CloseFunc) { +func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.CloseFunc) { etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"} etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs) if err != nil { @@ -51,25 +52,25 @@ func multiEtcdSetup(t testing.TB) (clientset.Interface, framework.CloseFunc) { etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)} etcdOptions.EnableWatchCache = true - opts := framework.ControlPlaneConfigOptions{EtcdOptions: etcdOptions} - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&opts) - // Switch off endpoints reconciler to avoid unnecessary operations. - controlPlaneConfig.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType - _, s, stopAPIServer := framework.RunAnAPIServer(controlPlaneConfig) + clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Ensure we're using the same etcd across apiserver restarts. + opts.Etcd = etcdOptions + }, + ModifyServerConfig: func(config *controlplane.Config) { + // Switch off endpoints reconciler to avoid unnecessary operations. + config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType + }, + }) closeFn := func() { - stopAPIServer() + tearDownFn() stopEtcd1() stopEtcd0() } - clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1}) - if err != nil { - t.Fatalf("Error in create clientset: %v", err) - } - // Wait for apiserver to be stabilized. - // Everything but default service creation is checked in RunAnAPIServer above by + // Everything but default service creation is checked in StartTestServer above by // waiting for post start hooks, so we just wait for default service to exist. // TODO(wojtek-t): Figure out less fragile way. ctx := context.Background() From ed442cc3dd479dd383532a2e77c6c71087763bf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 10 Jun 2022 20:17:25 +0200 Subject: [PATCH 5/5] Clean(er) shutdown of auth integration tests --- test/integration/auth/accessreview_test.go | 53 ++++++----- test/integration/auth/bootstraptoken_test.go | 27 ++++-- test/integration/auth/rbac_test.go | 93 +++++++++++++------- 3 files changed, 107 insertions(+), 66 deletions(-) diff --git a/test/integration/auth/accessreview_test.go b/test/integration/auth/accessreview_test.go index 9b083509d30..e621821d630 100644 --- a/test/integration/auth/accessreview_test.go +++ b/test/integration/auth/accessreview_test.go @@ -28,9 +28,8 @@ import ( "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/test/integration/framework" ) @@ -57,13 +56,15 @@ func alwaysAlice(req *http.Request) (*authenticator.Response, bool, error) { } func TestSubjectAccessReview(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(alwaysAlice) - controlPlaneConfig.GenericConfig.Authorization.Authorizer = sarAuthorizer{} - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() - - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + clientset, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + // Unset BearerToken to disable BearerToken authenticator. + config.GenericConfig.LoopbackClientConfig.BearerToken = "" + config.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(alwaysAlice) + config.GenericConfig.Authorization.Authorizer = sarAuthorizer{} + }, + }) + defer tearDownFn() tests := []struct { name string @@ -148,8 +149,7 @@ func TestSubjectAccessReview(t *testing.T) { func TestSelfSubjectAccessReview(t *testing.T) { username := "alice" - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(func(req *http.Request) (*authenticator.Response, bool, error) { + authenticatorFunc := func(req *http.Request) (*authenticator.Response, bool, error) { return &authenticator.Response{ User: &user.DefaultInfo{ Name: username, @@ -157,12 +157,17 @@ func TestSelfSubjectAccessReview(t *testing.T) { Groups: []string{user.AllAuthenticated}, }, }, true, nil - }) - controlPlaneConfig.GenericConfig.Authorization.Authorizer = sarAuthorizer{} - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + } - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + clientset, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + // Unset BearerToken to disable BearerToken authenticator. + config.GenericConfig.LoopbackClientConfig.BearerToken = "" + config.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(authenticatorFunc) + config.GenericConfig.Authorization.Authorizer = sarAuthorizer{} + }, + }) + defer tearDownFn() tests := []struct { name string @@ -235,13 +240,15 @@ func TestSelfSubjectAccessReview(t *testing.T) { } func TestLocalSubjectAccessReview(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(alwaysAlice) - controlPlaneConfig.GenericConfig.Authorization.Authorizer = sarAuthorizer{} - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() - - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + clientset, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + // Unset BearerToken to disable BearerToken authenticator. + config.GenericConfig.LoopbackClientConfig.BearerToken = "" + config.GenericConfig.Authentication.Authenticator = authenticator.RequestFunc(alwaysAlice) + config.GenericConfig.Authorization.Authorizer = sarAuthorizer{} + }, + }) + defer tearDownFn() tests := []struct { name string diff --git a/test/integration/auth/bootstraptoken_test.go b/test/integration/auth/bootstraptoken_test.go index 97d2d764392..c538cdeb1bb 100644 --- a/test/integration/auth/bootstraptoken_test.go +++ b/test/integration/auth/bootstraptoken_test.go @@ -29,7 +29,10 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apiserver/pkg/authentication/group" "k8s.io/apiserver/pkg/authentication/request/bearertoken" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/client-go/rest" bootstrapapi "k8s.io/cluster-bootstrap/token/api" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" @@ -117,17 +120,23 @@ func TestBootstrapTokenAuth(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { authenticator := group.NewAuthenticatedGroupAdder(bearertoken.New(bootstrap.NewTokenAuthenticator(bootstrapSecrets{test.secret}))) - // Set up an API server - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticator - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() - ns := framework.CreateTestingNamespace("auth-bootstrap-token", t) - defer framework.DeleteTestingNamespace(ns, t) + kubeClient, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authentication.Authenticator = authenticator + config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() + }, + }) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(kubeClient, "auth-bootstrap-token", t) + defer framework.DeleteNamespaceOrDie(kubeClient, ns, t) previousResourceVersion := make(map[string]float64) - transport := http.DefaultTransport + transport, err := rest.TransportFor(kubeConfig) + if err != nil { + t.Fatal(err) + } token := validTokenID + "." + validSecret var bodyStr string @@ -144,7 +153,7 @@ func TestBootstrapTokenAuth(t *testing.T) { } test.request.body = bodyStr bodyBytes := bytes.NewReader([]byte(bodyStr)) - req, err := http.NewRequest(test.request.verb, s.URL+test.request.URL, bodyBytes) + req, err := http.NewRequest(test.request.verb, kubeConfig.Host+test.request.URL, bodyBytes) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/test/integration/auth/rbac_test.go b/test/integration/auth/rbac_test.go index 0cda9b5e149..74fa05cfad5 100644 --- a/test/integration/auth/rbac_test.go +++ b/test/integration/auth/rbac_test.go @@ -47,7 +47,7 @@ import ( "k8s.io/client-go/transport" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" rbachelper "k8s.io/kubernetes/pkg/apis/rbac/v1" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/registry/rbac/clusterrole" @@ -62,11 +62,11 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) -func clientForToken(user string) *http.Client { +func clientForToken(user string, rt http.RoundTripper) *http.Client { return &http.Client{ Transport: transport.NewBearerAuthRoundTripper( user, - transport.DebugWrappers(http.DefaultTransport), + transport.DebugWrappers(rt), ), } } @@ -519,10 +519,7 @@ func TestRBAC(t *testing.T) { for i, tc := range tests { t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { - // Create an API Server. - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, controlPlaneConfig) - controlPlaneConfig.GenericConfig.Authentication.Authenticator = group.NewAuthenticatedGroupAdder(bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ + authenticator := group.NewAuthenticatedGroupAdder(bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ superUser: {Name: "admin", Groups: []string{"system:masters"}}, "any-rolebinding-writer": {Name: "any-rolebinding-writer"}, "any-rolebinding-writer-namespace": {Name: "any-rolebinding-writer-namespace"}, @@ -535,14 +532,28 @@ func TestRBAC(t *testing.T) { "limitrange-patcher": {Name: "limitrange-patcher"}, "user-with-no-permissions": {Name: "user-with-no-permissions"}, }))) - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() - clientConfig := &restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}} + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + // Also disable namespace lifecycle to workaroung the test limitation that first creates + // roles/rolebindings and only then creates corresponding namespaces. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "NamespaceLifecycle"} + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authentication.Authenticator = authenticator + config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + }, + }) + defer tearDownFn() + + transport, err := restclient.TransportFor(kubeConfig) + if err != nil { + t.Fatal(err) + } // Bootstrap the API Server with the test case's initial roles. - superuserClient, _ := clientsetForToken(superUser, clientConfig) + superuserClient, _ := clientsetForToken(superUser, kubeConfig) if err := tc.bootstrapRoles.bootstrap(superuserClient); err != nil { t.Errorf("case %d: failed to apply initial roles: %v", i, err) return @@ -578,7 +589,7 @@ func TestRBAC(t *testing.T) { body = strings.NewReader(fmt.Sprintf(r.body, sub)) } - req, err := http.NewRequest(r.verb, s.URL+path, body) + req, err := http.NewRequest(r.verb, kubeConfig.Host+path, body) if r.verb == "PATCH" { // For patch operations, use the apply content type req.Header.Add("Content-Type", string(types.ApplyPatchType)) @@ -598,7 +609,7 @@ func TestRBAC(t *testing.T) { return } - resp, err := clientForToken(r.token).Do(req) + resp, err := clientForToken(r.token, transport).Do(req) if err != nil { t.Errorf("case %d, req %d: failed to make request: %v", i, j, err) return @@ -644,15 +655,15 @@ func TestRBAC(t *testing.T) { func TestBootstrapping(t *testing.T) { superUser := "admin/system:masters" - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, controlPlaneConfig) - controlPlaneConfig.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ - superUser: {Name: "admin", Groups: []string{"system:masters"}}, - })) - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() - - clientset := clientset.NewForConfigOrDie(&restclient.Config{BearerToken: superUser, Host: s.URL}) + clientset, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ + superUser: {Name: "admin", Groups: []string{"system:masters"}}, + })) + config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + }, + }) + defer tearDownFn() watcher, err := clientset.RbacV1().ClusterRoles().Watch(context.TODO(), metav1.ListOptions{ResourceVersion: "0"}) if err != nil { @@ -705,14 +716,19 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { superUser := "admin/system:masters" - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, controlPlaneConfig) - controlPlaneConfig.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ - superUser: {Name: "admin", Groups: []string{"system:masters"}}, - })) - _, s, tearDownFn := framework.RunAnAPIServer(controlPlaneConfig) - - client := clientset.NewForConfigOrDie(&restclient.Config{BearerToken: superUser, Host: s.URL}) + etcdConfig := framework.SharedEtcd() + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Ensure we're using the same etcd across apiserver restarts. + opts.Etcd.StorageConfig = *etcdConfig + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ + superUser: {Name: "admin", Groups: []string{"system:masters"}}, + })) + config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + }, + }) // Modify the default RBAC discovery ClusterRoleBidnings to look more like the defaults that // existed prior to v1.14, but with user modifications. @@ -754,9 +770,18 @@ func TestDiscoveryUpgradeBootstrapping(t *testing.T) { // Check that upgraded API servers inherit `system:public-info-viewer` settings from // `system:discovery`, and respect auto-reconciliation annotations. - _, s, tearDownFn = framework.RunAnAPIServer(controlPlaneConfig) - - client = clientset.NewForConfigOrDie(&restclient.Config{BearerToken: superUser, Host: s.URL}) + client, _, tearDownFn = framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Ensure we're using the same etcd across apiserver restarts. + opts.Etcd.StorageConfig = *etcdConfig + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authentication.Authenticator = bearertoken.New(tokenfile.New(map[string]*user.DefaultInfo{ + superUser: {Name: "admin", Groups: []string{"system:masters"}}, + })) + config.GenericConfig.Authorization.Authorizer = newRBACAuthorizer(t, config) + }, + }) newDiscRoleBinding, err := client.RbacV1().ClusterRoleBindings().Get(context.TODO(), "system:discovery", metav1.GetOptions{}) if err != nil {