Clean shutdown of apimachinery integration tests

This commit is contained in:
Wojciech Tyczyński 2022-06-12 23:27:52 +02:00
parent 8a87681a39
commit 41d7ddee1a
2 changed files with 101 additions and 42 deletions

View File

@ -31,9 +31,9 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
@ -66,19 +66,16 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
// Has to be longer than 5 seconds
timeout := 30 * time.Second
// Set up an API server
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
// Timeout is set random between MinRequestTimeout and 2x
controlPlaneConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--min-request-timeout=7"}, framework.SharedEtcd())
defer server.TearDownFn()
config := &restclient.Config{
Host: s.URL,
clientset, err := kubernetes.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
namespaceObject := framework.CreateTestingNamespace("retry-watch", t)
defer framework.DeleteTestingNamespace(namespaceObject, t)
namespaceObject := framework.CreateNamespaceOrDie(clientset, "retry-watch", t)
defer framework.DeleteNamespaceOrDie(clientset, namespaceObject, t)
getListFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) *corev1.SecretList {
return func(options metav1.ListOptions) *corev1.SecretList {
@ -215,7 +212,7 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
tc := tmptc // we need to copy it for parallel runs
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
c, err := kubernetes.NewForConfig(config)
c, err := kubernetes.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Failed to create clientset: %v", err)
}

View File

@ -19,8 +19,9 @@ package apimachinery
import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
@ -38,14 +39,67 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
kubectlproxy "k8s.io/kubectl/pkg/proxy"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
type extractRT struct {
http.Header
}
func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) {
rt.Header = req.Header
return &http.Response{}, nil
}
// headersForConfig extracts any http client logic necessary for the provided
// config.
func headersForConfig(c *restclient.Config, url *url.URL) (http.Header, error) {
extract := &extractRT{}
rt, err := restclient.HTTPWrappersForConfig(c, extract)
if err != nil {
return nil, err
}
request, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
if _, err := rt.RoundTrip(request); err != nil {
return nil, err
}
return extract.Header, nil
}
// websocketConfig constructs a websocket config to the provided URL, using the client
// config, with the specified protocols.
func websocketConfig(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Config, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, fmt.Errorf("Failed to create tls config: %v", err)
}
if url.Scheme == "https" {
url.Scheme = "wss"
} else {
url.Scheme = "ws"
}
headers, err := headersForConfig(config, url)
if err != nil {
return nil, fmt.Errorf("Failed to load http headers: %v", err)
}
cfg, err := websocket.NewConfig(url.String(), "http://localhost")
if err != nil {
return nil, fmt.Errorf("Failed to create websocket config: %v", err)
}
cfg.Header = headers
cfg.TlsConfig = tlsConfig
cfg.Protocol = protocols
return cfg, err
}
func TestWebsocketWatchClientTimeout(t *testing.T) {
// server setup
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
instance, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
// object setup
service := &corev1.Service{
@ -57,7 +111,7 @@ func TestWebsocketWatchClientTimeout(t *testing.T) {
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
}
clientset, err := kubernetes.NewForConfig(instance.GenericAPIServer.LoopbackClientConfig)
clientset, err := kubernetes.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
@ -90,12 +144,13 @@ func TestWebsocketWatchClientTimeout(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
u, _ := url.Parse(s.URL)
apiURL := "ws://" + u.Host + tc.path
wsc, err := websocket.NewConfig(apiURL, apiURL)
url, err := url.Parse(server.ClientConfig.Host + tc.path)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
wsc, err := websocketConfig(url, server.ClientConfig, nil)
if err != nil {
t.Fatal(err)
}
wsConn, err := websocket.DialConfig(wsc)
@ -142,29 +197,36 @@ func TestWebsocketWatchClientTimeout(t *testing.T) {
}
}
func TestWatchClientTimeout(t *testing.T) {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
func TestWatchClientTimeoutXXX(t *testing.T) {
// server setup
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
t.Run("direct", func(t *testing.T) {
t.Logf("client at %s", s.URL)
testWatchClientTimeouts(t, s.URL)
t.Logf("client at %s", server.ClientConfig.Host)
testWatchClientTimeouts(t, restclient.CopyConfig(server.ClientConfig))
})
t.Run("reverse proxy", func(t *testing.T) {
u, _ := url.Parse(s.URL)
u, _ := url.Parse(server.ClientConfig.Host)
proxy := httputil.NewSingleHostReverseProxy(u)
proxy.FlushInterval = -1
proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u))
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
proxy.Transport = transport
proxyServer := httptest.NewServer(proxy)
defer proxyServer.Close()
t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL)
testWatchClientTimeouts(t, proxyServer.URL)
t.Logf("client to %s, backend at %s", proxyServer.URL, server.ClientConfig.Host)
testWatchClientTimeouts(t, &restclient.Config{Host: proxyServer.URL})
})
t.Run("kubectl proxy", func(t *testing.T) {
kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0, false)
kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, server.ClientConfig, 0, false)
if err != nil {
t.Fatal(err)
}
@ -175,26 +237,26 @@ func TestWatchClientTimeout(t *testing.T) {
defer kubectlProxyListener.Close()
go kubectlProxyServer.ServeOnListener(kubectlProxyListener)
t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL)
testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String())
t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), server.ClientConfig.Host)
testWatchClientTimeouts(t, &restclient.Config{Host: "http://" + kubectlProxyListener.Addr().String()})
})
}
func testWatchClientTimeouts(t *testing.T, url string) {
func testWatchClientTimeouts(t *testing.T, config *restclient.Config) {
t.Run("timeout", func(t *testing.T) {
testWatchClientTimeout(t, url, time.Second, 0)
testWatchClientTimeout(t, config, time.Second, 0)
})
t.Run("timeoutSeconds", func(t *testing.T) {
testWatchClientTimeout(t, url, 0, time.Second)
testWatchClientTimeout(t, config, 0, time.Second)
})
t.Run("timeout+timeoutSeconds", func(t *testing.T) {
testWatchClientTimeout(t, url, time.Second, time.Second)
testWatchClientTimeout(t, config, time.Second, time.Second)
})
}
func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) {
// client
client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout})
func testWatchClientTimeout(t *testing.T, config *restclient.Config, timeout, timeoutSeconds time.Duration) {
config.Timeout = timeout
client, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}