diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index a658a5378b8..e2ccf28079f 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -237,87 +237,19 @@ func TestAggregatedAPIServer(t *testing.T) { }) } -func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulationVersion string) { +func testAggregatedAPIServer(t *testing.T, flunderBanningFeatureGate bool, emulationVersion string) { + const testNamespace = "kube-wardle" + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) t.Cleanup(cancel) - // makes the kube-apiserver very responsive. it's normally a minute - dynamiccertificates.FileRefreshDuration = 1 * time.Second + testKAS, wardleOptions, wardlePort := prepareAggregatedWardleAPIServer(ctx, t, testNamespace) + kubeClientConfig := getKubeConfig(testKAS) - // we need the wardle port information first to set up the service resolver - listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) - if err != nil { - t.Fatal(err) - } - // endpoints cannot have loopback IPs so we need to override the resolver itself - t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort)))) - - testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, BinaryVersion: "1.32"}, nil, framework.SharedEtcd()) - defer testServer.TearDownFn() - kubeClientConfig := rest.CopyConfig(testServer.ClientConfig) - // force json because everything speaks it - kubeClientConfig.ContentType = "" - kubeClientConfig.AcceptContentTypes = "" - kubeClient := client.NewForConfigOrDie(kubeClientConfig) - aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig) - wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig) - - // create the bare minimum resources required to be able to get the API service into an available state - _, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kube-wardle", - }, - }, metav1.CreateOptions{}) - if err != nil { - t.Fatal(err) - } - _, err = kubeClient.CoreV1().Services("kube-wardle").Create(ctx, &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "api", - }, - Spec: corev1.ServiceSpec{ - ExternalName: "needs-to-be-non-empty", - Type: corev1.ServiceTypeExternalName, - }, - }, metav1.CreateOptions{}) - if err != nil { - t.Fatal(err) - } - - // start the wardle server to prove we can aggregate it - wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig)) - defer os.Remove(wardleToKASKubeConfigFile) wardleCertDir, _ := os.MkdirTemp("", "test-integration-wardle-server") defer os.RemoveAll(wardleCertDir) - go func() { - o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr) - // ensure this is a SAN on the generated cert for service FQDN - o.AlternateDNS = []string{ - "api.kube-wardle.svc", - } - o.RecommendedOptions.SecureServing.Listener = listener - o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1") - wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o) - args := []string{ - "--authentication-kubeconfig", wardleToKASKubeConfigFile, - "--authorization-kubeconfig", wardleToKASKubeConfigFile, - "--etcd-servers", framework.GetEtcdURL(), - "--cert-dir", wardleCertDir, - "--kubeconfig", wardleToKASKubeConfigFile, - "--emulated-version", fmt.Sprintf("wardle=%s", emulationVersion), - } - if enableWardleFeatureGate { - args = append(args, "--feature-gates", "wardle:BanFlunder=true") - } - wardleCmd.SetArgs(args) - if err := wardleCmd.Execute(); err != nil { - t.Error(err) - } - }() - directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeClientConfig, wardleCertDir, wardlePort) - if err != nil { - t.Fatal(err) - } + + directWardleClientConfig := runPreparedWardleServer(ctx, t, wardleOptions, wardleCertDir, wardlePort, flunderBanningFeatureGate, emulationVersion, kubeClientConfig) // now we're finally ready to test. These are what's run by default now wardleDirectClient := client.NewForConfigOrDie(directWardleClientConfig) @@ -325,81 +257,12 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati testAPIGroup(ctx, t, wardleDirectClient.Discovery().RESTClient()) testAPIResourceList(ctx, t, wardleDirectClient.Discovery().RESTClient()) - wardleCA, err := os.ReadFile(directWardleClientConfig.CAFile) - if err != nil { - t.Fatal(err) - } - _, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"}, - Spec: apiregistrationv1.APIServiceSpec{ - Service: &apiregistrationv1.ServiceReference{ - Namespace: "kube-wardle", - Name: "api", - }, - Group: "wardle.example.com", - Version: "v1alpha1", - CABundle: wardleCA, - GroupPriorityMinimum: 200, - VersionPriority: 200, - }, - }, metav1.CreateOptions{}) - if err != nil { - t.Fatal(err) - } + wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig) - // wait for the API service to be available - err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) { - apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{}) - if err != nil { - return false, err - } - var available bool - for _, condition := range apiService.Status.Conditions { - if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue { - available = true - break - } - } - if !available { - t.Log("api service is not available", apiService.Status.Conditions) - return false, nil - } - - // make sure discovery is healthy overall - _, _, err = kubeClient.Discovery().ServerGroupsAndResources() - if err != nil { - t.Log("discovery failed", err) - return false, nil - } - - // make sure we have the wardle resources in discovery - apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1") - if err != nil { - t.Log("wardle discovery failed", err) - return false, nil - } - if len(apiResources.APIResources) != 2 { - t.Log("wardle discovery has wrong resources", apiResources.APIResources) - return false, nil - } - resources := make([]string, 0, 2) - for _, resource := range apiResources.APIResources { - resource := resource - resources = append(resources, resource.Name) - } - sort.Strings(resources) - if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) { - return false, fmt.Errorf("unexpected resources: %v", resources) - } - - return true, nil - }) - if err != nil { - t.Fatal(err) - } + waitForWardleAPIServiceReady(ctx, t, kubeClientConfig, wardleCertDir, testNamespace) // perform simple CRUD operations against the wardle resources - _, err = wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{ + _, err := wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{ ObjectMeta: metav1.ObjectMeta{ Name: "panda", }, @@ -426,7 +289,7 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati Name: "badname", }, }, metav1.CreateOptions{}) - banFlunder := enableWardleFeatureGate || emulationVersion == "1.2" + banFlunder := flunderBanningFeatureGate || emulationVersion == "1.2" if banFlunder && err == nil { t.Fatal("expect flunder:badname not admitted when wardle feature gates are specified") } @@ -498,10 +361,10 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati // now we update the client-ca nd request-header-client-ca-file and the kas will consume it, update the configmap // and then the wardle server will detect and update too. - if err := os.WriteFile(path.Join(testServer.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil { + if err := os.WriteFile(path.Join(testKAS.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil { t.Fatal(err) } - if err := os.WriteFile(path.Join(testServer.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil { + if err := os.WriteFile(path.Join(testKAS.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil { t.Fatal(err) } // wait for it to be picked up. there's a test in certreload_test.go that ensure this works @@ -547,44 +410,6 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati } } -func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) { - directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig)) - directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt") - directWardleClientConfig.CAData = nil - directWardleClientConfig.ServerName = "" - directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken - var wardleClient client.Interface - lastHealthContent := []byte{} - var lastHealthErr error - err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { - if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created - lastHealthErr = err - return false, nil - } - directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort) - wardleClient, err = client.NewForConfig(directWardleClientConfig) - if err != nil { - // this happens because we race the API server start - t.Log(err) - return false, nil - } - healthStatus := 0 - result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) - lastHealthContent, lastHealthErr = result.Raw() - if healthStatus != http.StatusOK { - return false, nil - } - return true, nil - }) - if err != nil { - t.Log(string(lastHealthContent)) - t.Log(lastHealthErr) - return nil, err - } - - return directWardleClientConfig, nil -} - func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) t.Cleanup(cancel) @@ -699,6 +524,225 @@ func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) { } } +func prepareAggregatedWardleAPIServer(ctx context.Context, t *testing.T, namespace string) (*kastesting.TestServer, *sampleserver.WardleServerOptions, int) { + // makes the kube-apiserver very responsive. it's normally a minute + dynamiccertificates.FileRefreshDuration = 1 * time.Second + + // we need the wardle port information first to set up the service resolver + listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) + if err != nil { + t.Fatal(err) + } + // endpoints cannot have loopback IPs so we need to override the resolver itself + t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort)))) + + testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, BinaryVersion: "1.32"}, nil, framework.SharedEtcd()) + t.Cleanup(func() { testServer.TearDownFn() }) + + kubeClient := client.NewForConfigOrDie(getKubeConfig(testServer)) + + // create the bare minimum resources required to be able to get the API service into an available state + _, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + _, err = kubeClient.CoreV1().Services(namespace).Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api", + }, + Spec: corev1.ServiceSpec{ + ExternalName: "needs-to-be-non-empty", + Type: corev1.ServiceTypeExternalName, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + wardleOptions := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr) + // ensure this is a SAN on the generated cert for service FQDN + wardleOptions.AlternateDNS = []string{ + fmt.Sprintf("api.%s.svc", namespace), + } + wardleOptions.RecommendedOptions.SecureServing.Listener = listener + wardleOptions.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1") + + return testServer, wardleOptions, wardlePort +} + +func runPreparedWardleServer( + ctx context.Context, + t *testing.T, + wardleOptions *sampleserver.WardleServerOptions, + certDir string, + wardlePort int, + flunderBanningFeatureGate bool, + emulationVersion string, + kubeConfig *rest.Config, +) *rest.Config { + + // start the wardle server to prove we can aggregate it + wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeConfig)) + t.Cleanup(func() { os.Remove(wardleToKASKubeConfigFile) }) + + go func() { + args := []string{ + "--authentication-kubeconfig", wardleToKASKubeConfigFile, + "--authorization-kubeconfig", wardleToKASKubeConfigFile, + "--etcd-servers", framework.GetEtcdURL(), + "--cert-dir", certDir, + "--kubeconfig", wardleToKASKubeConfigFile, + "--emulated-version", fmt.Sprintf("wardle=%s", emulationVersion), + } + if flunderBanningFeatureGate { + args = append(args, "--feature-gates", "wardle:BanFlunder=true") + } + wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, wardleOptions) + wardleCmd.SetArgs(args) + if err := wardleCmd.Execute(); err != nil { + t.Error(err) + } + }() + + directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeConfig, certDir, wardlePort) + if err != nil { + t.Fatal(err) + } + + return directWardleClientConfig +} + +func waitForWardleAPIServiceReady(ctx context.Context, t *testing.T, kubeConfig *rest.Config, wardleCertDir string, namespace string) { + kubeClient := client.NewForConfigOrDie(kubeConfig) + aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeConfig) + + wardleCA, err := os.ReadFile(wardleCAFilePath(wardleCertDir)) + if err != nil { + t.Fatal(err) + } + _, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"}, + Spec: apiregistrationv1.APIServiceSpec{ + Service: &apiregistrationv1.ServiceReference{ + Namespace: namespace, + Name: "api", + }, + Group: "wardle.example.com", + Version: "v1alpha1", + CABundle: wardleCA, + GroupPriorityMinimum: 200, + VersionPriority: 200, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + // wait for the API service to be available + err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) { + apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{}) + if err != nil { + return false, err + } + var available bool + for _, condition := range apiService.Status.Conditions { + if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue { + available = true + break + } + } + if !available { + t.Log("api service is not available", apiService.Status.Conditions) + return false, nil + } + + // make sure discovery is healthy overall + _, _, err = kubeClient.Discovery().ServerGroupsAndResources() + if err != nil { + t.Log("discovery failed", err) + return false, nil + } + + // make sure we have the wardle resources in discovery + apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1") + if err != nil { + t.Log("wardle discovery failed", err) + return false, nil + } + if len(apiResources.APIResources) != 2 { + t.Log("wardle discovery has wrong resources", apiResources.APIResources) + return false, nil + } + resources := make([]string, 0, 2) + for _, resource := range apiResources.APIResources { + resource := resource + resources = append(resources, resource.Name) + } + sort.Strings(resources) + if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) { + return false, fmt.Errorf("unexpected resources: %v", resources) + } + + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func getKubeConfig(testServer *kastesting.TestServer) *rest.Config { + kubeClientConfig := rest.CopyConfig(testServer.ClientConfig) + // force json because everything speaks it + kubeClientConfig.ContentType = "" + kubeClientConfig.AcceptContentTypes = "" + + return kubeClientConfig +} + +func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) { + directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig)) + directWardleClientConfig.CAFile = wardleCAFilePath(wardleCertDir) + directWardleClientConfig.CAData = nil + directWardleClientConfig.ServerName = "" + directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken + var wardleClient client.Interface + lastHealthContent := []byte{} + var lastHealthErr error + err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created + lastHealthErr = err + return false, nil + } + directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort) + wardleClient, err = client.NewForConfig(directWardleClientConfig) + if err != nil { + // this happens because we race the API server start + t.Log(err) + return false, nil + } + healthStatus := 0 + result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) + lastHealthContent, lastHealthErr = result.Raw() + if healthStatus != http.StatusOK { + return false, nil + } + return true, nil + }) + if err != nil { + t.Log(string(lastHealthContent)) + t.Log(lastHealthErr) + return nil, err + } + + return directWardleClientConfig, nil +} + +func wardleCAFilePath(wardleCertDir string) string { return path.Join(wardleCertDir, "apiserver.crt") } + func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string { // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config // the loopback client config uses a loopback cert with different SNI. We need to use the "real"