From 41d7ddee1afe14eca4ad202567ebd14186082960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Sun, 12 Jun 2022 23:27:52 +0200 Subject: [PATCH] Clean shutdown of apimachinery integration tests --- .../apimachinery/watch_restart_test.go | 21 ++- .../apimachinery/watch_timeout_test.go | 122 +++++++++++++----- 2 files changed, 101 insertions(+), 42 deletions(-) diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go index 3b73c0f6189..d6380b1b2d6 100644 --- a/test/integration/apimachinery/watch_restart_test.go +++ b/test/integration/apimachinery/watch_restart_test.go @@ -31,9 +31,9 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -66,19 +66,16 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { // Has to be longer than 5 seconds timeout := 30 * time.Second - // Set up an API server - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - // Timeout is set random between MinRequestTimeout and 2x - controlPlaneConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4 - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--min-request-timeout=7"}, framework.SharedEtcd()) + defer server.TearDownFn() - config := &restclient.Config{ - Host: s.URL, + clientset, err := kubernetes.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatal(err) } - namespaceObject := framework.CreateTestingNamespace("retry-watch", t) - defer framework.DeleteTestingNamespace(namespaceObject, t) + namespaceObject := framework.CreateNamespaceOrDie(clientset, "retry-watch", t) + defer framework.DeleteNamespaceOrDie(clientset, namespaceObject, t) getListFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) *corev1.SecretList { return func(options metav1.ListOptions) *corev1.SecretList { @@ -215,7 +212,7 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { tc := tmptc // we need to copy it for parallel runs t.Run(tc.name, func(t *testing.T) { t.Parallel() - c, err := kubernetes.NewForConfig(config) + c, err := kubernetes.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Failed to create clientset: %v", err) } diff --git a/test/integration/apimachinery/watch_timeout_test.go b/test/integration/apimachinery/watch_timeout_test.go index b7ba18cf9fc..529aaaa278d 100644 --- a/test/integration/apimachinery/watch_timeout_test.go +++ b/test/integration/apimachinery/watch_timeout_test.go @@ -19,8 +19,9 @@ package apimachinery import ( "bytes" "context" + "fmt" "io" - "log" + "net/http" "net/http/httptest" "net/http/httputil" "net/url" @@ -38,14 +39,67 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" kubectlproxy "k8s.io/kubectl/pkg/proxy" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) +type extractRT struct { + http.Header +} + +func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) { + rt.Header = req.Header + return &http.Response{}, nil +} + +// headersForConfig extracts any http client logic necessary for the provided +// config. +func headersForConfig(c *restclient.Config, url *url.URL) (http.Header, error) { + extract := &extractRT{} + rt, err := restclient.HTTPWrappersForConfig(c, extract) + if err != nil { + return nil, err + } + request, err := http.NewRequest("GET", url.String(), nil) + if err != nil { + return nil, err + } + if _, err := rt.RoundTrip(request); err != nil { + return nil, err + } + return extract.Header, nil +} + +// websocketConfig constructs a websocket config to the provided URL, using the client +// config, with the specified protocols. +func websocketConfig(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Config, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, fmt.Errorf("Failed to create tls config: %v", err) + } + if url.Scheme == "https" { + url.Scheme = "wss" + } else { + url.Scheme = "ws" + } + headers, err := headersForConfig(config, url) + if err != nil { + return nil, fmt.Errorf("Failed to load http headers: %v", err) + } + cfg, err := websocket.NewConfig(url.String(), "http://localhost") + if err != nil { + return nil, fmt.Errorf("Failed to create websocket config: %v", err) + } + cfg.Header = headers + cfg.TlsConfig = tlsConfig + cfg.Protocol = protocols + return cfg, err +} + func TestWebsocketWatchClientTimeout(t *testing.T) { // server setup - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - instance, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() // object setup service := &corev1.Service{ @@ -57,7 +111,7 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { configmap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{Name: "test"}, } - clientset, err := kubernetes.NewForConfig(instance.GenericAPIServer.LoopbackClientConfig) + clientset, err := kubernetes.NewForConfig(server.ClientConfig) if err != nil { t.Fatal(err) } @@ -90,12 +144,13 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - - u, _ := url.Parse(s.URL) - apiURL := "ws://" + u.Host + tc.path - wsc, err := websocket.NewConfig(apiURL, apiURL) + url, err := url.Parse(server.ClientConfig.Host + tc.path) if err != nil { - log.Fatal(err) + t.Fatal(err) + } + wsc, err := websocketConfig(url, server.ClientConfig, nil) + if err != nil { + t.Fatal(err) } wsConn, err := websocket.DialConfig(wsc) @@ -142,29 +197,36 @@ func TestWebsocketWatchClientTimeout(t *testing.T) { } } -func TestWatchClientTimeout(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() +func TestWatchClientTimeoutXXX(t *testing.T) { + // server setup + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() t.Run("direct", func(t *testing.T) { - t.Logf("client at %s", s.URL) - testWatchClientTimeouts(t, s.URL) + t.Logf("client at %s", server.ClientConfig.Host) + testWatchClientTimeouts(t, restclient.CopyConfig(server.ClientConfig)) }) t.Run("reverse proxy", func(t *testing.T) { - u, _ := url.Parse(s.URL) + u, _ := url.Parse(server.ClientConfig.Host) proxy := httputil.NewSingleHostReverseProxy(u) proxy.FlushInterval = -1 - proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u)) + + transport, err := restclient.TransportFor(server.ClientConfig) + if err != nil { + t.Fatal(err) + } + proxy.Transport = transport + + proxyServer := httptest.NewServer(proxy) defer proxyServer.Close() - t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL) - testWatchClientTimeouts(t, proxyServer.URL) + t.Logf("client to %s, backend at %s", proxyServer.URL, server.ClientConfig.Host) + testWatchClientTimeouts(t, &restclient.Config{Host: proxyServer.URL}) }) t.Run("kubectl proxy", func(t *testing.T) { - kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0, false) + kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, server.ClientConfig, 0, false) if err != nil { t.Fatal(err) } @@ -175,26 +237,26 @@ func TestWatchClientTimeout(t *testing.T) { defer kubectlProxyListener.Close() go kubectlProxyServer.ServeOnListener(kubectlProxyListener) - t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL) - testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String()) + t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), server.ClientConfig.Host) + testWatchClientTimeouts(t, &restclient.Config{Host: "http://" + kubectlProxyListener.Addr().String()}) }) } -func testWatchClientTimeouts(t *testing.T, url string) { +func testWatchClientTimeouts(t *testing.T, config *restclient.Config) { t.Run("timeout", func(t *testing.T) { - testWatchClientTimeout(t, url, time.Second, 0) + testWatchClientTimeout(t, config, time.Second, 0) }) t.Run("timeoutSeconds", func(t *testing.T) { - testWatchClientTimeout(t, url, 0, time.Second) + testWatchClientTimeout(t, config, 0, time.Second) }) t.Run("timeout+timeoutSeconds", func(t *testing.T) { - testWatchClientTimeout(t, url, time.Second, time.Second) + testWatchClientTimeout(t, config, time.Second, time.Second) }) } -func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) { - // client - client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout}) +func testWatchClientTimeout(t *testing.T, config *restclient.Config, timeout, timeoutSeconds time.Duration) { + config.Timeout = timeout + client, err := kubernetes.NewForConfig(config) if err != nil { t.Fatal(err) }