diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f05bb831e11..3dc7400ae63 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -641,7 +641,27 @@ func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) { return options, nil } +var testServiceResolver webhook.ServiceResolver + +// SetServiceResolverForTests allows the service resolver to be overridden during tests. +// Tests using this function must run serially as this function is not safe to call concurrently with server start. +func SetServiceResolverForTests(resolver webhook.ServiceResolver) func() { + if testServiceResolver != nil { + panic("test service resolver is set: tests are either running concurrently or clean up was skipped") + } + + testServiceResolver = resolver + + return func() { + testServiceResolver = nil + } +} + func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver { + if testServiceResolver != nil { + return testServiceResolver + } + var serviceResolver webhook.ServiceResolver if enabledAggregatorRouting { serviceResolver = aggregatorapiserver.NewEndpointServiceResolver( diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 01df67130d3..5a0952115d6 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -18,6 +18,7 @@ package testing import ( "context" + "crypto/x509" "fmt" "net" "os" @@ -35,17 +36,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" + serveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storageversion" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/cert" + "k8s.io/klog/v2" "k8s.io/kube-aggregator/pkg/apiserver" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/cmd/kubeadm/app/util/pkiutil" testutil "k8s.io/kubernetes/test/utils" - - "k8s.io/klog/v2" ) // This key is for testing purposes only and is not considered secure. @@ -144,6 +146,10 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo s.SecureServing.ServerCert.CertDirectory = result.TmpDir if instanceOptions.EnableCertAuth { + // set up default headers for request header auth + reqHeaders := serveroptions.NewDelegatingAuthenticationOptions() + s.Authentication.RequestHeader = &reqHeaders.RequestHeader + // create certificates for aggregation and client-cert auth proxySigningKey, err := testutil.NewPrivateKey() if err != nil { @@ -158,6 +164,31 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo return result, err } s.Authentication.RequestHeader.ClientCAFile = proxyCACertFile + + // give the kube api server an "identity" it can use to for request header auth + // so that aggregated api servers can understand who the calling user is + s.Authentication.RequestHeader.AllowedNames = []string{"ash", "misty", "brock"} + // make a client certificate for the api server - common name has to match one of our defined names above + tenThousandHoursLater := time.Now().Add(10_000 * time.Hour) + clientCrtOfAPIServer, signer, err := pkiutil.NewCertAndKey(proxySigningCert, proxySigningKey, &pkiutil.CertConfig{ + Config: cert.Config{ + CommonName: "misty", + Usages: []x509.ExtKeyUsage{ + x509.ExtKeyUsageClientAuth, + }, + }, + NotAfter: &tenThousandHoursLater, + PublicKeyAlgorithm: x509.ECDSA, + }) + if err != nil { + return result, err + } + if err := pkiutil.WriteCertAndKey(s.SecureServing.ServerCert.CertDirectory, "misty-crt", clientCrtOfAPIServer, signer); err != nil { + return result, err + } + s.ProxyClientKeyFile = path.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.key") + s.ProxyClientCertFile = path.Join(s.SecureServing.ServerCert.CertDirectory, "misty-crt.crt") + clientSigningKey, err := testutil.NewPrivateKey() if err != nil { return result, err diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index 33df43b3cd3..1f57e82d02a 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -51,6 +51,8 @@ type WardleServerOptions struct { SharedInformerFactory informers.SharedInformerFactory StdOut io.Writer StdErr io.Writer + + AlternateDNS []string } // NewWardleServerOptions returns a new WardleServerOptions @@ -117,7 +119,7 @@ func (o *WardleServerOptions) Complete() error { // Config returns config for the api server given WardleServerOptions func (o *WardleServerOptions) Config() (*apiserver.Config, error) { // TODO have a "real" external address - if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil { + if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", o.AlternateDNS, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil { return nil, fmt.Errorf("error creating self-signed certificates: %v", err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 391701a78cd..9669ef52de8 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -22,21 +22,21 @@ import ( "fmt" "net" "net/http" + "net/url" "os" "path" "reflect" + "sort" "testing" "time" "github.com/stretchr/testify/assert" - apierrors "k8s.io/apimachinery/pkg/api/errors" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/dynamiccertificates" genericapiserveroptions "k8s.io/apiserver/pkg/server/options" - "k8s.io/client-go/discovery" client "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -44,21 +44,34 @@ import ( "k8s.io/client-go/util/cert" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/cmd/kube-apiserver/app" kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1" sampleserver "k8s.io/sample-apiserver/pkg/cmd/server" + wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1" netutils "k8s.io/utils/net" ) func TestAggregatedAPIServer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + t.Cleanup(cancel) + // makes the kube-apiserver very responsive. it's normally a minute dynamiccertificates.FileRefreshDuration = 1 * time.Second stopCh := make(chan struct{}) defer close(stopCh) + // we need the wardle port information first to set up the service resolver + listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) + if err != nil { + t.Fatal(err) + } + // endpoints cannot have loopback IPs so we need to override the resolver itself + t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort)))) + testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true}, nil, framework.SharedEtcd()) defer testServer.TearDownFn() kubeClientConfig := rest.CopyConfig(testServer.ClientConfig) @@ -67,18 +80,41 @@ func TestAggregatedAPIServer(t *testing.T) { kubeClientConfig.AcceptContentTypes = "" kubeClient := client.NewForConfigOrDie(kubeClientConfig) aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig) + wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig) + + // create the bare minimum resources required to be able to get the API service into an available state + _, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-wardle", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + _, err = kubeClient.CoreV1().Services("kube-wardle").Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api", + }, + Spec: corev1.ServiceSpec{ + ExternalName: "needs-to-be-non-empty", + Type: corev1.ServiceTypeExternalName, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } // start the wardle server to prove we can aggregate it wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig)) defer os.Remove(wardleToKASKubeConfigFile) wardleCertDir, _ := os.MkdirTemp("", "test-integration-wardle-server") defer os.RemoveAll(wardleCertDir) - listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) - if err != nil { - t.Fatal(err) - } go func() { o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr) + // ensure this is a SAN on the generated cert for service FQDN + o.AlternateDNS = []string{ + "api.kube-wardle.svc", + } o.RecommendedOptions.SecureServing.Listener = listener o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1") wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh) @@ -93,25 +129,22 @@ func TestAggregatedAPIServer(t *testing.T) { t.Error(err) } }() - directWardleClientConfig, err := waitForWardleRunning(t, kubeClientConfig, wardleCertDir, wardlePort) + directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeClientConfig, wardleCertDir, wardlePort) if err != nil { t.Fatal(err) } // now we're finally ready to test. These are what's run by default now - wardleClient, err := client.NewForConfig(directWardleClientConfig) - if err != nil { - t.Fatal(err) - } - testAPIGroupList(t, wardleClient.Discovery().RESTClient()) - testAPIGroup(t, wardleClient.Discovery().RESTClient()) - testAPIResourceList(t, wardleClient.Discovery().RESTClient()) + wardleDirectClient := client.NewForConfigOrDie(directWardleClientConfig) + testAPIGroupList(ctx, t, wardleDirectClient.Discovery().RESTClient()) + testAPIGroup(ctx, t, wardleDirectClient.Discovery().RESTClient()) + testAPIResourceList(ctx, t, wardleDirectClient.Discovery().RESTClient()) wardleCA, err := os.ReadFile(directWardleClientConfig.CAFile) if err != nil { t.Fatal(err) } - _, err = aggregatorClient.ApiregistrationV1().APIServices().Create(context.TODO(), &apiregistrationv1.APIService{ + _, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{ ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"}, Spec: apiregistrationv1.APIServiceSpec{ Service: &apiregistrationv1.ServiceReference{ @@ -129,16 +162,92 @@ func TestAggregatedAPIServer(t *testing.T) { t.Fatal(err) } - // wait for the unavailable API service to be processed with updated status - err = wait.Poll(1*time.Second, wait.ForeverTestTimeout, func() (done bool, err error) { + // wait for the API service to be available + err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) { + apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{}) + if err != nil { + return false, err + } + var available bool + for _, condition := range apiService.Status.Conditions { + if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue { + available = true + break + } + } + if !available { + t.Log("api service is not available", apiService.Status.Conditions) + return false, nil + } + + // make sure discovery is healthy overall _, _, err = kubeClient.Discovery().ServerGroupsAndResources() - hasExpectedError := checkWardleUnavailableDiscoveryError(t, err) - return hasExpectedError, nil + if err != nil { + t.Log("discovery failed", err) + return false, nil + } + + // make sure we have the wardle resources in discovery + apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1") + if err != nil { + t.Log("wardle discovery failed", err) + return false, nil + } + if len(apiResources.APIResources) != 2 { + t.Log("wardle discovery has wrong resources", apiResources.APIResources) + return false, nil + } + resources := make([]string, 0, 2) + for _, resource := range apiResources.APIResources { + resource := resource + resources = append(resources, resource.Name) + } + sort.Strings(resources) + if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) { + return false, fmt.Errorf("unexpected resources: %v", resources) + } + + return true, nil }) if err != nil { t.Fatal(err) } - // TODO figure out how to turn on enough of services and dns to run more + + // perform simple CRUD operations against the wardle resources + _, err = wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "panda", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + fischersList, err := wardleClient.Fischers().List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(fischersList.Items) != 1 { + t.Errorf("expected one fischer: %#v", fischersList.Items) + } + if len(fischersList.ResourceVersion) == 0 { + t.Error("expected non-empty resource version for fischer list") + } + + _, err = wardleClient.Flunders(metav1.NamespaceSystem).Create(ctx, &wardlev1alpha1.Flunder{ + ObjectMeta: metav1.ObjectMeta{ + Name: "panda", + }, + }, metav1.CreateOptions{}) + flunderList, err := wardleClient.Flunders(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(flunderList.Items) != 1 { + t.Errorf("expected one flunder: %#v", flunderList.Items) + } + if len(flunderList.ResourceVersion) == 0 { + t.Error("expected non-empty resource version for flunder list") + } // Since ClientCAs are provided by "client-ca::kube-system::extension-apiserver-authentication::client-ca-file" controller // we need to wait until it picks up the configmap (via a lister) otherwise the response might contain an empty result. @@ -224,10 +333,9 @@ func TestAggregatedAPIServer(t *testing.T) { if numMatches != 4 { t.Fatal("names don't match") } - } -func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) { +func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) { directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig)) directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt") directWardleClientConfig.CAData = nil @@ -249,7 +357,7 @@ func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, ward return false, nil } healthStatus := 0 - result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus) + result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) lastHealthContent, lastHealthErr = result.Raw() if healthStatus != http.StatusOK { return false, nil @@ -301,33 +409,6 @@ func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfi return wardleToKASKubeConfigFile.Name() } -func checkWardleUnavailableDiscoveryError(t *testing.T, err error) bool { - if err == nil { - t.Log("Discovery call expected to return failed unavailable service") - return false - } - if !discovery.IsGroupDiscoveryFailedError(err) { - t.Logf("Unexpected error: %T, %v", err, err) - return false - } - discoveryErr := err.(*discovery.ErrGroupDiscoveryFailed) - if len(discoveryErr.Groups) != 1 { - t.Logf("Unexpected failed groups: %v", err) - return false - } - groupVersion := schema.GroupVersion{Group: "wardle.example.com", Version: "v1alpha1"} - groupVersionErr, ok := discoveryErr.Groups[groupVersion] - if !ok { - t.Logf("Unexpected failed group version: %v", err) - return false - } - if !apierrors.IsServiceUnavailable(groupVersionErr) { - t.Logf("Unexpected failed group version error: %v", err) - return false - } - return true -} - func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config { clusterNick := "cluster" userNick := "user" @@ -365,12 +446,12 @@ func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config { return config } -func readResponse(client rest.Interface, location string) ([]byte, error) { - return client.Get().AbsPath(location).DoRaw(context.TODO()) +func readResponse(ctx context.Context, client rest.Interface, location string) ([]byte, error) { + return client.Get().AbsPath(location).DoRaw(ctx) } -func testAPIGroupList(t *testing.T, client rest.Interface) { - contents, err := readResponse(client, "/apis") +func testAPIGroupList(ctx context.Context, t *testing.T, client rest.Interface) { + contents, err := readResponse(ctx, client, "/apis") if err != nil { t.Fatalf("%v", err) } @@ -398,8 +479,8 @@ func testAPIGroupList(t *testing.T, client rest.Interface) { assert.Equal(t, v1beta1, apiGroupList.Groups[0].PreferredVersion) } -func testAPIGroup(t *testing.T, client rest.Interface) { - contents, err := readResponse(client, "/apis/wardle.example.com") +func testAPIGroup(ctx context.Context, t *testing.T, client rest.Interface) { + contents, err := readResponse(ctx, client, "/apis/wardle.example.com") if err != nil { t.Fatalf("%v", err) } @@ -416,8 +497,8 @@ func testAPIGroup(t *testing.T, client rest.Interface) { assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0]) } -func testAPIResourceList(t *testing.T, client rest.Interface) { - contents, err := readResponse(client, "/apis/wardle.example.com/v1alpha1") +func testAPIResourceList(ctx context.Context, t *testing.T, client rest.Interface) { + contents, err := readResponse(ctx, client, "/apis/wardle.example.com/v1alpha1") if err != nil { t.Fatalf("%v", err) } @@ -472,3 +553,9 @@ MnVCuBwfwDXCAiEAw/1TA+CjPq9JC5ek1ifR0FybTURjeQqYkKpve1dveps= `) ) + +type staticURLServiceResolver string + +func (u staticURLServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + return url.Parse(string(u)) +}