From 11cf3101e3d373954dda57d33dd01df3801cd435 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Mon, 4 Mar 2019 08:20:39 -0800 Subject: [PATCH] apiextensions-apiserver: serve openapi spec Co-authored-by: Dr. Stefan Schimanski --- pkg/features/kube_features.go | 1 + .../pkg/apiserver/apiserver.go | 11 + .../pkg/controller/openapi/aggregator.go | 70 ++++++ .../pkg/controller/openapi/controller.go | 218 ++++++++++++++++++ .../pkg/features/kube_features.go | 7 + 5 files changed, 307 insertions(+) create mode 100644 staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/aggregator.go create mode 100644 staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 0d01194527f..85dc95ece3e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -497,6 +497,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS apiextensionsfeatures.CustomResourceValidation: {Default: true, PreRelease: utilfeature.Beta}, apiextensionsfeatures.CustomResourceSubresources: {Default: true, PreRelease: utilfeature.Beta}, apiextensionsfeatures.CustomResourceWebhookConversion: {Default: false, PreRelease: utilfeature.Alpha}, + apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: false, PreRelease: utilfeature.Alpha}, // features that enable backwards compatibility but are scheduled to be removed // ... diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 1ba74a1933a..a223d41acec 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -31,7 +31,9 @@ import ( internalinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion" "k8s.io/apiextensions-apiserver/pkg/controller/establish" "k8s.io/apiextensions-apiserver/pkg/controller/finalizer" + openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi" "k8s.io/apiextensions-apiserver/pkg/controller/status" + apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features" "k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -44,6 +46,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" ) @@ -198,12 +201,20 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) crdClient.Apiextensions(), crdHandler, ) + var openapiController *openapicontroller.Controller + if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) { + openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions()) + } s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error { s.Informers.Start(context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error { + if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) { + go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) + } + go crdController.Run(context.StopCh) go namingController.Run(context.StopCh) go establishingController.Run(context.StopCh) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/aggregator.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/aggregator.go new file mode 100644 index 00000000000..1dc355fd94c --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/aggregator.go @@ -0,0 +1,70 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "github.com/go-openapi/spec" +) + +// mergeSpecs aggregates all OpenAPI specs, reusing the metadata of the first, static spec as the basis. +func mergeSpecs(staticSpec *spec.Swagger, crdSpecs ...*spec.Swagger) *spec.Swagger { + // create shallow copy of staticSpec, but replace paths and definitions because we modify them. + specToReturn := *staticSpec + if staticSpec.Definitions != nil { + specToReturn.Definitions = spec.Definitions{} + for k, s := range staticSpec.Definitions { + specToReturn.Definitions[k] = s + } + } + if staticSpec.Paths != nil { + specToReturn.Paths = &spec.Paths{ + Paths: map[string]spec.PathItem{}, + } + for k, p := range staticSpec.Paths.Paths { + specToReturn.Paths.Paths[k] = p + } + } + + for _, s := range crdSpecs { + mergeSpec(&specToReturn, s) + } + + return &specToReturn +} + +// mergeSpec copies paths and definitions from source to dest, mutating dest, but not source. +// We assume that conflicts do not matter. +func mergeSpec(dest, source *spec.Swagger) { + if source == nil || source.Paths == nil { + return + } + if dest.Paths == nil { + dest.Paths = &spec.Paths{} + } + for k, v := range source.Definitions { + if dest.Definitions == nil { + dest.Definitions = spec.Definitions{} + } + dest.Definitions[k] = v + } + for k, v := range source.Paths.Paths { + if dest.Paths.Paths == nil { + dest.Paths.Paths = map[string]spec.PathItem{} + } + dest.Paths.Paths[k] = v + } +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go new file mode 100644 index 00000000000..0e083f9c18b --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go @@ -0,0 +1,218 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/go-openapi/spec" + "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + "k8s.io/kube-openapi/pkg/handler" + + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion" + listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion" +) + +// Controller watches CustomResourceDefinitions and publishes validation schema +type Controller struct { + crdLister listers.CustomResourceDefinitionLister + crdsSynced cache.InformerSynced + + // To allow injection for testing. + syncFn func(string) error + + queue workqueue.RateLimitingInterface + + staticSpec *spec.Swagger + openAPIService *handler.OpenAPIService + + // specs per version and per CRD name + lock sync.Mutex + crdSpecs map[string]map[string]*spec.Swagger +} + +// NewController creates a new Controller with input CustomResourceDefinition informer +func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller { + c := &Controller{ + crdLister: crdInformer.Lister(), + crdsSynced: crdInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"), + crdSpecs: map[string]map[string]*spec.Swagger{}, + } + + crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCustomResourceDefinition, + UpdateFunc: c.updateCustomResourceDefinition, + DeleteFunc: c.deleteCustomResourceDefinition, + }) + + c.syncFn = c.sync + return c +} + +// Run sets openAPIAggregationManager and starts workers +func (c *Controller) Run(staticSpec *spec.Swagger, openAPIService *handler.OpenAPIService, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + defer klog.Infof("Shutting down OpenAPI controller") + + klog.Infof("Starting OpenAPI controller") + + c.staticSpec = staticSpec + c.openAPIService = openAPIService + + if !cache.WaitForCacheSync(stopCh, c.crdsSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + // only start one worker thread since its a slow moving API + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + // log slow aggregations + start := time.Now() + defer func() { + elapsed := time.Since(start) + if elapsed > time.Second { + klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed) + } + }() + + err := c.syncFn(key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) + c.queue.AddRateLimited(key) + return true +} + +func (c *Controller) sync(name string) error { + c.lock.Lock() + defer c.lock.Unlock() + + crd, err := c.crdLister.Get(name) + if err != nil && !errors.IsNotFound(err) { + return err + } + + // do we have to remove all specs of this CRD? + if errors.IsNotFound(err) || !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) { + if _, found := c.crdSpecs[name]; !found { + return nil + } + delete(c.crdSpecs, name) + return c.updateSpecLocked() + } + + // compute CRD spec and see whether it changed + oldSpecs := c.crdSpecs[crd.Name] + newSpecs := map[string]*spec.Swagger{} + anyChanged := false + for _, v := range crd.Spec.Versions { + if !v.Served { + continue + } + spec, err := BuildSwagger(crd, v.Name) + if err != nil { + return err + } + newSpecs[v.Name] = spec + if oldSpecs[v.Name] == nil || !reflect.DeepEqual(oldSpecs[v.Name], spec) { + anyChanged = true + } + } + if !anyChanged && len(oldSpecs) == len(newSpecs) { + return nil + } + + // update specs of this CRD + c.crdSpecs[crd.Name] = newSpecs + return c.updateSpecLocked() +} + +// updateSpecLocked aggregates all OpenAPI specs and updates openAPIService. +// It is not thread-safe. The caller is responsible to hold proper lock (Controller.lock). +func (c *Controller) updateSpecLocked() error { + crdSpecs := []*spec.Swagger{} + for _, versionSpecs := range c.crdSpecs { + for _, s := range versionSpecs { + crdSpecs = append(crdSpecs, s) + } + } + return c.openAPIService.UpdateSpec(mergeSpecs(c.staticSpec, crdSpecs...)) +} + +func (c *Controller) addCustomResourceDefinition(obj interface{}) { + castObj := obj.(*apiextensions.CustomResourceDefinition) + klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name) + c.enqueue(castObj) +} + +func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) { + castNewObj := newObj.(*apiextensions.CustomResourceDefinition) + klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name) + c.enqueue(castNewObj) +} + +func (c *Controller) deleteCustomResourceDefinition(obj interface{}) { + castObj, ok := obj.(*apiextensions.CustomResourceDefinition) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + castObj, ok = tombstone.Obj.(*apiextensions.CustomResourceDefinition) + if !ok { + klog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name) + c.enqueue(castObj) +} + +func (c *Controller) enqueue(obj *apiextensions.CustomResourceDefinition) { + c.queue.Add(obj.Name) +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go index 0ed34211eab..f2b7453733d 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/features/kube_features.go @@ -34,6 +34,12 @@ const ( // CustomResourceValidation is a list of validation methods for CustomResources CustomResourceValidation utilfeature.Feature = "CustomResourceValidation" + // owner: @roycaihw, @sttts + // alpha: v1.14 + // + // CustomResourcePublishOpenAPI enables publishing of CRD OpenAPI specs. + CustomResourcePublishOpenAPI utilfeature.Feature = "CustomResourcePublishOpenAPI" + // owner: @sttts, @nikhita // alpha: v1.10 // beta: v1.11 @@ -59,4 +65,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS CustomResourceValidation: {Default: true, PreRelease: utilfeature.Beta}, CustomResourceSubresources: {Default: true, PreRelease: utilfeature.Beta}, CustomResourceWebhookConversion: {Default: false, PreRelease: utilfeature.Alpha}, + CustomResourcePublishOpenAPI: {Default: false, PreRelease: utilfeature.Alpha}, }