From 3222a7033cf9128b76c0677887f4e383821d0475 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Thu, 15 Nov 2018 11:02:11 -0800 Subject: [PATCH] Apiextensions-apiserver aggregates CRD schemas efficiently without checking conflicts, and wire up CRD discovery controller to serve OpenAPI spec. --- .../pkg/apiserver/apiserver.go | 9 +- .../customresource_discovery_controller.go | 54 +++- .../pkg/openapi/aggregator.go | 232 ++++++++++++++++++ .../apiserver/pkg/server/genericapiserver.go | 11 +- .../apiserver/pkg/server/routes/openapi.go | 20 +- 5 files changed, 318 insertions(+), 8 deletions(-) create mode 100644 staging/src/k8s.io/apiextensions-apiserver/pkg/openapi/aggregator.go diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index b6830d762c8..30ec82074ee 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -41,6 +41,7 @@ import ( "k8s.io/apiextensions-apiserver/pkg/controller/establish" "k8s.io/apiextensions-apiserver/pkg/controller/finalizer" "k8s.io/apiextensions-apiserver/pkg/controller/status" + openapiaggregator "k8s.io/apiextensions-apiserver/pkg/openapi" "k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition" _ "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -204,7 +205,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil }) s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error { - go crdController.Run(context.StopCh) + // create OpenAPI aggregation manager in the last step because only now genericapiserver's the OpenAPI services and spec is available (after PrepareRun). + crdOpenAPIAggregationManager, err := openapiaggregator.NewAggregationManager(s.GenericAPIServer.OpenAPIService, s.GenericAPIServer.OpenAPIVersionedService, s.GenericAPIServer.StaticOpenAPISpec) + if err != nil { + return err + } + + go crdController.Run(context.StopCh, crdOpenAPIAggregationManager) go namingController.Run(context.StopCh) go establishingController.Run(context.StopCh) go finalizingController.Run(5, context.StopCh) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go index 3b13e8357db..da54fca6af5 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go @@ -31,12 +31,16 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/discovery" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion" listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion" + apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features" + apiextensionsopenapi "k8s.io/apiextensions-apiserver/pkg/openapi" ) type DiscoveryController struct { @@ -50,6 +54,8 @@ type DiscoveryController struct { syncFn func(version schema.GroupVersion) error queue workqueue.RateLimitingInterface + + openAPIAggregationManager apiextensionsopenapi.AggregationManager } func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler) *DiscoveryController { @@ -83,6 +89,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { if err != nil { return err } + apiServiceName := version.Group + "." + version.Version foundVersion := false foundGroup := false for _, crd := range crds { @@ -119,6 +126,33 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { continue } foundVersion = true + if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) { + validationSchema, err := getSchemaForVersion(crd, version.Version) + if err != nil { + return err + } + // Convert internal CustomResourceValidation to versioned CustomResourceValidation + versionedSchema := new(v1beta1.CustomResourceValidation) + if validationSchema == nil { + versionedSchema = nil + } else { + if err := v1beta1.Convert_apiextensions_CustomResourceValidation_To_v1beta1_CustomResourceValidation(validationSchema, versionedSchema, nil); err != nil { + return err + } + } + // We aggregate the schema even if it's nil as it maybe a removal of the schema for this CRD, + // and the aggreated OpenAPI spec should reflect this change. + crdspec, etag, err := apiextensionsopenapi.CustomResourceDefinitionOpenAPISpec(&crd.Spec, version.Version, versionedSchema) + if err != nil { + return err + } + + // Add/update the local API service's spec for the CRD in apiExtensionsServer's + // openAPIAggregationManager + if err := c.openAPIAggregationManager.AddUpdateLocalAPIServiceSpec(apiServiceName, crdspec, etag); err != nil { + return err + } + } verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"}) // if we're terminating we don't allow some verbs @@ -164,6 +198,14 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { if !foundGroup { c.groupHandler.unsetDiscovery(version.Group) c.versionHandler.unsetDiscovery(version) + if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) { + // Remove the local API service for the CRD in apiExtensionsServer's + // openAPIAggregationManager. + // Note that we don't check if apiServiceName exists in openAPIAggregationManager + // because RemoveAPIServiceSpec properly handles non-existing API service by + // returning no error. + return c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName) + } return nil } @@ -180,6 +222,14 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { if !foundVersion { c.versionHandler.unsetDiscovery(version) + if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) { + // Remove the local API service for the CRD in apiExtensionsServer's + // openAPIAggregationManager. + // Note that we don't check if apiServiceName exists in openAPIAggregationManager + // because RemoveAPIServiceSpec properly handles non-existing API service by + // returning no error. + return c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName) + } return nil } c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource { @@ -195,13 +245,15 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery) }) } -func (c *DiscoveryController) Run(stopCh <-chan struct{}) { +func (c *DiscoveryController) Run(stopCh <-chan struct{}, crdOpenAPIAggregationManager apiextensionsopenapi.AggregationManager) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() defer klog.Infof("Shutting down DiscoveryController") klog.Infof("Starting DiscoveryController") + c.openAPIAggregationManager = crdOpenAPIAggregationManager + if !cache.WaitForCacheSync(stopCh, c.crdsSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) return diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/openapi/aggregator.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/openapi/aggregator.go new file mode 100644 index 00000000000..90bd042f097 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/openapi/aggregator.go @@ -0,0 +1,232 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "fmt" + "sync" + + "github.com/go-openapi/spec" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/kube-openapi/pkg/aggregator" + "k8s.io/kube-openapi/pkg/handler" +) + +// AggregationManager is the interface between OpenAPI Aggregator service and a controller +// that manages CRD openapi spec aggregation +type AggregationManager interface { + // AddUpdateLocalAPIService allows adding/updating local API service with nil handler and + // nil Spec.Service. This function can be used for local dynamic OpenAPI spec aggregation + // management (e.g. CRD) + AddUpdateLocalAPIServiceSpec(name string, spec *spec.Swagger, etag string) error + RemoveAPIServiceSpec(apiServiceName string) error +} + +type specAggregator struct { + // mutex protects all members of this struct. + rwMutex sync.RWMutex + + // Map of API Services' OpenAPI specs by their name + openAPISpecs map[string]*openAPISpecInfo + + // provided for dynamic OpenAPI spec + openAPIService *handler.OpenAPIService + openAPIVersionedService *handler.OpenAPIService +} + +var _ AggregationManager = &specAggregator{} + +// NewAggregationManager constructs a specAggregator from input openAPIService, openAPIVersionedService and +// recorded static OpenAPI spec. The function returns an AggregationManager interface. +func NewAggregationManager(openAPIService, openAPIVersionedService *handler.OpenAPIService, staticSpec *spec.Swagger) (AggregationManager, error) { + // openAPIVersionedService and deprecated openAPIService should be initialized together + if (openAPIService == nil) != (openAPIVersionedService == nil) { + return nil, fmt.Errorf("unexpected openapi service initialization error") + } + return &specAggregator{ + openAPISpecs: map[string]*openAPISpecInfo{ + "initial_static_spec": { + spec: staticSpec, + }, + }, + openAPIService: openAPIService, + openAPIVersionedService: openAPIVersionedService, + }, nil +} + +// openAPISpecInfo is used to store OpenAPI spec with its priority. +// It can be used to sort specs with their priorities. +type openAPISpecInfo struct { + // Name of a registered ApiService + name string + + // Specification of this API Service. If null then the spec is not loaded yet. + spec *spec.Swagger + etag string +} + +// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. +func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { + specs := []openAPISpecInfo{} + for _, specInfo := range s.openAPISpecs { + if specInfo.spec == nil { + continue + } + specs = append(specs, *specInfo) + } + if len(specs) == 0 { + return &spec.Swagger{}, nil + } + for _, specInfo := range specs { + if specToReturn == nil { + specToReturn, err = aggregator.CloneSpec(specInfo.spec) + if err != nil { + return nil, err + } + continue + } + mergeSpecs(specToReturn, specInfo.spec) + } + // Add minimum required keys if missing, to properly serve the OpenAPI spec + // through apiextensions-apiserver HTTP handler. These keys will not be + // aggregated to top-level OpenAPI spec (only paths and definitions will). + // However these keys make the OpenAPI->proto serialization happy. + if specToReturn.Info == nil { + specToReturn.Info = &spec.Info{ + InfoProps: spec.InfoProps{ + Title: "Kubernetes", + }, + } + } + if len(specToReturn.Swagger) == 0 { + specToReturn.Swagger = "2.0" + } + return specToReturn, nil +} + +// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. +func (s *specAggregator) updateOpenAPISpec() error { + if s.openAPIService == nil || s.openAPIVersionedService == nil { + // openAPIVersionedService and deprecated openAPIService should be initialized together + if !(s.openAPIService == nil && s.openAPIVersionedService == nil) { + return fmt.Errorf("unexpected openapi service initialization error") + } + return nil + } + specToServe, err := s.buildOpenAPISpec() + if err != nil { + return err + } + // openAPIService.UpdateSpec and openAPIVersionedService.UpdateSpec read the same swagger spec + // serially and update their local caches separately. Both endpoints will have same spec in + // their caches if the caller is holding proper locks. + err = s.openAPIService.UpdateSpec(specToServe) + if err != nil { + return err + } + return s.openAPIVersionedService.UpdateSpec(specToServe) +} + +// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact +// if the update fails. +func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error { + orgSpecInfo, exists := s.openAPISpecs[specInfo.name] + s.openAPISpecs[specInfo.name] = specInfo + if err := s.updateOpenAPISpec(); err != nil { + if exists { + s.openAPISpecs[specInfo.name] = orgSpecInfo + } else { + delete(s.openAPISpecs, specInfo.name) + } + return err + } + return nil +} + +// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact +// if the update fails. +func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error { + orgSpecInfo, exists := s.openAPISpecs[apiServiceName] + if !exists { + return nil + } + delete(s.openAPISpecs, apiServiceName) + if err := s.updateOpenAPISpec(); err != nil { + s.openAPISpecs[apiServiceName] = orgSpecInfo + return err + } + return nil +} + +// AddUpdateLocalAPIService allows adding/updating local API service with nil handler and +// nil Spec.Service. This function can be used for local dynamic OpenAPI spec aggregation +// management (e.g. CRD) +func (s *specAggregator) AddUpdateLocalAPIServiceSpec(name string, spec *spec.Swagger, etag string) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + return s.tryUpdatingServiceSpecs(&openAPISpecInfo{ + name: name, + spec: spec, + etag: etag, + }) +} + +// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. +// It is thread safe. +func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + if _, existingService := s.openAPISpecs[apiServiceName]; !existingService { + return nil + } + + return s.tryDeleteServiceSpecs(apiServiceName) +} + +// mergeSpecs simply adds source openapi spec to dest and ignores any path/definition +// conflicts because CRD openapi spec should not have conflict +func mergeSpecs(dest, source *spec.Swagger) { + // Paths may be empty, due to [ACL constraints](http://goo.gl/8us55a#securityFiltering). + if source.Paths == nil { + // If Path is nil, none of the model defined in Definitions is used and we + // should not do anything. + // NOTE: this should not happen for CRD specs, because we automatically construct + // the Paths for CRD specs. We use utilruntime.HandleError to log this impossible + // case + utilruntime.HandleError(fmt.Errorf("unexpected CRD spec with empty Path: %v", *source)) + return + } + if dest.Paths == nil { + dest.Paths = &spec.Paths{} + } + for k, v := range source.Definitions { + if dest.Definitions == nil { + dest.Definitions = spec.Definitions{} + } + dest.Definitions[k] = v + } + for k, v := range source.Paths.Paths { + // PathItem may be empty, due to [ACL constraints](http://goo.gl/8us55a#securityFiltering). + if dest.Paths.Paths == nil { + dest.Paths.Paths = map[string]spec.PathItem{} + } + dest.Paths.Paths[k] = v + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4efc82427e2..fac46af815e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -25,7 +25,8 @@ import ( "time" systemd "github.com/coreos/go-systemd/daemon" - "github.com/emicklei/go-restful-swagger12" + swagger "github.com/emicklei/go-restful-swagger12" + "github.com/go-openapi/spec" "k8s.io/klog" "k8s.io/apimachinery/pkg/api/meta" @@ -47,6 +48,7 @@ import ( restclient "k8s.io/client-go/rest" openapibuilder "k8s.io/kube-openapi/pkg/builder" openapicommon "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/handler" openapiutil "k8s.io/kube-openapi/pkg/util" openapiproto "k8s.io/kube-openapi/pkg/util/proto" ) @@ -124,6 +126,11 @@ type GenericAPIServer struct { swaggerConfig *swagger.Config openAPIConfig *openapicommon.Config + // Expose the registered OpenAPI Services and built static OpenAPI spec if openAPIConfig is non-nil + OpenAPIService *handler.OpenAPIService // for endpoint /swagger.json + OpenAPIVersionedService *handler.OpenAPIService // for endpoint /openapi/v2 + StaticOpenAPISpec *spec.Swagger + // PostStartHooks are each called after the server has started listening, in a separate go func for each // with no guarantee of ordering between them. The map key is a name used for error reporting. // It may kill the process with a panic if it wishes to by returning an error. @@ -240,7 +247,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { routes.Swagger{Config: s.swaggerConfig}.Install(s.Handler.GoRestfulContainer) } if s.openAPIConfig != nil { - routes.OpenAPI{ + s.OpenAPIService, s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{ Config: s.openAPIConfig, }.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go b/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go index 934bbf84a04..d080e471061 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go +++ b/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go @@ -18,9 +18,11 @@ package routes import ( restful "github.com/emicklei/go-restful" + "github.com/go-openapi/spec" "k8s.io/klog" "k8s.io/apiserver/pkg/server/mux" + "k8s.io/kube-openapi/pkg/builder" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler" ) @@ -30,17 +32,27 @@ type OpenAPI struct { Config *common.Config } -// Install adds the SwaggerUI webservice to the given mux. -func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) { +// Install adds the SwaggerUI webservice to the given mux. This function returns +// the built static OpenAPI spec and the registered OpenAPI services to allow +// further OpenAPI spec aggregation. +func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (openAPIService, openAPIVersionedService *handler.OpenAPIService, spec *spec.Swagger) { + var err error + // Record the static OpenAPI spec to allow further OpenAPI spec aggregation + // with this static spec on the registered OpenAPI services + spec, err = builder.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config) + if err != nil { + klog.Fatalf("Failed to build open api spec for root: %v", err) + } // NOTE: [DEPRECATION] We will announce deprecation for format-separated endpoints for OpenAPI spec, // and switch to a single /openapi/v2 endpoint in Kubernetes 1.10. The design doc and deprecation process // are tracked at: https://docs.google.com/document/d/19lEqE9lc4yHJ3WJAJxS_G7TcORIJXGHyq3wpwcH28nU. - _, err := handler.BuildAndRegisterOpenAPIService("/swagger.json", c.RegisteredWebServices(), oa.Config, mux) + openAPIService, err = handler.RegisterOpenAPIService(spec, "/swagger.json", mux) if err != nil { klog.Fatalf("Failed to register open api spec for root: %v", err) } - _, err = handler.BuildAndRegisterOpenAPIVersionedService("/openapi/v2", c.RegisteredWebServices(), oa.Config, mux) + openAPIVersionedService, err = handler.RegisterOpenAPIVersionedService(spec, "/openapi/v2", mux) if err != nil { klog.Fatalf("Failed to register versioned open api spec for root: %v", err) } + return openAPIService, openAPIVersionedService, spec }