Clean shutdown of controlplane integration tests

This commit is contained in:
Wojciech Tyczyński 2022-06-12 15:40:18 +02:00
parent 08f9125cb0
commit 8e267eba51

View File

@ -37,6 +37,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/group"
@ -49,10 +50,11 @@ import (
clientset "k8s.io/client-go/kubernetes"
clienttypedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
netutils "k8s.io/utils/net"
)
const (
@ -71,13 +73,23 @@ func (allowAliceAuthorizer) Authorize(ctx context.Context, a authorizer.Attribut
}
func testPrefix(t *testing.T, prefix string) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
resp, err := http.Get(s.URL + prefix)
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest("GET", server.ClientConfig.Host+prefix, nil)
if err != nil {
t.Fatalf("couldn't create a request: %v", err)
}
resp, err := transport.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error getting %s prefix: %v", prefix, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
}
@ -96,10 +108,10 @@ func TestAppsPrefix(t *testing.T) {
}
func TestKubernetesService(t *testing.T) {
config := framework.NewControlPlaneConfig()
_, _, closeFn := framework.RunAnAPIServer(config)
defer closeFn()
coreClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
defer server.TearDownFn()
coreClient := clientset.NewForConfigOrDie(server.ClientConfig)
err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
if _, err := coreClient.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}); err != nil && apierrors.IsNotFound(err) {
return false, nil
@ -114,18 +126,28 @@ func TestKubernetesService(t *testing.T) {
}
func TestEmptyList(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
u := s.URL + "/api/v1/namespaces/default/pods"
resp, err := http.Get(u)
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatalf("unexpected error getting %s: %v", u, err)
t.Fatal(err)
}
u := server.ClientConfig.Host + "/api/v1/namespaces/default/pods"
req, err := http.NewRequest("GET", u, nil)
if err != nil {
t.Fatalf("couldn't create a request: %v", err)
}
resp, err := transport.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error getting response: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
decodedData := map[string]interface{}{}
if err := json.Unmarshal(data, &decodedData); err != nil {
@ -141,9 +163,8 @@ func TestEmptyList(t *testing.T) {
}
}
func initStatusForbiddenControlPlaneConfig() *controlplane.Config {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(
func initStatusForbiddenControlPlaneConfig(config *controlplane.Config) {
config.GenericConfig.Authentication.Authenticator = authenticatorunion.New(
authauthenticator.RequestFunc(func(req *http.Request) (*authauthenticator.Response, bool, error) {
return &authauthenticator.Response{
User: &user.DefaultInfo{
@ -152,69 +173,86 @@ func initStatusForbiddenControlPlaneConfig() *controlplane.Config {
},
}, true, nil
}))
controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysDenyAuthorizer()
return controlPlaneConfig
config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysDenyAuthorizer()
}
func initUnauthorizedControlPlaneConfig() *controlplane.Config {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
func initUnauthorizedControlPlaneConfig(config *controlplane.Config) {
tokenAuthenticator := tokentest.New()
tokenAuthenticator.Tokens[AliceToken] = &user.DefaultInfo{Name: "alice", UID: "1"}
tokenAuthenticator.Tokens[BobToken] = &user.DefaultInfo{Name: "bob", UID: "2"}
controlPlaneConfig.GenericConfig.Authentication.Authenticator = group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated})
controlPlaneConfig.GenericConfig.Authorization.Authorizer = allowAliceAuthorizer{}
return controlPlaneConfig
config.GenericConfig.Authentication.Authenticator = group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated})
config.GenericConfig.Authorization.Authorizer = allowAliceAuthorizer{}
}
func TestStatus(t *testing.T) {
testCases := []struct {
name string
controlPlaneConfig *controlplane.Config
statusCode int
reqPath string
reason string
message string
name string
modifyConfig func(*controlplane.Config)
statusCode int
reqPath string
reason string
message string
}{
{
name: "404",
controlPlaneConfig: nil,
statusCode: http.StatusNotFound,
reqPath: "/apis/batch/v1/namespaces/default/jobs/foo",
reason: "NotFound",
message: `jobs.batch "foo" not found`,
name: "404",
modifyConfig: nil,
statusCode: http.StatusNotFound,
reqPath: "/apis/batch/v1/namespaces/default/jobs/foo",
reason: "NotFound",
message: `jobs.batch "foo" not found`,
},
{
name: "403",
controlPlaneConfig: initStatusForbiddenControlPlaneConfig(),
statusCode: http.StatusForbidden,
reqPath: "/apis",
reason: "Forbidden",
message: `forbidden: User "unprivileged" cannot get path "/apis": Everything is forbidden.`,
name: "403",
modifyConfig: initStatusForbiddenControlPlaneConfig,
statusCode: http.StatusForbidden,
reqPath: "/apis",
reason: "Forbidden",
message: `forbidden: User "unprivileged" cannot get path "/apis": Everything is forbidden.`,
},
{
name: "401",
controlPlaneConfig: initUnauthorizedControlPlaneConfig(),
statusCode: http.StatusUnauthorized,
reqPath: "/apis",
reason: "Unauthorized",
message: `Unauthorized`,
name: "401",
modifyConfig: initUnauthorizedControlPlaneConfig,
statusCode: http.StatusUnauthorized,
reqPath: "/apis",
reason: "Unauthorized",
message: `Unauthorized`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(tc.controlPlaneConfig)
defer closeFn()
_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerConfig: func(config *controlplane.Config) {
if tc.modifyConfig != nil {
tc.modifyConfig(config)
}
},
})
defer tearDownFn()
u := s.URL + tc.reqPath
resp, err := http.Get(u)
if err != nil {
t.Fatalf("unexpected error getting %s: %v", u, err)
// When modifying authenticator and authorizer, don't use
// bearer token than will be always authorized.
if tc.modifyConfig != nil {
kubeConfig.BearerToken = ""
}
transport, err := restclient.TransportFor(kubeConfig)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest("GET", kubeConfig.Host+tc.reqPath, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := transport.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != tc.statusCode {
t.Fatalf("got status %v instead of %s", resp.StatusCode, tc.name)
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
decodedData := map[string]interface{}{}
if err := json.Unmarshal(data, &decodedData); err != nil {
@ -311,29 +349,36 @@ func constructBody(val string, size int, field string, t *testing.T) *appsv1.Dep
}
func TestObjectSizeResponses(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--storage-media-type=application/json"}, framework.SharedEtcd())
defer server.TearDownFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
server.ClientConfig.ContentType = runtime.ContentTypeJSON
client := clientset.NewForConfigOrDie(server.ClientConfig)
const DeploymentMegabyteSize = 100000
const DeploymentTwoMegabyteSize = 175000
const DeploymentThreeMegabyteSize = 250000
// Computing ManagedFields is extremely inefficient for large object, e.g.
// it may take 10s+ to just compute it if we have ~100k very small labels or
// annotations. This in turn may lead to timing out requests,
// which have hardcoded timeout of 34 seconds.
// As a result, we're using slightly larger individual labels/annotations
// to reduce the number of those.
const DeploymentMegabyteSize = 25000
const DeploymentTwoMegabyteSize = 30000
const DeploymentThreeMegabyteSize = 45000
expectedMsgFor1MB := `etcdserver: request is too large`
expectedMsgFor2MB := `rpc error: code = ResourceExhausted desc = trying to send message larger than max`
expectedMsgFor3MB := `Request entity too large: limit is 3145728`
expectedMsgForLargeAnnotation := `metadata.annotations: Too long: must have at most 262144 bytes`
deployment1 := constructBody("a", DeploymentMegabyteSize, "labels", t) // >1 MB file
deployment2 := constructBody("a", DeploymentTwoMegabyteSize, "labels", t) // >2 MB file
deployment3 := constructBody("a", DeploymentThreeMegabyteSize, "labels", t) // >3 MB file
deployment1 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "labels", t) // >1.5 MB file
deployment2 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentTwoMegabyteSize, "labels", t) // >2 MB file
deployment3 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentThreeMegabyteSize, "labels", t) // >3 MB file
deployment4 := constructBody("a", DeploymentMegabyteSize, "annotations", t)
deployment4 := constructBody("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", DeploymentMegabyteSize, "annotations", t)
deployment5 := constructBody("sample/sample", DeploymentMegabyteSize, "finalizers", t) // >1 MB file
deployment6 := constructBody("sample/sample", DeploymentTwoMegabyteSize, "finalizers", t) // >2 MB file
deployment7 := constructBody("sample/sample", DeploymentThreeMegabyteSize, "finalizers", t) // >3 MB file
deployment5 := constructBody("sample0123456789/sample0123456789", 2*DeploymentMegabyteSize, "finalizers", t) // >1.5 MB file
deployment6 := constructBody("sample0123456789/sample0123456789", 2*DeploymentTwoMegabyteSize, "finalizers", t) // >2 MB file
deployment7 := constructBody("sample0123456789/sample0123456789", 2*DeploymentThreeMegabyteSize, "finalizers", t) // >3 MB file
requests := []struct {
size string
@ -352,6 +397,9 @@ func TestObjectSizeResponses(t *testing.T) {
for _, r := range requests {
t.Run(r.size, func(t *testing.T) {
_, err := client.AppsV1().Deployments(metav1.NamespaceDefault).Create(context.TODO(), r.deploymentObject, metav1.CreateOptions{})
if err == nil {
t.Errorf("got: <nil>;want: %s", r.expectedMessage)
}
if err != nil {
if !strings.Contains(err.Error(), r.expectedMessage) {
t.Errorf("got: %s;want: %s", err.Error(), r.expectedMessage)
@ -362,17 +410,27 @@ func TestObjectSizeResponses(t *testing.T) {
}
func TestWatchSucceedsWithoutArgs(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
resp, err := http.Get(s.URL + "/api/v1/namespaces?watch=1")
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatalf("unexpected error getting experimental prefix: %v", err)
t.Fatal(err)
}
req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/v1/namespaces?watch=1", nil)
if err != nil {
t.Fatalf("couldn't create a request: %v", err)
}
resp, err := transport.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error getting response: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
}
resp.Body.Close()
}
var hpaV1 = `
@ -443,9 +501,13 @@ func appsPath(resource, namespace, name string) string {
}
func TestAutoscalingGroupBackwardCompatibility(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
transport := http.DefaultTransport
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
requests := []struct {
verb string
@ -460,7 +522,7 @@ func TestAutoscalingGroupBackwardCompatibility(t *testing.T) {
for _, r := range requests {
bodyBytes := bytes.NewReader([]byte(r.body))
req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes)
req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
if err != nil {
t.Logf("case %v", r)
t.Fatalf("unexpected error: %v", err)
@ -488,9 +550,13 @@ func TestAutoscalingGroupBackwardCompatibility(t *testing.T) {
}
func TestAppsGroupBackwardCompatibility(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
transport := http.DefaultTransport
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
requests := []struct {
verb string
@ -508,7 +574,7 @@ func TestAppsGroupBackwardCompatibility(t *testing.T) {
for _, r := range requests {
bodyBytes := bytes.NewReader([]byte(r.body))
req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes)
req, err := http.NewRequest(r.verb, server.ClientConfig.Host+r.URL, bodyBytes)
if err != nil {
t.Logf("case %v", r)
t.Fatalf("unexpected error: %v", err)
@ -536,17 +602,26 @@ func TestAppsGroupBackwardCompatibility(t *testing.T) {
}
func TestAccept(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(nil)
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
resp, err := http.Get(s.URL + "/api/")
transport, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
if err != nil {
t.Fatal(err)
}
resp, err := transport.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error getting api: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("got status %v instead of 200 OK", resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
if resp.Header.Get("Content-Type") != "application/json" {
t.Errorf("unexpected content: %s", body)
@ -555,15 +630,16 @@ func TestAccept(t *testing.T) {
t.Fatal(err)
}
req, err := http.NewRequest("GET", s.URL+"/api/", nil)
req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "application/yaml")
resp, err = http.DefaultClient.Do(req)
resp, err = transport.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
body, _ = io.ReadAll(resp.Body)
if resp.Header.Get("Content-Type") != "application/yaml" {
t.Errorf("unexpected content: %s", body)
@ -573,15 +649,16 @@ func TestAccept(t *testing.T) {
t.Fatal(err)
}
req, err = http.NewRequest("GET", s.URL+"/api/", nil)
req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "application/json, application/yaml")
resp, err = http.DefaultClient.Do(req)
resp, err = transport.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
body, _ = io.ReadAll(resp.Body)
if resp.Header.Get("Content-Type") != "application/json" {
t.Errorf("unexpected content: %s", body)
@ -591,15 +668,16 @@ func TestAccept(t *testing.T) {
t.Fatal(err)
}
req, err = http.NewRequest("GET", s.URL+"/api/", nil)
req, err = http.NewRequest("GET", server.ClientConfig.Host+"/api/", nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "application") // not a valid media type
resp, err = http.DefaultClient.Do(req)
resp, err = transport.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotAcceptable {
t.Errorf("unexpected error from the server")
}
@ -614,10 +692,10 @@ func countEndpoints(eps *corev1.Endpoints) int {
}
func TestAPIServerService(t *testing.T) {
_, s, closeFn := framework.RunAnAPIServer(framework.NewIntegrationTestControlPlaneConfig())
defer closeFn()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--advertise-address=10.1.1.1"}, framework.SharedEtcd())
defer server.TearDownFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client := clientset.NewForConfigOrDie(server.ClientConfig)
err := wait.Poll(time.Second, time.Minute, func() (bool, error) {
svcList, err := client.CoreV1().Services(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
@ -650,16 +728,15 @@ func TestAPIServerService(t *testing.T) {
}
func TestServiceAlloc(t *testing.T) {
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy("192.168.0.0/29")
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
// Create an IPv4 single stack control-plane
serviceCIDR := "192.168.0.0/29"
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = serviceCIDR
},
})
defer tearDownFn()
svc := func(i int) *corev1.Service {
return &corev1.Service{
@ -676,7 +753,7 @@ func TestServiceAlloc(t *testing.T) {
}
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err