From bf41b0462ccd9c0ecc97a9f64fd499e2d4284f85 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Tue, 22 Aug 2023 20:30:49 +0800 Subject: [PATCH] Fix OpenAPI aggregation cleanup There were four issues in OpenAPI aggregation cleanup: 1. When removing an APIService, openAPIAggregationController was called twice while openAPIV3AggregationController was never called, leading to OpenAPI v3 for the APIService not cleaned up. 2. When removing a local APIService, v2 specAggregator should not return ErrAPIServiceNotFound when it doesn't find the APIService because local APIServices were never added to its cache, otherwise confusing error logs would be generated. Besides, the method's comment indicates that the desired behavior is that no error is returned if the APIService does not exist. 3. When removing an APIService, v3 specProxier should update openapiv2converter's cache, like when updating an APIService, otherwise the API would not be removed from "/openapi/v3". 4. When v3 AggregationController reconciles an APIService, it should stop requeueing it if it fails with ErrAPIServiceNotFound as the APIService has been removed, like what v2 AggregationController does, otherwise it would keep reconciling the APIService forever. Signed-off-by: Quan Tian --- .../pkg/apiserver/apiserver.go | 2 +- .../openapi/aggregator/aggregator.go | 8 +-- .../pkg/controllers/openapi/controller.go | 4 +- .../openapiv3/aggregator/aggregator.go | 7 ++- .../openapiv3/aggregator/aggregator_test.go | 13 +++++ .../pkg/controllers/openapiv3/controller.go | 7 ++- test/e2e/apimachinery/openapiv3.go | 56 ++++++++++++++++++- 7 files changed, 82 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 73e818d261b..b49f9cd3b87 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -564,7 +564,7 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) { s.openAPIAggregationController.RemoveAPIService(apiServiceName) } if s.openAPIV3AggregationController != nil { - s.openAPIAggregationController.RemoveAPIService(apiServiceName) + s.openAPIV3AggregationController.RemoveAPIService(apiServiceName) } delete(s.proxyHandlers, apiServiceName) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 22a312eff01..8491293e725 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -44,8 +44,9 @@ var ErrAPIServiceNotFound = errors.New("resource not found") // known specs including the http etag. type SpecAggregator interface { AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error + // UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist. UpdateAPIServiceSpec(apiServiceName string) error - RemoveAPIService(apiServiceName string) error + RemoveAPIService(apiServiceName string) } const ( @@ -231,17 +232,16 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. // It is thread safe. -func (s *specAggregator) RemoveAPIService(apiServiceName string) error { +func (s *specAggregator) RemoveAPIService(apiServiceName string) { s.mutex.Lock() defer s.mutex.Unlock() if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists { - return ErrAPIServiceNotFound + return } delete(s.specsByAPIServiceName, apiServiceName) // Re-create the mergeSpec for the new list of apiservices s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) - return nil } // decorateError creates a new cache that wraps a downloader diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go index 780eb2090bb..5f107150c8e 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -155,9 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic // RemoveAPIService removes API Service from OpenAPI Aggregation Controller. func (c *AggregationController) RemoveAPIService(apiServiceName string) { - if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil { - utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err)) - } + c.openAPIAggregationManager.RemoveAPIService(apiServiceName) // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out // and will not add it again to the queue. c.queue.Forget(apiServiceName) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go index 9f7ad4ff975..23632ff32d1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go @@ -19,6 +19,7 @@ package aggregator import ( "bytes" "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -41,9 +42,12 @@ import ( v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" ) +var ErrAPIServiceNotFound = errors.New("resource not found") + // SpecProxier proxies OpenAPI V3 requests to their respective APIService type SpecProxier interface { AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) + // UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist. UpdateAPIServiceSpec(apiServiceName string) error RemoveAPIServiceSpec(apiServiceName string) GetAPIServiceNames() []string @@ -161,7 +165,7 @@ func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error { func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error { apiService, exists := s.apiServiceInfo[apiServiceName] if !exists { - return fmt.Errorf("APIService %s does not exist for update", apiServiceName) + return ErrAPIServiceNotFound } if !apiService.isLegacyAPIService { @@ -225,6 +229,7 @@ func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) { defer s.rwMutex.Unlock() if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok { s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService)) + _ = s.updateAPIServiceSpecLocked(openAPIV2Converter) delete(s.apiServiceInfo, apiServiceName) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go index 5ffed17913f..eb0b090e23a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go @@ -131,6 +131,19 @@ func TestV2APIService(t *testing.T) { apiServiceNames := specProxier.GetAPIServiceNames() assert.ElementsMatch(t, []string{openAPIV2Converter, apiService.Name}, apiServiceNames) + + // Ensure that OpenAPI v3 for legacy APIService is removed. + specProxier.RemoveAPIServiceSpec(apiService.Name) + data = sendReq(t, serveHandler, "/openapi/v3") + groupVersionList = handler3.OpenAPIV3Discovery{} + if err := json.Unmarshal(data, &groupVersionList); err != nil { + t.Fatal(err) + } + + path, ok = groupVersionList.Paths["apis/group.example.com/v1"] + if ok { + t.Error("Expected group.example.com/v1 not to be in group version list") + } } func TestV3APIService(t *testing.T) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go index ed69afe40c1..0f5e37c4522 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go @@ -133,9 +133,10 @@ func (c *AggregationController) processNextWorkItem() bool { } func (c *AggregationController) sync(key string) (syncAction, error) { - err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key) - switch { - case err != nil: + if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil { + if err == aggregator.ErrAPIServiceNotFound { + return syncNothing, nil + } return syncRequeueRateLimited, err } return syncRequeue, nil diff --git a/test/e2e/apimachinery/openapiv3.go b/test/e2e/apimachinery/openapiv3.go index b26c4cf86df..7a939b059b7 100644 --- a/test/e2e/apimachinery/openapiv3.go +++ b/test/e2e/apimachinery/openapiv3.go @@ -19,6 +19,7 @@ package apimachinery import ( "context" "encoding/json" + "fmt" "reflect" "strings" "time" @@ -28,8 +29,10 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apiextensions-apiserver/test/integration/fixtures" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" "k8s.io/client-go/openapi3" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" @@ -97,12 +100,34 @@ var _ = SIGDescribe("OpenAPIV3", func() { framework.ExpectNoError(err) dynamicClient, err := dynamic.NewForConfig(config) framework.ExpectNoError(err) - crd := fixtures.NewRandomNameV1CustomResourceDefinition(apiextensionsv1.NamespaceScoped) + resourceName := "testcrd" + // Generate a CRD with random group name to avoid group conflict with other tests that run in parallel. + groupName := fmt.Sprintf("%s.example.com", names.SimpleNameGenerator.GenerateName("group")) + crd := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%ss.%s", resourceName, groupName)}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: groupName, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1beta1", + Served: true, + Storage: true, + Schema: fixtures.AllowAllSchema(), + }, + }, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: resourceName + "s", + Singular: resourceName, + Kind: resourceName, + ListKind: resourceName + "List", + }, + Scope: apiextensionsv1.NamespaceScoped, + }, + } gv := schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name} _, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionClient, dynamicClient) defer func() { - err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient) - framework.ExpectNoError(err, "deleting CustomResourceDefinition") + _ = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient) }() framework.ExpectNoError(err) @@ -126,6 +151,19 @@ var _ = SIGDescribe("OpenAPIV3", func() { diff := cmp.Diff(*openAPISpec, spec2) framework.Failf("%s", diff) } + + err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient) + framework.ExpectNoError(err, "deleting CustomResourceDefinition") + // Poll for the OpenAPI to be updated with the deleted CRD + err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) { + _, err = c.GVSpec(gv) + if err == nil { + return false, nil + } + _, isNotFound := err.(*openapi3.GroupVersionNotFoundError) + return isNotFound, nil + }) + framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted CustomResourceDefinition") }) /* @@ -163,5 +201,17 @@ var _ = SIGDescribe("OpenAPIV3", func() { diff := cmp.Diff(*openAPISpec, spec2) framework.Failf("%s", diff) } + + cleanupSampleAPIServer(ctx, f.ClientSet, aggrclient, names, "v1beta1.wardle.example.com") + // Poll for the OpenAPI to be updated with the deleted aggregated apiserver. + err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) { + _, err = c.GVSpec(gv) + if err == nil { + return false, nil + } + _, isNotFound := err.(*openapi3.GroupVersionNotFoundError) + return isNotFound, nil + }) + framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted APIService") }) })