Fix OpenAPI aggregation cleanup

There were four issues in OpenAPI aggregation cleanup:
1. When removing an APIService, openAPIAggregationController was called
   twice while openAPIV3AggregationController was never called, leading
   to OpenAPI v3 for the APIService not cleaned up.
2. When removing a local APIService, v2 specAggregator should not return
   ErrAPIServiceNotFound when it doesn't find the APIService because
   local APIServices were never added to its cache, otherwise confusing
   error logs would be generated. Besides, the method's comment
   indicates that the desired behavior is that no error is returned if
   the APIService does not exist.
3. When removing an APIService, v3 specProxier should update
   openapiv2converter's cache, like when updating an APIService,
   otherwise the API would not be removed from "/openapi/v3".
4. When v3 AggregationController reconciles an APIService, it should
   stop requeueing it if it fails with ErrAPIServiceNotFound as the
   APIService has been removed, like what v2 AggregationController does,
   otherwise it would keep reconciling the APIService forever.

Signed-off-by: Quan Tian <qtian@vmware.com>
This commit is contained in:
Quan Tian
2023-08-22 20:30:49 +08:00
parent f852d7fead
commit bf41b0462c
7 changed files with 82 additions and 15 deletions

View File

@@ -564,7 +564,7 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
s.openAPIAggregationController.RemoveAPIService(apiServiceName) s.openAPIAggregationController.RemoveAPIService(apiServiceName)
} }
if s.openAPIV3AggregationController != nil { if s.openAPIV3AggregationController != nil {
s.openAPIAggregationController.RemoveAPIService(apiServiceName) s.openAPIV3AggregationController.RemoveAPIService(apiServiceName)
} }
delete(s.proxyHandlers, apiServiceName) delete(s.proxyHandlers, apiServiceName)

View File

@@ -44,8 +44,9 @@ var ErrAPIServiceNotFound = errors.New("resource not found")
// known specs including the http etag. // known specs including the http etag.
type SpecAggregator interface { type SpecAggregator interface {
AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error
// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
UpdateAPIServiceSpec(apiServiceName string) error UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIService(apiServiceName string) error RemoveAPIService(apiServiceName string)
} }
const ( const (
@@ -231,17 +232,16 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler
// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe. // It is thread safe.
func (s *specAggregator) RemoveAPIService(apiServiceName string) error { func (s *specAggregator) RemoveAPIService(apiServiceName string) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists { if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
return ErrAPIServiceNotFound return
} }
delete(s.specsByAPIServiceName, apiServiceName) delete(s.specsByAPIServiceName, apiServiceName)
// Re-create the mergeSpec for the new list of apiservices // Re-create the mergeSpec for the new list of apiservices
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
return nil
} }
// decorateError creates a new cache that wraps a downloader // decorateError creates a new cache that wraps a downloader

View File

@@ -155,9 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller. // RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
func (c *AggregationController) RemoveAPIService(apiServiceName string) { func (c *AggregationController) RemoveAPIService(apiServiceName string) {
if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil { c.openAPIAggregationManager.RemoveAPIService(apiServiceName)
utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
}
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
// and will not add it again to the queue. // and will not add it again to the queue.
c.queue.Forget(apiServiceName) c.queue.Forget(apiServiceName)

View File

@@ -19,6 +19,7 @@ package aggregator
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@@ -41,9 +42,12 @@ import (
v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
) )
var ErrAPIServiceNotFound = errors.New("resource not found")
// SpecProxier proxies OpenAPI V3 requests to their respective APIService // SpecProxier proxies OpenAPI V3 requests to their respective APIService
type SpecProxier interface { type SpecProxier interface {
AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)
// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
UpdateAPIServiceSpec(apiServiceName string) error UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIServiceSpec(apiServiceName string) RemoveAPIServiceSpec(apiServiceName string)
GetAPIServiceNames() []string GetAPIServiceNames() []string
@@ -161,7 +165,7 @@ func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error { func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error {
apiService, exists := s.apiServiceInfo[apiServiceName] apiService, exists := s.apiServiceInfo[apiServiceName]
if !exists { if !exists {
return fmt.Errorf("APIService %s does not exist for update", apiServiceName) return ErrAPIServiceNotFound
} }
if !apiService.isLegacyAPIService { if !apiService.isLegacyAPIService {
@@ -225,6 +229,7 @@ func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok { if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok {
s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService)) s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService))
_ = s.updateAPIServiceSpecLocked(openAPIV2Converter)
delete(s.apiServiceInfo, apiServiceName) delete(s.apiServiceInfo, apiServiceName)
} }
} }

View File

