diff --git a/test/integration/apiserver/discovery/discovery_test.go b/test/integration/apiserver/discovery/discovery_test.go new file mode 100644 index 00000000000..51604a105a0 --- /dev/null +++ b/test/integration/apiserver/discovery/discovery_test.go @@ -0,0 +1,472 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "net/http" + "reflect" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + kubernetes "k8s.io/client-go/kubernetes" + k8sscheme "k8s.io/client-go/kubernetes/scheme" + featuregatetesting "k8s.io/component-base/featuregate/testing" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +type kubeClientSet = kubernetes.Interface +type aggegatorClientSet = aggregator.Interface +type apiextensionsClientSet = apiextensions.Interface +type dynamicClientset = dynamic.Interface +type testClientSet struct { + kubeClientSet + aggegatorClientSet + apiextensionsClientSet + dynamicClientset +} + +func (t testClientSet) Discovery() discovery.DiscoveryInterface { + return t.kubeClientSet.Discovery() +} + +var ( + scheme = runtime.NewScheme() + codecs = runtimeserializer.NewCodecFactory(scheme) + serialize runtime.NegotiatedSerializer + + basicTestGroup = apidiscoveryv2beta1.APIGroupDiscovery{ + ObjectMeta: metav1.ObjectMeta{ + Name: "stable.example.com", + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: "v1", + Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "jobs", + Verbs: []string{"create", "list", "watch", "delete"}, + ShortNames: []string{"jz"}, + Categories: []string{"all"}, + }, + }, + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, + }, + }, + } +) + +func init() { + // Add all builtin types to scheme + utilruntime.Must(k8sscheme.AddToScheme(scheme)) + utilruntime.Must(aggregatorclientsetscheme.AddToScheme(scheme)) + utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) + + info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) + if !ok { + panic("failed to create serializer info") + } + + serialize = runtime.NewSimpleNegotiatedSerializer(info) +} + +// Spins up an api server which is cleaned up at the end up the test +// Returns some kubernetes clients +func setup(t *testing.T) (context.Context, testClientSet, context.CancelFunc) { + ctx, cancelCtx := context.WithCancel(context.Background()) + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + t.Cleanup(server.TearDownFn) + + kubeClientSet, err := kubernetes.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + aggegatorClientSet, err := aggregator.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + apiextensionsClientSet, err := apiextensions.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + dynamicClientset, err := dynamic.NewForConfig(server.ClientConfig) + require.NoError(t, err) + + client := testClientSet{ + kubeClientSet: kubeClientSet, + aggegatorClientSet: aggegatorClientSet, + apiextensionsClientSet: apiextensionsClientSet, + dynamicClientset: dynamicClientset, + } + return ctx, client, cancelCtx +} + +func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error { + port := service.Port() + if port == nil { + return errors.New("service not yet started") + } + // Register the APIService + patch := apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: gv.Version + "." + gv.Group, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "APIService", + APIVersion: "apiregistration.k8s.io/v1", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: gv.Group, + Version: gv.Version, + InsecureSkipTLSVerify: true, + GroupPriorityMinimum: 1000, + VersionPriority: 15, + Service: &apiregistrationv1.ServiceReference{ + Namespace: "default", + Name: service.Name(), + Port: port, + }, + }, + } + + _, err := client. + ApiregistrationV1(). + APIServices(). + Create(context.TODO(), &patch, metav1.CreateOptions{FieldManager: "test-manager"}) + return err +} + +func unregisterAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion) error { + return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{}) +} + +func WaitForGroupsAbsent(ctx context.Context, client testClientSet, groups ...string) error { + return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, searchGroup := range groups { + for _, docGroup := range groupList.Items { + if docGroup.Name == searchGroup { + return false + } + } + } + return true + }) + +} + +func WaitForGroups(ctx context.Context, client testClientSet, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error { + return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, searchGroup := range groups { + for _, docGroup := range groupList.Items { + if reflect.DeepEqual(searchGroup, docGroup) { + return true + } + } + } + return false + }) +} + +func WaitForResultWithCondition(ctx context.Context, client testClientSet, condition func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool) error { + // Keep repeatedly fetching document from aggregator. + // Check to see if it contains our service within a reasonable amount of time + return wait.PollWithContext( + ctx, + 250*time.Millisecond, + 1*time.Second, + func(ctx context.Context) (done bool, err error) { + result, err := client. + Discovery(). + RESTClient(). + Get(). + AbsPath("/apis"). + SetHeader("Accept", "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"). + Do(ctx). + Raw() + + if err != nil { + return false, err + } + + groupList := apidiscoveryv2beta1.APIGroupDiscoveryList{} + err = json.Unmarshal(result, &groupList) + if err != nil { + panic(err) + } + + if condition(groupList) { + return true, nil + } + + return false, nil + }) +} + +func TestAggregatedAPIServiceDiscovery(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)() + + // Keep any goroutines spawned from running past the execution of this test + ctx, client, cleanup := setup(t) + defer cleanup() + + // Create a resource manager whichs serves our GroupVersion + resourceManager := discoveryendpoint.NewResourceManager() + resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup}) + + // Install our ResourceManager as an Aggregated APIService to the + // test server + service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/apis") { + resourceManager.ServeHTTP(w, r) + } else if strings.HasPrefix(r.URL.Path, "/apis/stable.example.com") { + // Return invalid response so APIService can be marked as "available" + w.WriteHeader(http.StatusOK) + } else { + // reject openapi/v2, openapi/v3, apis// + w.WriteHeader(http.StatusNotFound) + } + })) + service.Start(t, ctx) + + // For each groupversion served by our resourcemanager, create an APIService + // object connected to our fake APIServer + for _, versionInfo := range basicTestGroup.Versions { + groupVersion := metav1.GroupVersion{ + Group: basicTestGroup.Name, + Version: versionInfo.Version, + } + + require.NoError(t, registerAPIService(ctx, client, groupVersion, service)) + defer func() { + require.NoError(t, unregisterAPIService(ctx, client, groupVersion)) + }() + } + + // Keep repeatedly fetching document from aggregator. + // Check to see if it contains our service within a reasonable amount of time + require.NoError(t, WaitForGroups(ctx, client, basicTestGroup)) +} + +// Shows that the following sequence is handled correctly: +// 1. Create an APIService +// - Check that API service is in discovery doc +// 2. Create CRD with the same GroupVersion as APIService +// 3. Delete APIService +// - Check that API service is removed from discovery +// 4. Update CRD +// - Check that CRD is in discovery document +func TestOverlappingCRDAndAPIService(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)() + // Keep any goroutines spawned from running past the execution of this test + ctx, client, cleanup := setup(t) + defer cleanup() + + // Create a resource manager whichs serves our GroupVersion + resourceManager := discoveryendpoint.NewResourceManager() + resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup}) + + // Install our ResourceManager as an Aggregated APIService to the + // test server + service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/apis" { + resourceManager.ServeHTTP(w, r) + } else if strings.HasPrefix(r.URL.Path, "/apis/") { + // Return "valid" response so APIService can be marked as "available" + w.WriteHeader(http.StatusOK) + } else { + // reject openapi/v2, openapi/v3, apis// + w.WriteHeader(http.StatusNotFound) + } + })) + service.Start(t, ctx) + + // For each groupversion served by our resourcemanager, create an APIService + // object connected to our fake APIServer + for _, versionInfo := range basicTestGroup.Versions { + groupVersion := metav1.GroupVersion{ + Group: basicTestGroup.Name, + Version: versionInfo.Version, + } + + registerAPIService(ctx, client, groupVersion, service) + } + + // Keep repeatedly fetching document from aggregator. + // Check to see if it contains our service within a reasonable amount of time + require.NoError(t, WaitForGroups(ctx, client, basicTestGroup)) + + // Create a CRD + crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foos.stable.example.com", + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "stable.example.com", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Singular: "foo", + Plural: "foos", + Kind: "Foo", + }, + Scope: apiextensionsv1.ClusterScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "stringMap": { + Description: "a map[string]string", + Type: "object", + AdditionalProperties: &apiextensionsv1.JSONSchemaPropsOrBool{ + Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, metav1.CreateOptions{ + FieldManager: "test-manager", + }) + require.NoError(t, err) + + // Create a CR for the CRD + // Keep trying until it succeeds (or should we try for discovery?) + require.NoError(t, wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) { + toCreate := &unstructured.Unstructured{} + toCreate.SetUnstructuredContent(map[string]any{ + "apiVersion": "stable.example.com/v1", + "kind": "Foo", + "key": "value", + }) + + _, err = client.dynamicClientset.Resource(schema.GroupVersionResource{ + Group: "stable.example.com", + Version: "v1", + Resource: "foos", + }).Create(ctx, toCreate, metav1.CreateOptions{ + FieldManager: "test-manager", + }) + return err != nil, nil + })) + + // For each groupversion served by our resourcemanager, delete an APIService + // object connected to our fake APIServer + for _, versionInfo := range basicTestGroup.Versions { + groupVersion := metav1.GroupVersion{ + Group: basicTestGroup.Name, + Version: versionInfo.Version, + } + + unregisterAPIService(ctx, client, groupVersion) + } + + // Wait for the apiservice to be deleted from discovery + require.NoError(t, WaitForGroupsAbsent(ctx, client, "stable.example.com")) + + // Update the CRD with a minor change to show that reconciliation will + // eventually refresh the discovery group on resync + obj := &unstructured.Unstructured{} + obj.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": map[string]any{ + "name": crd.Name, + }, + "spec": map[string]interface{}{ + "names": map[string]any{ + "categories": []string{"all"}, + }, + }, + }) + + buf := bytes.NewBuffer(nil) + err = unstructured.UnstructuredJSONScheme.Encode(obj, buf) + require.NoError(t, err) + + //Is there a better way to force crd resync? + _, err = client.ApiextensionsV1().CustomResourceDefinitions().Patch( + ctx, + crd.Name, + types.ApplyPatchType, + buf.Bytes(), + metav1.PatchOptions{ + FieldManager: "test-manager", + }, + ) + require.NoError(t, err) + + // Wait until the crd appears in discovery + expectedDiscovery := apidiscoveryv2beta1.APIGroupDiscovery{ + ObjectMeta: metav1.ObjectMeta{ + Name: basicTestGroup.Name, + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: "v1", + Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "foos", + ResponseKind: &metav1.GroupVersionKind{ + Group: basicTestGroup.Name, + Version: "v1", + Kind: "Foo", + }, + Scope: apidiscoveryv2beta1.ScopeCluster, + SingularResource: crd.Spec.Names.Singular, + Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"}, + Categories: []string{"all"}, + }, + }, + //!TODO: set freshness of builtin/crds + Freshness: "", + }, + }, + } + require.NoError(t, WaitForGroups(ctx, client, expectedDiscovery)) +} diff --git a/test/integration/apiserver/discovery/main_test.go b/test/integration/apiserver/discovery/main_test.go new file mode 100644 index 00000000000..9a89e89a85c --- /dev/null +++ b/test/integration/apiserver/discovery/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/apiserver/discovery/service.go b/test/integration/apiserver/discovery/service.go new file mode 100644 index 00000000000..de3d1854943 --- /dev/null +++ b/test/integration/apiserver/discovery/service.go @@ -0,0 +1,146 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" + + "k8s.io/client-go/kubernetes" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + corev1apply "k8s.io/client-go/applyconfigurations/core/v1" +) + +type FakeService interface { + Start(t *testing.T, ctx context.Context) + Port() *int32 + Name() string +} + +// Creates and registers an in-process Service capable of communicating with the +// kubernetes integration test apiserver +type fakeService struct { + name string + client kubernetes.Interface + handler http.Handler + + lock sync.RWMutex + activePort *int32 +} + +func NewFakeService(name string, client kubernetes.Interface, handler http.Handler) *fakeService { + return &fakeService{ + name: name, + client: client, + handler: handler, + } +} + +func (f *fakeService) run(ctx context.Context) error { + aggregatedServer := httptest.NewUnstartedServer(f.handler) + aggregatedServer.StartTLS() + defer aggregatedServer.Close() + + serverURL, err := url.Parse(aggregatedServer.URL) + if err != nil { + // This should never occur + panic(err) + } + + serverPort, err := strconv.Atoi(serverURL.Port()) + if err != nil { + // This should never occur + panic(err) + } + + port := int32(serverPort) + + // Install service into the cluster + service, err := f.client.CoreV1().Services("default").Apply( + ctx, + corev1apply.Service(f.name, "default"). + WithSpec(corev1apply.ServiceSpec(). + WithPorts( + corev1apply.ServicePort(). + WithPort(port)). + WithType("ExternalName"). + WithExternalName("localhost")), + metav1.ApplyOptions{ + FieldManager: "test-manager", + }, + ) + if err != nil { + return err + } + + f.lock.Lock() + f.activePort = &port + f.lock.Unlock() + + <-ctx.Done() + + f.lock.Lock() + f.activePort = nil + f.lock.Unlock() + + // Uninstall service from the cluser + err = f.client.CoreV1().Services("default").Delete(ctx, service.Name, metav1.DeleteOptions{}) + return err +} + +func (f *fakeService) Start(t *testing.T, ctx context.Context) { + go func() { + err := f.run(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + require.NoError(t, err) + }() + + err := wait.PollWithContext(ctx, 1*time.Second, 200*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return f.Port() != nil, nil + }) + + if errors.Is(err, context.Canceled) { + err = nil + } + require.NoError(t, err, "service should have come alive in a reasonable amount of time") +} + +func (f *fakeService) Port() *int32 { + // Returns the port of the server if it is running or nil + // if it is not running + f.lock.RLock() + defer f.lock.RUnlock() + return f.activePort +} + +func (f *fakeService) Name() string { + return f.name +}