Merge pull request #120108 from tnqn/fix-appgregator

Fix OpenAPI aggregation cleanup
This commit is contained in:
Kubernetes Prow Robot
2023-08-31 10:42:47 -07:00
committed by GitHub
7 changed files with 82 additions and 15 deletions

View File

@@ -564,7 +564,7 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
s.openAPIAggregationController.RemoveAPIService(apiServiceName) s.openAPIAggregationController.RemoveAPIService(apiServiceName)
} }
if s.openAPIV3AggregationController != nil { if s.openAPIV3AggregationController != nil {
s.openAPIAggregationController.RemoveAPIService(apiServiceName) s.openAPIV3AggregationController.RemoveAPIService(apiServiceName)
} }
delete(s.proxyHandlers, apiServiceName) delete(s.proxyHandlers, apiServiceName)

View File

@@ -44,8 +44,9 @@ var ErrAPIServiceNotFound = errors.New("resource not found")
// known specs including the http etag. // known specs including the http etag.
type SpecAggregator interface { type SpecAggregator interface {
AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error
// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
UpdateAPIServiceSpec(apiServiceName string) error UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIService(apiServiceName string) error RemoveAPIService(apiServiceName string)
} }
const ( const (
@@ -231,17 +232,16 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler
// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe. // It is thread safe.
func (s *specAggregator) RemoveAPIService(apiServiceName string) error { func (s *specAggregator) RemoveAPIService(apiServiceName string) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists { if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
return ErrAPIServiceNotFound return
} }
delete(s.specsByAPIServiceName, apiServiceName) delete(s.specsByAPIServiceName, apiServiceName)
// Re-create the mergeSpec for the new list of apiservices // Re-create the mergeSpec for the new list of apiservices
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
return nil
} }
// decorateError creates a new cache that wraps a downloader // decorateError creates a new cache that wraps a downloader

View File

@@ -155,9 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller. // RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
func (c *AggregationController) RemoveAPIService(apiServiceName string) { func (c *AggregationController) RemoveAPIService(apiServiceName string) {
if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil { c.openAPIAggregationManager.RemoveAPIService(apiServiceName)
utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
}
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
// and will not add it again to the queue. // and will not add it again to the queue.
c.queue.Forget(apiServiceName) c.queue.Forget(apiServiceName)

View File

@@ -19,6 +19,7 @@ package aggregator
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@@ -41,9 +42,12 @@ import (
v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
) )
var ErrAPIServiceNotFound = errors.New("resource not found")
// SpecProxier proxies OpenAPI V3 requests to their respective APIService // SpecProxier proxies OpenAPI V3 requests to their respective APIService
type SpecProxier interface { type SpecProxier interface {
AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)
// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
UpdateAPIServiceSpec(apiServiceName string) error UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIServiceSpec(apiServiceName string) RemoveAPIServiceSpec(apiServiceName string)
GetAPIServiceNames() []string GetAPIServiceNames() []string
@@ -161,7 +165,7 @@ func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error { func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error {
apiService, exists := s.apiServiceInfo[apiServiceName] apiService, exists := s.apiServiceInfo[apiServiceName]
if !exists { if !exists {
return fmt.Errorf("APIService %s does not exist for update", apiServiceName) return ErrAPIServiceNotFound
} }
if !apiService.isLegacyAPIService { if !apiService.isLegacyAPIService {
@@ -225,6 +229,7 @@ func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok { if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok {
s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService)) s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService))
_ = s.updateAPIServiceSpecLocked(openAPIV2Converter)
delete(s.apiServiceInfo, apiServiceName) delete(s.apiServiceInfo, apiServiceName)
} }
} }

View File