@@ -131,6 +131,19 @@ func TestV2APIService(t *testing.T) {
apiServiceNames := specProxier.GetAPIServiceNames() apiServiceNames := specProxier.GetAPIServiceNames()
assert.ElementsMatch(t, []string{openAPIV2Converter, apiService.Name}, apiServiceNames) assert.ElementsMatch(t, []string{openAPIV2Converter, apiService.Name}, apiServiceNames)
// Ensure that OpenAPI v3 for legacy APIService is removed.
specProxier.RemoveAPIServiceSpec(apiService.Name)
data = sendReq(t, serveHandler, "/openapi/v3")
groupVersionList = handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(data, &groupVersionList); err != nil {
t.Fatal(err)
}
path, ok = groupVersionList.Paths["apis/group.example.com/v1"]
if ok {
t.Error("Expected group.example.com/v1 not to be in group version list")
}
} }
func TestV3APIService(t *testing.T) { func TestV3APIService(t *testing.T) {

View File

@@ -133,9 +133,10 @@ func (c *AggregationController) processNextWorkItem() bool {
} }
func (c *AggregationController) sync(key string) (syncAction, error) { func (c *AggregationController) sync(key string) (syncAction, error) {
err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key) if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
switch { if err == aggregator.ErrAPIServiceNotFound {
case err != nil: return syncNothing, nil
}
return syncRequeueRateLimited, err return syncRequeueRateLimited, err
} }
return syncRequeue, nil return syncRequeue, nil

View File

@@ -19,6 +19,7 @@ package apimachinery
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"reflect" "reflect"
"strings" "strings"
"time" "time"
@@ -28,8 +29,10 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures" "k8s.io/apiextensions-apiserver/test/integration/fixtures"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/openapi3" "k8s.io/client-go/openapi3"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
@@ -97,12 +100,34 @@ var _ = SIGDescribe("OpenAPIV3", func() {
framework.ExpectNoError(err) framework.ExpectNoError(err)
dynamicClient, err := dynamic.NewForConfig(config) dynamicClient, err := dynamic.NewForConfig(config)
framework.ExpectNoError(err) framework.ExpectNoError(err)
crd := fixtures.NewRandomNameV1CustomResourceDefinition(apiextensionsv1.NamespaceScoped) resourceName := "testcrd"
// Generate a CRD with random group name to avoid group conflict with other tests that run in parallel.
groupName := fmt.Sprintf("%s.example.com", names.SimpleNameGenerator.GenerateName("group"))
crd := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%ss.%s", resourceName, groupName)},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: groupName,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1beta1",
Served: true,
Storage: true,
Schema: fixtures.AllowAllSchema(),
},
},
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: resourceName + "s",
Singular: resourceName,
Kind: resourceName,
ListKind: resourceName + "List",
},
Scope: apiextensionsv1.NamespaceScoped,
},
}
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name} gv := schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name}
_, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionClient, dynamicClient) _, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionClient, dynamicClient)
defer func() { defer func() {
err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient) _ = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient)
framework.ExpectNoError(err, "deleting CustomResourceDefinition")
}() }()
framework.ExpectNoError(err) framework.ExpectNoError(err)
@@ -126,6 +151,19 @@ var _ = SIGDescribe("OpenAPIV3", func() {
diff := cmp.Diff(*openAPISpec, spec2) diff := cmp.Diff(*openAPISpec, spec2)
framework.Failf("%s", diff) framework.Failf("%s", diff)
} }
err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient)
framework.ExpectNoError(err, "deleting CustomResourceDefinition")
// Poll for the OpenAPI to be updated with the deleted CRD
err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) {
_, err = c.GVSpec(gv)
if err == nil {
return false, nil
}
_, isNotFound := err.(*openapi3.GroupVersionNotFoundError)
return isNotFound, nil
})
framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted CustomResourceDefinition")
}) })
/* /*
@@ -163,5 +201,17 @@ var _ = SIGDescribe("OpenAPIV3", func() {
diff := cmp.Diff(*openAPISpec, spec2) diff := cmp.Diff(*openAPISpec, spec2)
framework.Failf("%s", diff) framework.Failf("%s", diff)
} }
cleanupSampleAPIServer(ctx, f.ClientSet, aggrclient, names, "v1beta1.wardle.example.com")
// Poll for the OpenAPI to be updated with the deleted aggregated apiserver.
err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) {
_, err = c.GVSpec(gv)
if err == nil {
return false, nil
}
_, isNotFound := err.(*openapi3.GroupVersionNotFoundError)
return isNotFound, nil
})
framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted APIService")
}) })
}) })