diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 6ee40eceea2..f489f074b1a 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -32,6 +32,7 @@ import ( "k8s.io/apiextensions-apiserver/pkg/controller/finalizer" "k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema" openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi" + openapiv3controller "k8s.io/apiextensions-apiserver/pkg/controller/openapiv3" "k8s.io/apiextensions-apiserver/pkg/controller/status" "k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,10 +42,12 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/discovery" + "k8s.io/apiserver/pkg/features" genericregistry "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" ) @@ -218,6 +221,10 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) crdHandler, ) openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) + var openapiv3Controller *openapiv3controller.Controller + if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) + } s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error { s.Informers.Start(context.StopCh) @@ -230,6 +237,9 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) // and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller. if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil { go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) + if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh) + } } go namingController.Run(context.StopCh) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index f10a609a30c..c498c16bff9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -1361,7 +1361,8 @@ func buildOpenAPIModelsForApply(staticOpenAPISpec *spec.Swagger, crd *apiextensi specs := []*spec.Swagger{} for _, v := range crd.Spec.Versions { // Defaults are not pruned here, but before being served. - s, err := builder.BuildSwagger(crd, v.Name, builder.Options{V2: false, StripValueValidation: true, StripNullable: true, AllowNonStructural: false}) + // See flag description in builder.go for flag usage + s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true, SkipFilterSchemaForKubectlOpenAPIV2Validation: true, StripValueValidation: true, StripNullable: true, AllowNonStructural: false}) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder.go index 9d8eb917ed3..950e8b08ab5 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder.go @@ -43,7 +43,9 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" utilopenapi "k8s.io/apiserver/pkg/util/openapi" openapibuilder "k8s.io/kube-openapi/pkg/builder" + "k8s.io/kube-openapi/pkg/builder3" "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/spec3" "k8s.io/kube-openapi/pkg/util" "k8s.io/kube-openapi/pkg/validation/spec" ) @@ -52,10 +54,13 @@ const ( // Reference and Go types for built-in metadata objectMetaSchemaRef = "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta" listMetaSchemaRef = "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ListMeta" - listMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta" - typeMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta" - definitionPrefix = "#/definitions/" + listMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta" + typeMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta" + objectMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta" + + definitionPrefix = "#/definitions/" + v3DefinitionPrefix = "#/components/schemas/" ) var ( @@ -66,6 +71,15 @@ var ( namespaceToken = "{namespace}" ) +// The path for definitions in OpenAPI v2 and v3 are different. Translate the path if necessary +// The provided schemaRef uses a v2 prefix and is converted to v3 if the v2 bool is false +func refForOpenAPIVersion(schemaRef string, v2 bool) string { + if v2 { + return schemaRef + } + return strings.Replace(schemaRef, definitionPrefix, v3DefinitionPrefix, 1) +} + var definitions map[string]common.OpenAPIDefinition var buildDefinitions sync.Once var namer *openapi.DefinitionNamer @@ -75,6 +89,12 @@ type Options struct { // Convert to OpenAPI v2. V2 bool + // Only takes effect if the flag and V2 and both set to true. If the condition is reached, + // publish OpenAPI V2 but skip running the spec through ToStructuralOpenAPIV2 + // This prevents XPreserveUnknownFields:true fields from being cleared + // Used only by server side apply + SkipFilterSchemaForKubectlOpenAPIV2Validation bool + // Strip value validation. StripValueValidation bool @@ -85,8 +105,7 @@ type Options struct { AllowNonStructural bool } -// BuildSwagger builds swagger for the given crd in the given version -func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec.Swagger, error) { +func generateBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*builder, error) { var schema *structuralschema.Structural s, err := apiextensionshelpers.GetSchemaForVersion(crd, version) if err != nil { @@ -122,7 +141,7 @@ func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string, // comes from function registerResourceHandlers() in k8s.io/apiserver. // Alternatives are either (ideally) refactoring registerResourceHandlers() to // reuse the code, or faking an APIInstaller for CR to feed to registerResourceHandlers(). - b := newBuilder(crd, version, schema, opts.V2) + b := newBuilder(crd, version, schema, opts) // Sample response types for building web service sample := &CRDCanonicalTypeNamer{ @@ -173,13 +192,26 @@ func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string, for _, route := range routes { b.ws.Route(route) } + return b, nil +} - openAPISpec, err := openapibuilder.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig()) +func BuildOpenAPIV3(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec3.OpenAPI, error) { + b, err := generateBuilder(crd, version, opts) if err != nil { return nil, err } - return openAPISpec, nil + return builder3.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig(false)) +} + +// BuildOpenAPIV2 builds OpenAPI v2 for the given crd in the given version +func BuildOpenAPIV2(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec.Swagger, error) { + b, err := generateBuilder(crd, version, opts) + if err != nil { + return nil, err + } + + return openapibuilder.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig(true)) } // Implements CanonicalTypeNamer @@ -349,26 +381,26 @@ func (b *builder) buildRoute(root, path, httpMethod, actionVerb, operationVerb s // buildKubeNative builds input schema with Kubernetes' native object meta, type meta and // extensions -func (b *builder) buildKubeNative(schema *structuralschema.Structural, v2 bool, crdPreserveUnknownFields bool) (ret *spec.Schema) { +func (b *builder) buildKubeNative(schema *structuralschema.Structural, opts Options, crdPreserveUnknownFields bool) (ret *spec.Schema) { // only add properties if we have a schema. Otherwise, kubectl would (wrongly) assume additionalProperties=false // and forbid anything outside of apiVersion, kind and metadata. We have to fix kubectl to stop doing this, e.g. by // adding additionalProperties=true support to explicitly allow additional fields. // TODO: fix kubectl to understand additionalProperties=true - if schema == nil || (v2 && (schema.XPreserveUnknownFields || crdPreserveUnknownFields)) { + if schema == nil || ((opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation) && (schema.XPreserveUnknownFields || crdPreserveUnknownFields)) { ret = &spec.Schema{ SchemaProps: spec.SchemaProps{Type: []string{"object"}}, } // no, we cannot add more properties here, not even TypeMeta/ObjectMeta because kubectl will complain about // unknown fields for anything else. } else { - if v2 { + if opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation { schema = openapiv2.ToStructuralOpenAPIV2(schema) } + ret = schema.ToKubeOpenAPI() - ret.SetProperty("metadata", *spec.RefSchema(objectMetaSchemaRef). - WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"])) - addTypeMetaProperties(ret) - addEmbeddedProperties(ret, v2) + ret.SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(objectMetaSchemaRef, opts.V2)).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"])) + addTypeMetaProperties(ret, opts.V2) + addEmbeddedProperties(ret, opts) } ret.AddExtension(endpoints.ROUTE_META_GVK, []interface{}{ map[string]interface{}{ @@ -381,36 +413,36 @@ func (b *builder) buildKubeNative(schema *structuralschema.Structural, v2 bool, return ret } -func addEmbeddedProperties(s *spec.Schema, v2 bool) { +func addEmbeddedProperties(s *spec.Schema, opts Options) { if s == nil { return } for k := range s.Properties { v := s.Properties[k] - addEmbeddedProperties(&v, v2) + addEmbeddedProperties(&v, opts) s.Properties[k] = v } if s.Items != nil { - addEmbeddedProperties(s.Items.Schema, v2) + addEmbeddedProperties(s.Items.Schema, opts) } if s.AdditionalProperties != nil { - addEmbeddedProperties(s.AdditionalProperties.Schema, v2) + addEmbeddedProperties(s.AdditionalProperties.Schema, opts) } - if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-preserve-unknown-fields"); ok && isTrue && v2 { + if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-preserve-unknown-fields"); ok && isTrue && opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation { // don't add metadata properties if we're publishing to openapi v2 and are allowing unknown fields. // adding these metadata properties makes kubectl refuse to validate unknown fields. return } if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-embedded-resource"); ok && isTrue { - s.SetProperty("apiVersion", withDescription(getDefinition(typeMetaType).SchemaProps.Properties["apiVersion"], + s.SetProperty("apiVersion", withDescription(getDefinition(typeMetaType, opts.V2).SchemaProps.Properties["apiVersion"], "apiVersion defines the versioned schema of this representation of an object. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", )) - s.SetProperty("kind", withDescription(getDefinition(typeMetaType).SchemaProps.Properties["kind"], + s.SetProperty("kind", withDescription(getDefinition(typeMetaType, opts.V2).SchemaProps.Properties["kind"], "kind is a string value representing the type of this object. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", )) - s.SetProperty("metadata", *spec.RefSchema(objectMetaSchemaRef).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"])) + s.SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(objectMetaSchemaRef, opts.V2)).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"])) req := sets.NewString(s.Required...) if !req.Has("kind") { @@ -424,8 +456,8 @@ func addEmbeddedProperties(s *spec.Schema, v2 bool) { // getDefinition gets definition for given Kubernetes type. This function is extracted from // kube-openapi builder logic -func getDefinition(name string) spec.Schema { - buildDefinitions.Do(buildDefinitionsFunc) +func getDefinition(name string, v2 bool) spec.Schema { + buildDefinitions.Do(generateBuildDefinitionsFunc(v2)) return definitions[name].Schema } @@ -433,31 +465,37 @@ func withDescription(s spec.Schema, desc string) spec.Schema { return *s.WithDescription(desc) } -func buildDefinitionsFunc() { - namer = openapi.NewDefinitionNamer(runtime.NewScheme()) - definitions = utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(func(name string) spec.Ref { - defName, _ := namer.GetDefinitionName(name) - return spec.MustCreateRef(definitionPrefix + common.EscapeJsonPointer(defName)) - }) +func generateBuildDefinitionsFunc(v2 bool) func() { + return func() { + namer = openapi.NewDefinitionNamer(runtime.NewScheme()) + definitions = utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(func(name string) spec.Ref { + defName, _ := namer.GetDefinitionName(name) + prefix := v3DefinitionPrefix + if v2 { + prefix = definitionPrefix + } + return spec.MustCreateRef(prefix + common.EscapeJsonPointer(defName)) + }) + } } // addTypeMetaProperties adds Kubernetes-specific type meta properties to input schema: // apiVersion and kind -func addTypeMetaProperties(s *spec.Schema) { - s.SetProperty("apiVersion", getDefinition(typeMetaType).SchemaProps.Properties["apiVersion"]) - s.SetProperty("kind", getDefinition(typeMetaType).SchemaProps.Properties["kind"]) +func addTypeMetaProperties(s *spec.Schema, v2 bool) { + s.SetProperty("apiVersion", getDefinition(typeMetaType, v2).SchemaProps.Properties["apiVersion"]) + s.SetProperty("kind", getDefinition(typeMetaType, v2).SchemaProps.Properties["kind"]) } // buildListSchema builds the list kind schema for the CRD -func (b *builder) buildListSchema() *spec.Schema { +func (b *builder) buildListSchema(v2 bool) *spec.Schema { name := definitionPrefix + util.ToRESTFriendlyName(fmt.Sprintf("%s/%s/%s", b.group, b.version, b.kind)) doc := fmt.Sprintf("List of %s. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md", b.plural) s := new(spec.Schema).WithDescription(fmt.Sprintf("%s is a list of %s", b.listKind, b.kind)). WithRequired("items"). SetProperty("items", *spec.ArrayProperty(spec.RefSchema(name)).WithDescription(doc)). - SetProperty("metadata", *spec.RefSchema(listMetaSchemaRef).WithDescription(swaggerPartialObjectMetadataListDescriptions["metadata"])) + SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(listMetaSchemaRef, v2)).WithDescription(swaggerPartialObjectMetadataListDescriptions["metadata"])) - addTypeMetaProperties(s) + addTypeMetaProperties(s, v2) s.AddExtension(endpoints.ROUTE_META_GVK, []map[string]string{ { "group": b.group, @@ -469,7 +507,7 @@ func (b *builder) buildListSchema() *spec.Schema { } // getOpenAPIConfig builds config which wires up generated definitions for kube-openapi to consume -func (b *builder) getOpenAPIConfig() *common.Config { +func (b *builder) getOpenAPIConfig(v2 bool) *common.Config { return &common.Config{ ProtocolList: []string{"https"}, Info: &spec.Info{ @@ -487,13 +525,14 @@ func (b *builder) getOpenAPIConfig() *common.Config { }, GetOperationIDAndTags: openapi.GetOperationIDAndTags, GetDefinitionName: func(name string) (string, spec.Extensions) { - buildDefinitions.Do(buildDefinitionsFunc) + buildDefinitions.Do(generateBuildDefinitionsFunc(v2)) return namer.GetDefinitionName(name) }, GetDefinitions: func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { def := utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(ref) def[fmt.Sprintf("%s/%s.%s", b.group, b.version, b.kind)] = common.OpenAPIDefinition{ - Schema: *b.schema, + Schema: *b.schema, + Dependencies: []string{objectMetaType}, } def[fmt.Sprintf("%s/%s.%s", b.group, b.version, b.listKind)] = common.OpenAPIDefinition{ Schema: *b.listSchema, @@ -503,7 +542,7 @@ func (b *builder) getOpenAPIConfig() *common.Config { } } -func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, schema *structuralschema.Structural, v2 bool) *builder { +func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, schema *structuralschema.Structural, opts Options) *builder { b := &builder{ schema: &spec.Schema{ SchemaProps: spec.SchemaProps{Type: []string{"object"}}, @@ -522,8 +561,8 @@ func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, s } // Pre-build schema with Kubernetes native properties - b.schema = b.buildKubeNative(schema, v2, crd.Spec.PreserveUnknownFields) - b.listSchema = b.buildListSchema() + b.schema = b.buildKubeNative(schema, opts, crd.Spec.PreserveUnknownFields) + b.listSchema = b.buildListSchema(opts.V2) return b } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder_test.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder_test.go index c224a6825f8..d923a4f391c 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/builder_test.go @@ -45,37 +45,43 @@ func TestNewBuilder(t *testing.T) { wantedSchema string wantedItemsSchema string - v2 bool // produce OpenAPIv2 + v2 bool // produce OpenAPIv2 + skipFilterSchemaForKubectlOpenAPIV2Validation bool // produce OpenAPIv2 without going through the ToStructuralOpenAPIV2 path }{ { "nil", "", `{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, true, + false, }, {"with properties", `{"type":"object","properties":{"spec":{"type":"object"},"status":{"type":"object"}}}`, `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object"},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, true, + false, }, {"type only", `{"type":"object"}`, `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, true, + false, }, {"preserve unknown at root v2", `{"type":"object","x-kubernetes-preserve-unknown-fields":true}`, `{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, true, + false, }, {"preserve unknown at root v3", `{"type":"object","x-kubernetes-preserve-unknown-fields":true}`, `{"type":"object","x-kubernetes-preserve-unknown-fields":true,"properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, - false, + true, + true, }, {"with extensions", ` @@ -179,6 +185,7 @@ func TestNewBuilder(t *testing.T) { }`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, true, + false, }, {"with extensions as v3 schema", ` @@ -344,7 +351,8 @@ func TestNewBuilder(t *testing.T) { "x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}] }`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`, - false, + true, + true, }, } for _, tt := range tests { @@ -384,7 +392,7 @@ func TestNewBuilder(t *testing.T) { }, Scope: apiextensionsv1.NamespaceScoped, }, - }, "v1", schema, tt.v2) + }, "v1", schema, Options{V2: tt.v2, SkipFilterSchemaForKubectlOpenAPIV2Validation: tt.skipFilterSchemaForKubectlOpenAPIV2Validation}) var wantedSchema, wantedItemsSchema spec.Schema if err := json.Unmarshal([]byte(tt.wantedSchema), &wantedSchema); err != nil { @@ -500,7 +508,7 @@ func TestCRDRouteParameterBuilder(t *testing.T) { }, }, } - swagger, err := BuildSwagger(testNamespacedCRD, testCRDVersion, Options{V2: true}) + swagger, err := BuildOpenAPIV2(testNamespacedCRD, testCRDVersion, Options{V2: true}) require.NoError(t, err) require.Equal(t, len(testCase.paths), len(swagger.Paths.Paths), testCase.scope) for path, expected := range testCase.paths { @@ -567,7 +575,7 @@ func schemaDiff(a, b *spec.Schema) string { return diff.StringDiff(string(as), string(bs)) } -func TestBuildSwagger(t *testing.T) { +func TestBuildOpenAPIV2(t *testing.T) { tests := []struct { name string schema string @@ -622,7 +630,7 @@ func TestBuildSwagger(t *testing.T) { `{"type":"object","properties":{"foo":{"type":"string","oneOf":[{"pattern":"a"},{"pattern":"b"}]}}}`, nil, `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"foo":{"type":"string","oneOf":[{"pattern":"a"},{"pattern":"b"}]}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, - Options{V2: false}, + Options{V2: true, SkipFilterSchemaForKubectlOpenAPIV2Validation: true}, }, } for _, tt := range tests { @@ -642,7 +650,7 @@ func TestBuildSwagger(t *testing.T) { } // TODO: mostly copied from the test above. reuse code to cleanup - got, err := BuildSwagger(&apiextensionsv1.CustomResourceDefinition{ + got, err := BuildOpenAPIV2(&apiextensionsv1.CustomResourceDefinition{ Spec: apiextensionsv1.CustomResourceDefinitionSpec{ Group: "bar.k8s.io", Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ @@ -691,3 +699,106 @@ func TestBuildSwagger(t *testing.T) { }) } } + +func TestBuildOpenAPIV3(t *testing.T) { + tests := []struct { + name string + schema string + preserveUnknownFields *bool + wantedSchema string + opts Options + }{ + { + "nil", + "", + nil, + `{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, + Options{}, + }, + { + "with properties", + `{"type":"object","properties":{"spec":{"type":"object"},"status":{"type":"object"}}}`, + nil, + `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object"},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, + Options{}, + }, + { + "with v3 nullable field", + `{"type":"object","properties":{"spec":{"type":"object", "nullable": true},"status":{"type":"object"}}}`, + nil, + `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object", "nullable": true},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, + Options{}, + }, + { + "with default not pruned for v3", + `{"type":"object","properties":{"spec":{"type":"object","properties":{"field":{"type":"string","default":"foo"}}},"status":{"type":"object"}}}`, + nil, + `{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object","properties":{"field":{"type":"string","default":"foo"}}},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, + Options{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var validation *apiextensionsv1.CustomResourceValidation + if len(tt.schema) > 0 { + v1Schema := &apiextensionsv1.JSONSchemaProps{} + if err := json.Unmarshal([]byte(tt.schema), &v1Schema); err != nil { + t.Fatal(err) + } + validation = &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: v1Schema, + } + } + if tt.preserveUnknownFields != nil && *tt.preserveUnknownFields { + validation.OpenAPIV3Schema.XPreserveUnknownFields = utilpointer.BoolPtr(true) + } + + got, err := BuildOpenAPIV3(&apiextensionsv1.CustomResourceDefinition{ + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "bar.k8s.io", + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Schema: validation, + }, + }, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "foos", + Singular: "foo", + Kind: "Foo", + ListKind: "FooList", + }, + Scope: apiextensionsv1.NamespaceScoped, + }, + }, "v1", tt.opts) + if err != nil { + t.Fatal(err) + } + + var wantedSchema spec.Schema + if err := json.Unmarshal([]byte(tt.wantedSchema), &wantedSchema); err != nil { + t.Fatal(err) + } + + gotSchema := *got.Components.Schemas["io.k8s.bar.v1.Foo"] + gotProperties := properties(gotSchema.Properties) + wantedProperties := properties(wantedSchema.Properties) + if !gotProperties.Equal(wantedProperties) { + t.Fatalf("unexpected properties, got: %s, expected: %s", gotProperties.List(), wantedProperties.List()) + } + + // wipe out TypeMeta/ObjectMeta content, with those many lines of descriptions. We trust that they match here. + for _, metaField := range []string{"kind", "apiVersion", "metadata"} { + if _, found := gotSchema.Properties["kind"]; found { + prop := gotSchema.Properties[metaField] + prop.Description = "" + gotSchema.Properties[metaField] = prop + } + } + + if !reflect.DeepEqual(&wantedSchema, &gotSchema) { + t.Errorf("unexpected schema: %s\nwant = %#v\ngot = %#v", schemaDiff(&wantedSchema, &gotSchema), &wantedSchema, &gotSchema) + } + }) + } +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/merge.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/merge.go index 9309ec9b366..36c4118207b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/merge.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder/merge.go @@ -17,7 +17,9 @@ limitations under the License. package builder import ( + "k8s.io/klog/v2" "k8s.io/kube-openapi/pkg/aggregator" + "k8s.io/kube-openapi/pkg/spec3" "k8s.io/kube-openapi/pkg/validation/spec" ) @@ -79,3 +81,51 @@ func mergeSpec(dest, source *spec.Swagger) { dest.Paths.Paths[k] = v } } + +// MergeSpecsV3 merges OpenAPI v3 specs for CRDs +// For V3, the static spec is never merged with the individual CRD specs so no conflict resolution is necessary +func MergeSpecsV3(crdSpecs ...*spec3.OpenAPI) (*spec3.OpenAPI, error) { + // create shallow copy of staticSpec, but replace paths and definitions because we modify them. + crdSpec := &spec3.OpenAPI{} + if len(crdSpecs) > 0 { + crdSpec.Version = crdSpecs[0].Version + crdSpec.Info = crdSpecs[0].Info + } + for _, s := range crdSpecs { + // merge specs without checking conflicts, since the naming controller prevents + // conflicts between user-defined CRDs + mergeSpecV3(crdSpec, s) + } + + return crdSpec, nil +} + +// mergeSpecV3 copies paths and definitions from source to dest, mutating dest, but not source. +// We assume that conflicts do not matter. +func mergeSpecV3(dest, source *spec3.OpenAPI) { + if source == nil || source.Paths == nil { + return + } + if dest.Paths == nil { + dest.Paths = &spec3.Paths{} + } + + for k, v := range source.Components.Schemas { + if dest.Components == nil { + dest.Components = &spec3.Components{} + } + if dest.Components.Schemas == nil { + dest.Components.Schemas = map[string]*spec.Schema{} + } + if _, exists := dest.Components.Schemas[k]; exists { + klog.Warningf("Should not happen: OpenAPI V3 merge schema conflict on %s", k) + } + dest.Components.Schemas[k] = v + } + for k, v := range source.Paths.Paths { + if dest.Paths.Paths == nil { + dest.Paths.Paths = map[string]*spec3.Path{} + } + dest.Paths.Paths[k] = v + } +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go index efd18939529..a99d95c6ef2 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go @@ -202,7 +202,7 @@ func buildVersionSpecs(crd *apiextensionsv1.CustomResourceDefinition, oldSpecs m continue } // Defaults are not pruned here, but before being served. - spec, err := builder.BuildSwagger(crd, v.Name, builder.Options{V2: true}) + spec, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true}) if err != nil { return nil, false, err } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/controller.go new file mode 100644 index 00000000000..aeb85f6ba92 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/controller.go @@ -0,0 +1,272 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapiv3 + +import ( + "fmt" + "reflect" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kube-openapi/pkg/handler3" + "k8s.io/kube-openapi/pkg/spec3" + + apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1" + listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder" +) + +// Controller watches CustomResourceDefinitions and publishes OpenAPI v3 +type Controller struct { + crdLister listers.CustomResourceDefinitionLister + crdsSynced cache.InformerSynced + + // To allow injection for testing. + syncFn func(string) error + + queue workqueue.RateLimitingInterface + + openAPIV3Service *handler3.OpenAPIService + + // specs per version and per CRD name + lock sync.Mutex + specsByGVandName map[schema.GroupVersion]map[string]*spec3.OpenAPI +} + +// NewController creates a new Controller with input CustomResourceDefinition informer +func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller { + c := &Controller{ + crdLister: crdInformer.Lister(), + crdsSynced: crdInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_v3_controller"), + specsByGVandName: map[schema.GroupVersion]map[string]*spec3.OpenAPI{}, + } + + crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCustomResourceDefinition, + UpdateFunc: c.updateCustomResourceDefinition, + DeleteFunc: c.deleteCustomResourceDefinition, + }) + + c.syncFn = c.sync + return c +} + +// Run sets openAPIAggregationManager and starts workers +func (c *Controller) Run(openAPIV3Service *handler3.OpenAPIService, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + defer klog.Infof("Shutting down OpenAPI V3 controller") + + klog.Infof("Starting OpenAPI V3 controller") + + c.openAPIV3Service = openAPIV3Service + + if !cache.WaitForCacheSync(stopCh, c.crdsSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + crds, err := c.crdLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err)) + return + } + for _, crd := range crds { + if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) { + continue + } + for _, v := range crd.Spec.Versions { + if !v.Served { + continue + } + c.buildV3Spec(crd, crd.Name, v.Name) + } + } + + // only start one worker thread since its a slow moving API + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + // log slow aggregations + start := time.Now() + defer func() { + elapsed := time.Since(start) + if elapsed > time.Second { + klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed) + } + }() + + err := c.syncFn(key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) + c.queue.AddRateLimited(key) + return true +} + +func (c *Controller) sync(name string) error { + c.lock.Lock() + defer c.lock.Unlock() + + crd, err := c.crdLister.Get(name) + if err != nil && !errors.IsNotFound(err) { + return err + } + + if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) { + c.deleteCRD(name) + return nil + } + + for _, v := range crd.Spec.Versions { + if !v.Served { + continue + } + c.buildV3Spec(crd, name, v.Name) + } + + return nil +} + +func (c *Controller) deleteCRD(name string) { + for gv, crdListForGV := range c.specsByGVandName { + _, needOpenAPIUpdate := crdListForGV[name] + if needOpenAPIUpdate { + delete(crdListForGV, name) + if len(crdListForGV) == 0 { + delete(c.specsByGVandName, gv) + } + c.updateGroupVersion(gv) + } + } +} + +func (c *Controller) updateGroupVersion(gv schema.GroupVersion) error { + if _, ok := c.specsByGVandName[gv]; !ok { + c.openAPIV3Service.DeleteGroupVersion(groupVersionToOpenAPIV3Path(gv)) + return nil + } + + var specs []*spec3.OpenAPI + for _, spec := range c.specsByGVandName[gv] { + specs = append(specs, spec) + } + + mergedSpec, err := builder.MergeSpecsV3(specs...) + if err != nil { + return fmt.Errorf("failed to merge specs: %v", err) + } + + c.openAPIV3Service.UpdateGroupVersion(groupVersionToOpenAPIV3Path(gv), mergedSpec) + return nil +} + +func (c *Controller) updateCRDSpec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string, v3 *spec3.OpenAPI) error { + gv := schema.GroupVersion{ + Group: crd.Spec.Group, + Version: versionName, + } + + _, ok := c.specsByGVandName[gv] + if !ok { + c.specsByGVandName[gv] = map[string]*spec3.OpenAPI{} + } + + oldSpec, ok := c.specsByGVandName[gv][name] + if ok { + if reflect.DeepEqual(oldSpec, v3) { + // no changes to CRD + return nil + } + } + c.specsByGVandName[gv][name] = v3 + + return c.updateGroupVersion(gv) +} + +func (c *Controller) buildV3Spec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string) error { + v3, err := builder.BuildOpenAPIV3(crd, versionName, builder.Options{V2: false}) + + if err != nil { + return err + } + + c.updateCRDSpec(crd, name, versionName, v3) + return nil +} + +func (c *Controller) addCustomResourceDefinition(obj interface{}) { + castObj := obj.(*apiextensionsv1.CustomResourceDefinition) + klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name) + c.enqueue(castObj) +} + +func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) { + castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition) + klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name) + c.enqueue(castNewObj) +} + +func (c *Controller) deleteCustomResourceDefinition(obj interface{}) { + castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition) + if !ok { + klog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name) + c.enqueue(castObj) +} + +func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) { + c.queue.Add(obj.Name) +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/util.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/util.go new file mode 100644 index 00000000000..56370f3790d --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/util.go @@ -0,0 +1,25 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapiv3 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func groupVersionToOpenAPIV3Path(gv schema.GroupVersion) string { + return "apis/" + gv.Group + "/" + gv.Version +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4308e835d0a..954c61756df 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -49,9 +49,10 @@ import ( utilopenapi "k8s.io/apiserver/pkg/util/openapi" restclient "k8s.io/client-go/rest" "k8s.io/klog/v2" - openapibuilder "k8s.io/kube-openapi/pkg/builder" + openapibuilder2 "k8s.io/kube-openapi/pkg/builder" openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler" + "k8s.io/kube-openapi/pkg/handler3" openapiutil "k8s.io/kube-openapi/pkg/util" openapiproto "k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kube-openapi/pkg/validation/spec" @@ -144,6 +145,10 @@ type GenericAPIServer struct { // It is set during PrepareRun if `openAPIConfig` is non-nil unless `skipOpenAPIInstallation` is true. OpenAPIVersionedService *handler.OpenAPIService + // OpenAPIV3VersionedService controls the /openapi/v3 endpoint and can be used to update the served spec. + // It is set during PrepareRun if `openAPIConfig` is non-nil unless `skipOpenAPIInstallation` is true. + OpenAPIV3VersionedService *handler3.OpenAPIService + // StaticOpenAPISpec is the spec derived from the restful container endpoints. // It is set during PrepareRun. StaticOpenAPISpec *spec.Swagger @@ -345,7 +350,12 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { if s.openAPIConfig != nil && !s.skipOpenAPIInstallation { s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{ Config: s.openAPIConfig, - }.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux) + }.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux) + if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + s.OpenAPIV3VersionedService = routes.OpenAPI{ + Config: s.openAPIConfig, + }.InstallV3(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux) + } } s.installHealthz() @@ -706,7 +716,7 @@ func (s *GenericAPIServer) getOpenAPIModels(apiPrefix string, apiGroupInfos ...* } // Build the openapi definitions for those resources and convert it to proto models - openAPISpec, err := openapibuilder.BuildOpenAPIDefinitionsForResources(s.openAPIConfig, resourceNames...) + openAPISpec, err := openapibuilder2.BuildOpenAPIDefinitionsForResources(s.openAPIConfig, resourceNames...) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go b/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go index 4be4d03fc21..44e463532ae 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go +++ b/staging/src/k8s.io/apiserver/pkg/server/routes/openapi.go @@ -21,9 +21,11 @@ import ( "k8s.io/klog/v2" "k8s.io/apiserver/pkg/server/mux" - "k8s.io/kube-openapi/pkg/builder" + builder2 "k8s.io/kube-openapi/pkg/builder" + "k8s.io/kube-openapi/pkg/builder3" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler" + "k8s.io/kube-openapi/pkg/handler3" "k8s.io/kube-openapi/pkg/validation/spec" ) @@ -33,8 +35,8 @@ type OpenAPI struct { } // Install adds the SwaggerUI webservice to the given mux. -func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (*handler.OpenAPIService, *spec.Swagger) { - spec, err := builder.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config) +func (oa OpenAPI) InstallV2(c *restful.Container, mux *mux.PathRecorderMux) (*handler.OpenAPIService, *spec.Swagger) { + spec, err := builder2.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config) if err != nil { klog.Fatalf("Failed to build open api spec for root: %v", err) } @@ -51,3 +53,34 @@ func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (*hand return openAPIVersionedService, spec } + +// InstallV3 adds the static group/versions defined in the RegisteredWebServices to the OpenAPI v3 spec +func (oa OpenAPI) InstallV3(c *restful.Container, mux *mux.PathRecorderMux) *handler3.OpenAPIService { + openAPIVersionedService, err := handler3.NewOpenAPIService(nil) + if err != nil { + klog.Fatalf("Failed to create OpenAPIService: %v", err) + } + + err = openAPIVersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", mux) + if err != nil { + klog.Fatalf("Failed to register versioned open api spec for root: %v", err) + } + + grouped := make(map[string][]*restful.WebService) + + for _, t := range c.RegisteredWebServices() { + // Strip the "/" prefix from the name + gvName := t.RootPath()[1:] + grouped[gvName] = []*restful.WebService{t} + } + + for gv, ws := range grouped { + spec, err := builder3.BuildOpenAPISpec(ws, oa.Config) + if err != nil { + klog.Errorf("Failed to build OpenAPI v3 for group %s, %q", gv, err) + + } + openAPIVersionedService.UpdateGroupVersion(gv, spec) + } + return openAPIVersionedService +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 4558130c364..45223e53cd8 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" @@ -46,6 +47,8 @@ import ( listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" + openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3" + openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" ) @@ -141,9 +144,12 @@ type APIAggregator struct { // Enable swagger and/or OpenAPI if these configs are non-nil. openAPIConfig *openapicommon.Config - // openAPIAggregationController downloads and merges OpenAPI specs. + // openAPIAggregationController downloads and merges OpenAPI v2 specs. openAPIAggregationController *openapicontroller.AggregationController + // openAPIV3AggregationController downloads and caches OpenAPI v3 specs. + openAPIV3AggregationController *openapiv3controller.AggregationController + // egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector @@ -344,6 +350,9 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { if s.openAPIConfig != nil { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { go s.openAPIAggregationController.Run(context.StopCh) + if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + go s.openAPIV3AggregationController.Run(context.StopCh) + } return nil }) } @@ -363,6 +372,18 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { return preparedAPIAggregator{}, err } s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator) + if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) { + specDownloaderV3 := openapiv3aggregator.NewDownloader() + openAPIV3Aggregator, err := openapiv3aggregator.BuildAndRegisterAggregator( + specDownloaderV3, + s.GenericAPIServer.NextDelegate(), + s.GenericAPIServer.Handler.NonGoRestfulMux) + if err != nil { + return preparedAPIAggregator{}, err + } + _ = openAPIV3Aggregator + s.openAPIV3AggregationController = openapiv3controller.NewAggregationController(openAPIV3Aggregator) + } } return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil @@ -382,6 +403,9 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIAggregationController != nil { s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService) } + if s.openAPIV3AggregationController != nil { + s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService) + } return nil } @@ -403,6 +427,9 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIAggregationController != nil { s.openAPIAggregationController.AddAPIService(proxyHandler, apiService) } + if s.openAPIV3AggregationController != nil { + s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService) + } s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler) @@ -447,6 +474,9 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) { if s.openAPIAggregationController != nil { s.openAPIAggregationController.RemoveAPIService(apiServiceName) } + if s.openAPIV3AggregationController != nil { + s.openAPIAggregationController.RemoveAPIService(apiServiceName) + } delete(s.proxyHandlers, apiServiceName) // TODO unregister group level discovery when there are no more versions for the group diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go new file mode 100644 index 00000000000..519a7900e30 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go @@ -0,0 +1,206 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "fmt" + "net/http" + "strings" + "sync" + "time" + + "k8s.io/apiserver/pkg/server" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/handler3" + "k8s.io/kube-openapi/pkg/spec3" +) + +// SpecAggregator calls out to http handlers of APIServices and caches specs. It keeps state of the last +// known specs including the http etag. +// TODO(jefftree): remove the downloading and caching and proxy directly to the APIServices. This is possible because we +// don't have to merge here, which is cpu intensive in v2 +type SpecAggregator interface { + AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) + UpdateAPIServiceSpec(apiServiceName string) error + RemoveAPIServiceSpec(apiServiceName string) + GetAPIServiceNames() []string +} + +const ( + aggregatorUser = "system:aggregator" + specDownloadTimeout = 60 * time.Second + localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_" + localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d" +) + +// IsLocalAPIService returns true for local specs from delegates. +func IsLocalAPIService(apiServiceName string) bool { + return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) +} + +// GetAPIServicesName returns the names of APIServices recorded in openAPIV3Specs. +// We use this function to pass the names of local APIServices to the controller in this package, +// so that the controller can periodically sync the OpenAPI spec from delegation API servers. +func (s *specAggregator) GetAPIServiceNames() []string { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + names := make([]string, len(s.openAPIV3Specs)) + for key := range s.openAPIV3Specs { + names = append(names, key) + } + return names +} + +// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. +func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecAggregator, error) { + var err error + s := &specAggregator{ + openAPIV3Specs: map[string]*openAPIV3APIServiceInfo{}, + downloader: downloader, + } + + s.openAPIV3VersionedService, err = handler3.NewOpenAPIService(nil) + if err != nil { + return nil, err + } + err = s.openAPIV3VersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", pathHandler) + if err != nil { + return nil, err + } + + i := 1 + for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { + handler := delegate.UnprotectedHandler() + if handler == nil { + continue + } + + apiServiceName := fmt.Sprintf(localDelegateChainNamePattern, i) + localAPIService := v1.APIService{} + localAPIService.Name = apiServiceName + s.AddUpdateAPIService(handler, &localAPIService) + s.UpdateAPIServiceSpec(apiServiceName) + i++ + } + + return s, nil +} + +// AddUpdateAPIService adds or updates the api service. It is thread safe. +func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + // If the APIService is being updated, use the existing struct. + if apiServiceInfo, ok := s.openAPIV3Specs[apiservice.Name]; ok { + apiServiceInfo.apiService = *apiservice + apiServiceInfo.handler = handler + } + s.openAPIV3Specs[apiservice.Name] = &openAPIV3APIServiceInfo{ + apiService: *apiservice, + handler: handler, + specs: make(map[string]*openAPIV3SpecInfo), + } +} + +// UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. +// It is thread safe. +func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + apiService, exists := s.openAPIV3Specs[apiServiceName] + if !exists { + return fmt.Errorf("APIService %s does not exist for update", apiServiceName) + } + + // Pass a list of old etags to the Downloader to prevent transfers if etags match + etagList := make(map[string]string) + for gv, specInfo := range apiService.specs { + etagList[gv] = specInfo.etag + } + groups, err := s.downloader.Download(apiService.handler, etagList) + if err != nil { + return err + } + + // Remove any groups that do not exist anymore + for group := range s.openAPIV3Specs[apiServiceName].specs { + if _, exists := groups[group]; !exists { + s.openAPIV3VersionedService.DeleteGroupVersion(group) + delete(s.openAPIV3Specs[apiServiceName].specs, group) + } + } + + for group, info := range groups { + if info.spec == nil { + continue + } + + // If ETag has not changed, no update is necessary + oldInfo, exists := s.openAPIV3Specs[apiServiceName].specs[group] + if exists && oldInfo.etag == info.etag { + continue + } + s.openAPIV3Specs[apiServiceName].specs[group] = &openAPIV3SpecInfo{ + spec: info.spec, + etag: info.etag, + } + s.openAPIV3VersionedService.UpdateGroupVersion(group, info.spec) + } + return nil +} + +type specAggregator struct { + // mutex protects all members of this struct. + rwMutex sync.RWMutex + + // OpenAPI V3 specs by APIService name + openAPIV3Specs map[string]*openAPIV3APIServiceInfo + // provided for dynamic OpenAPI spec + openAPIV3VersionedService *handler3.OpenAPIService + + // For downloading the OpenAPI v3 specs from apiservices + downloader Downloader +} + +var _ SpecAggregator = &specAggregator{} + +type openAPIV3APIServiceInfo struct { + apiService v1.APIService + handler http.Handler + specs map[string]*openAPIV3SpecInfo +} + +type openAPIV3SpecInfo struct { + spec *spec3.OpenAPI + etag string +} + +// RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. +// It is thread safe. +func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + if apiServiceInfo, ok := s.openAPIV3Specs[apiServiceName]; ok { + for gv := range apiServiceInfo.specs { + s.openAPIV3VersionedService.DeleteGroupVersion(gv) + } + delete(s.openAPIV3Specs, apiServiceName) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go new file mode 100644 index 00000000000..b91f9e64aa3 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go @@ -0,0 +1,169 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "encoding/json" + "fmt" + "net/http" + + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" + "k8s.io/kube-openapi/pkg/spec3" +) + +// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v3 and /openap/v3// endpoints. +type Downloader struct { +} + +// NewDownloader creates a new OpenAPI Downloader. +func NewDownloader() Downloader { + return Downloader{} +} + +func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + req = req.WithContext(request.WithUser(req.Context(), info)) + handler.ServeHTTP(w, req) + }) +} + +// gvList is a struct for the response of the /openapi/v3 endpoint to unmarshal into +type gvList struct { + Paths []string `json:"Paths"` +} + +// SpecETag is a OpenAPI v3 spec and etag pair for the endpoint of each OpenAPI group/version +type SpecETag struct { + spec *spec3.OpenAPI + etag string +} + +// Download downloads OpenAPI v3 for all groups of a given handler +func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { + // TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 + // Move to proxy request in the aggregator and let the APIServices serve the OpenAPI directly + handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) + handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") + + req, err := http.NewRequest("GET", "/openapi/v3", nil) + if err != nil { + return nil, err + } + req.Header.Add("Accept", "application/json") + + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + switch writer.respCode { + case http.StatusNotFound: + // Gracefully skip 404, assuming the server won't provide any spec + return nil, nil + case http.StatusOK: + groups := gvList{} + aggregated := make(map[string]*SpecETag) + + if err := json.Unmarshal(writer.data, &groups); err != nil { + return nil, err + } + for _, path := range groups.Paths { + reqPath := fmt.Sprintf("/openapi/v3/%s", path) + req, err := http.NewRequest("GET", reqPath, nil) + if err != nil { + return nil, err + } + req.Header.Add("Accept", "application/json") + oldEtag, ok := etagList[path] + if ok { + req.Header.Add("If-None-Match", oldEtag) + } + openAPIWriter := newInMemoryResponseWriter() + handler.ServeHTTP(openAPIWriter, req) + + switch openAPIWriter.respCode { + case http.StatusNotFound: + continue + case http.StatusNotModified: + aggregated[path] = &SpecETag{ + etag: oldEtag, + } + case http.StatusOK: + var spec spec3.OpenAPI + // TODO|jefftree: For OpenAPI v3 Beta, if the v3 spec is empty then + // we should request the v2 endpoint and convert it to v3 + if len(openAPIWriter.data) > 0 { + err = json.Unmarshal(openAPIWriter.data, &spec) + if err != nil { + return nil, err + } + etag := openAPIWriter.Header().Get("Etag") + aggregated[path] = &SpecETag{ + spec: &spec, + etag: etag, + } + } + default: + klog.Errorf("Error: unknown status %v", openAPIWriter.respCode) + } + } + + return aggregated, nil + default: + return nil, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) + } +} + +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func newInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go new file mode 100644 index 00000000000..7e5043bbe52 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "encoding/json" + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +type handlerTest struct { + etag string + data []byte +} + +var _ http.Handler = handlerTest{} + +func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Create an APIService with a handler for one group/version + group := make(map[string][]string) + group["Paths"] = []string{"apis/group/version"} + j, _ := json.Marshal(group) + if r.URL.Path == "/openapi/v3" { + w.Write(j) + return + } + + if r.URL.Path == "/openapi/v3/apis/group/version" { + if len(h.etag) > 0 { + w.Header().Add("Etag", h.etag) + } + ifNoneMatches := r.Header["If-None-Match"] + for _, match := range ifNoneMatches { + if match == h.etag { + w.WriteHeader(http.StatusNotModified) + return + } + } + w.Write(h.data) + } +} + +func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID string, expectedEtag string) error { + if err != nil { + return fmt.Errorf("downloadOpenAPISpec failed : %s", err) + } + specInfo, ok := gvSpec["apis/group/version"] + if !ok { + if expectedSpecID == "" { + return nil + } + return fmt.Errorf("expected to download spec, no spec downloaded") + } + + if specInfo.spec != nil && expectedSpecID == "" { + return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version) + } + + if specInfo.spec != nil && specInfo.spec.Version != expectedSpecID { + return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version) + } + if specInfo.etag != expectedEtag { + return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, specInfo.etag) + } + return nil +} + +func TestDownloadOpenAPISpec(t *testing.T) { + s := Downloader{} + + // Test with eTag + gvSpec, err := s.Download( + handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) + assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test")) + + // Test not modified + gvSpec, err = s.Download( + handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{"apis/group/version": "etag_test"}) + assert.NoError(t, assertDownloadedSpec(gvSpec, err, "", "etag_test")) + + // Test different eTags + gvSpec, err = s.Download( + handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test1"}, map[string]string{"apis/group/version": "etag_test2"}) + assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test1")) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go new file mode 100644 index 00000000000..7e981ca1da6 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go @@ -0,0 +1,175 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapiv3 + +import ( + "fmt" + "net/http" + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator" +) + +const ( + successfulUpdateDelay = time.Minute + successfulUpdateDelayLocal = time.Second + failedUpdateMaxExpDelay = time.Hour +) + +type syncAction int + +const ( + syncRequeue syncAction = iota + syncRequeueRateLimited + syncNothing +) + +// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove +// them if necessary. +type AggregationController struct { + openAPIAggregationManager aggregator.SpecAggregator + queue workqueue.RateLimitingInterface + + // To allow injection for testing. + syncHandler func(key string) (syncAction, error) +} + +// NewAggregationController creates new OpenAPI aggregation controller. +func NewAggregationController(openAPIAggregationManager aggregator.SpecAggregator) *AggregationController { + c := &AggregationController{ + openAPIAggregationManager: openAPIAggregationManager, + queue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay), + "open_api_v3_aggregation_controller", + ), + } + + c.syncHandler = c.sync + + // update each service at least once, also those which are not coming from APIServices, namely local services + for _, name := range openAPIAggregationManager.GetAPIServiceNames() { + c.queue.AddAfter(name, time.Second) + } + + return c +} + +// Run starts OpenAPI AggregationController +func (c *AggregationController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Info("Starting OpenAPI V3 AggregationController") + defer klog.Info("Shutting down OpenAPI V3 AggregationController") + + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *AggregationController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *AggregationController) processNextWorkItem() bool { + key, quit := c.queue.Get() + defer c.queue.Done(key) + if quit { + return false + } + + if aggregator.IsLocalAPIService(key.(string)) { + // for local delegation targets that are aggregated once per second, log at + // higher level to avoid flooding the log + klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key) + } else { + klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key) + } + + action, err := c.syncHandler(key.(string)) + if err == nil { + c.queue.Forget(key) + } else { + utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err)) + } + + switch action { + case syncRequeue: + if aggregator.IsLocalAPIService(key.(string)) { + klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal) + c.queue.AddAfter(key, successfulUpdateDelayLocal) + } else { + klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key) + c.queue.AddAfter(key, successfulUpdateDelay) + } + case syncRequeueRateLimited: + klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key) + c.queue.AddRateLimited(key) + case syncNothing: + klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key) + } + + return true +} + +func (c *AggregationController) sync(key string) (syncAction, error) { + err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key) + switch { + case err != nil: + return syncRequeueRateLimited, err + } + return syncRequeue, nil +} + +// AddAPIService adds a new API Service to OpenAPI Aggregation. +func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) { + if apiService.Spec.Service == nil { + return + } + c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService) + c.queue.AddAfter(apiService.Name, time.Second) +} + +// UpdateAPIService updates API Service's info and handler. +func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) { + if apiService.Spec.Service == nil { + return + } + c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService) + key := apiService.Name + if c.queue.NumRequeues(key) > 0 { + // The item has failed before. Remove it from failure queue and + // update it in a second + c.queue.Forget(key) + c.queue.AddAfter(key, time.Second) + } +} + +// RemoveAPIService removes API Service from OpenAPI Aggregation Controller. +func (c *AggregationController) RemoveAPIService(apiServiceName string) { + c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName) + // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out + // and will not add it again to the queue. + c.queue.Forget(apiServiceName) +}