@@ -131,6 +131,19 @@ func TestV2APIService(t *testing.T) {
apiServiceNames := specProxier.GetAPIServiceNames() apiServiceNames := specProxier.GetAPIServiceNames()
assert.ElementsMatch(t, []string{openAPIV2Converter, apiService.Name}, apiServiceNames) assert.ElementsMatch(t, []string{openAPIV2Converter, apiService.Name}, apiServiceNames)
// Ensure that OpenAPI v3 for legacy APIService is removed.
specProxier.RemoveAPIServiceSpec(apiService.Name)
data = sendReq(t, serveHandler, "/openapi/v3")
groupVersionList = handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(data, &groupVersionList); err != nil {
t.Fatal(err)
}
path, ok = groupVersionList.Paths["apis/group.example.com/v1"]
if ok {
t.Error("Expected group.example.com/v1 not to be in group version list")
}
} }
func TestV3APIService(t *testing.T) { func TestV3APIService(t *testing.T) {

View File

@@ -133,9 +133,10 @@ func (c *AggregationController) processNextWorkItem() bool {
} }
func (c *AggregationController) sync(key string) (syncAction, error) { func (c *AggregationController) sync(key string) (syncAction, error) {
err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key) if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
switch { if err == aggregator.ErrAPIServiceNotFound {
case err != nil: return syncNothing, nil
}
return syncRequeueRateLimited, err return syncRequeueRateLimited, err
} }
return syncRequeue, nil return syncRequeue, nil

View File

@@ -19,6 +19,7 @@ package apimachinery
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"reflect" "reflect"
"strings" "strings"
"time" "time"
@@ -28,8 +29,10 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures" "k8s.io/apiextensions-apiserver/test/integration/fixtures"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/openapi3" "k8s.io/client-go/openapi3"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
@@ -97,12 +100,34 @@ var _ = SIGDescribe("OpenAPIV3", func() {
framework.ExpectNoError(err) framework.ExpectNoError(err)
dynamicClient, err := dynamic.NewForConfig(config) dynamicClient, err := dynamic.NewForConfig(config)
framework.ExpectNoError(err) framework.ExpectNoError(err)
crd := fixtures.NewRandomNameV1CustomResourceDefinition(apiextensionsv1.NamespaceScoped) resourceName := "testcrd"
// Generate a CRD with random group name to avoid group conflict with other tests that run in parallel.
groupName := fmt.Sprintf("%s.example.com", names.SimpleNameGenerator.GenerateName("group"))
crd := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%ss.%s", resourceName, groupName)},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: groupName,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1beta1",
Served: true,
Storage: true,
Schema: fixtures.AllowAllSchema(),
},
},
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: resourceName + "s",
Singular: resourceName,
Kind: resourceName,
ListKind: resourceName + "List",
},
Scope: apiextensionsv1.NamespaceScoped,
},
}
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name} gv := schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name}
_, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionClient, dynamicClient) _, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionClient, dynamicClient)
defer func() { defer func() {
err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient) _ = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient)
framework.ExpectNoError(err, "deleting CustomResourceDefinition")
}() }()
framework.ExpectNoError(err) framework.ExpectNoError(err)
@@ -126,6 +151,19 @@ var _ = SIGDescribe("OpenAPIV3", func() {
diff := cmp.Diff(*openAPISpec, spec2) diff := cmp.Diff(*openAPISpec, spec2)
framework.Failf("%s", diff) framework.Failf("%s", diff)
} }
err = fixtures.DeleteV1CustomResourceDefinition(crd, apiExtensionClient)
framework.ExpectNoError(err, "deleting CustomResourceDefinition")
// Poll for the OpenAPI to be updated with the deleted CRD
err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) {
_, err = c.GVSpec(gv)
if err == nil {
return false, nil
}
_, isNotFound := err.(*openapi3.GroupVersionNotFoundError)
return isNotFound, nil
})
framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted CustomResourceDefinition")
}) })
/* /*
@@ -163,5 +201,17 @@ var _ = SIGDescribe("OpenAPIV3", func() {
diff := cmp.Diff(*openAPISpec, spec2) diff := cmp.Diff(*openAPISpec, spec2)
framework.Failf("%s", diff) framework.Failf("%s", diff)
} }
cleanupSampleAPIServer(ctx, f.ClientSet, aggrclient, names, "v1beta1.wardle.example.com")
// Poll for the OpenAPI to be updated with the deleted aggregated apiserver.
err = wait.PollUntilContextTimeout(ctx, time.Second*1, wait.ForeverTestTimeout, true, func(_ context.Context) (bool, error) {
_, err = c.GVSpec(gv)
if err == nil {
return false, nil
}
_, isNotFound := err.(*openapi3.GroupVersionNotFoundError)
return isNotFound, nil
})
framework.ExpectNoError(err, "should not contain OpenAPI V3 for deleted APIService")
}) })
}) })