mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-15 14:14:39 +00:00
Merge pull request #50864 from mbohlool/update_openapi_aggr
Automatic merge from submit-queue Improvements to OpenAPI aggregation Fixes #50863 Fixes #50011 Related: #50896
This commit is contained in:
12
Godeps/Godeps.json
generated
12
Godeps/Godeps.json
generated
@@ -3099,27 +3099,27 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/aggregator",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/builder",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/common",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/generators",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/handler",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi/pkg/util",
|
||||
"Rev": "80f07ef71bb4f781233c65aa8d0369e4ecafab87"
|
||||
"Rev": "868f2f29720b192240e18284659231b440f9cda5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/utils/exec",
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -165,6 +165,9 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
|
||||
// this wires up openapi
|
||||
kubeAPIServer.GenericAPIServer.PrepareRun()
|
||||
|
||||
// This will wire up openapi for extension api server
|
||||
apiExtensionsServer.GenericAPIServer.PrepareRun()
|
||||
|
||||
// aggregator comes last in the chain
|
||||
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
|
||||
if err != nil {
|
||||
|
@@ -44,6 +44,7 @@ openapi_library(
|
||||
"k8s.io/api/settings/v1alpha1",
|
||||
"k8s.io/api/storage/v1",
|
||||
"k8s.io/api/storage/v1beta1",
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1",
|
||||
"k8s.io/apimachinery/pkg/api/resource",
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1alpha1",
|
||||
|
@@ -20,4 +20,5 @@ limitations under the License.
|
||||
|
||||
// Package v1beta1 is the v1beta1 version of the API.
|
||||
// +groupName=apiextensions.k8s.io
|
||||
// +k8s:openapi-gen=true
|
||||
package v1beta1 // import "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
|
@@ -405,6 +405,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||
requestContextMapper: c.RequestContextMapper,
|
||||
Serializer: c.Serializer,
|
||||
AuditBackend: c.AuditBackend,
|
||||
delegationTarget: delegationTarget,
|
||||
|
||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||
|
||||
|
@@ -145,6 +145,9 @@ type GenericAPIServer struct {
|
||||
// enableAPIResponseCompression indicates whether API Responses should support compression
|
||||
// if the client requests it via Accept-Encoding
|
||||
enableAPIResponseCompression bool
|
||||
|
||||
// delegationTarget is the next delegate in the chain or nil
|
||||
delegationTarget DelegationTarget
|
||||
}
|
||||
|
||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||
@@ -165,6 +168,9 @@ type DelegationTarget interface {
|
||||
|
||||
// ListedPaths returns the paths for supporting an index
|
||||
ListedPaths() []string
|
||||
|
||||
// NextDelegate returns the next delegationTarget in the chain of delegations
|
||||
NextDelegate() DelegationTarget
|
||||
}
|
||||
|
||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||
@@ -181,6 +187,10 @@ func (s *GenericAPIServer) ListedPaths() []string {
|
||||
return s.listedPathProvider.ListedPaths()
|
||||
}
|
||||
|
||||
func (s *GenericAPIServer) NextDelegate() DelegationTarget {
|
||||
return s.delegationTarget
|
||||
}
|
||||
|
||||
var EmptyDelegate = emptyDelegate{
|
||||
requestContextMapper: apirequest.NewRequestContextMapper(),
|
||||
}
|
||||
@@ -204,6 +214,9 @@ func (s emptyDelegate) ListedPaths() []string {
|
||||
func (s emptyDelegate) RequestContextMapper() apirequest.RequestContextMapper {
|
||||
return s.requestContextMapper
|
||||
}
|
||||
func (s emptyDelegate) NextDelegate() DelegationTarget {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequestContextMapper is exposed so that third party resource storage can be build in a different location.
|
||||
// TODO refactor third party resource storage
|
||||
|
@@ -274,6 +274,10 @@
|
||||
"ImportPath": "github.com/pkg/errors",
|
||||
"Rev": "a22138067af1c4942683050411a841ade67fe1eb"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/pmezard/go-difflib/difflib",
|
||||
"Rev": "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/prometheus/client_golang/prometheus",
|
||||
"Rev": "e7e903064f5e9eb5da98208bae10b475d4db0f8c"
|
||||
@@ -310,6 +314,10 @@
|
||||
"ImportPath": "github.com/spf13/pflag",
|
||||
"Rev": "9ff6c6923cfffbcd502984b8e0c80539a94968b7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/stretchr/testify/assert",
|
||||
"Rev": "f6abca593680b2315d2075e0f5e2a9751e3f431a"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/ugorji/go/codec",
|
||||
"Rev": "ded73eae5db7e7a0ef6f55aace87a2873c5d2b74"
|
||||
|
@@ -11,11 +11,9 @@ go_test(
|
||||
srcs = [
|
||||
"handler_apis_test.go",
|
||||
"handler_proxy_test.go",
|
||||
"openapi_aggregator_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//vendor/github.com/go-openapi/spec:go_default_library",
|
||||
"//vendor/golang.org/x/net/websocket:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
@@ -37,12 +35,9 @@ go_library(
|
||||
"apiservice_controller.go",
|
||||
"handler_apis.go",
|
||||
"handler_proxy.go",
|
||||
"openapi_aggregator.go",
|
||||
"resolvers.go",
|
||||
],
|
||||
deps = [
|
||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||
"//vendor/github.com/go-openapi/spec:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
@@ -60,7 +55,6 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
@@ -84,12 +78,9 @@ go_library(
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@@ -27,7 +26,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
@@ -41,6 +39,7 @@ import (
|
||||
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
|
||||
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
|
||||
apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd"
|
||||
)
|
||||
@@ -119,7 +118,7 @@ type APIAggregator struct {
|
||||
// Information needed to determine routing for the aggregator
|
||||
serviceResolver ServiceResolver
|
||||
|
||||
openAPIAggregator *openAPIAggregator
|
||||
openAPIAggregationController *openapicontroller.AggregationController
|
||||
}
|
||||
|
||||
type completedConfig struct {
|
||||
@@ -222,15 +221,22 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
})
|
||||
|
||||
if openApiConfig != nil {
|
||||
s.openAPIAggregator, err = buildAndRegisterOpenAPIAggregator(
|
||||
s.delegateHandler,
|
||||
specDownloader := openapicontroller.NewDownloader(s.contextMapper)
|
||||
openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator(
|
||||
&specDownloader,
|
||||
delegationTarget,
|
||||
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
|
||||
openApiConfig,
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux,
|
||||
s.contextMapper)
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||
go s.openAPIAggregationController.Run(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@@ -243,7 +249,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er
|
||||
// since they are wired against listers because they require multiple resources to respond
|
||||
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
return s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService)
|
||||
if s.openAPIAggregationController != nil {
|
||||
s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
|
||||
@@ -262,8 +271,8 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er
|
||||
serviceResolver: s.serviceResolver,
|
||||
}
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
if err := s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to load OpenAPI spec for API service %s: %v", apiService.Name, err))
|
||||
if s.openAPIAggregationController != nil {
|
||||
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
|
||||
}
|
||||
s.proxyHandlers[apiService.Name] = proxyHandler
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
|
||||
@@ -307,6 +316,9 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
|
||||
}
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
|
||||
if s.openAPIAggregationController != nil {
|
||||
s.openAPIAggregationController.RemoveAPIService(apiServiceName)
|
||||
}
|
||||
delete(s.proxyHandlers, apiServiceName)
|
||||
|
||||
// TODO unregister group level discovery when there are no more versions for the group
|
||||
|
@@ -1,270 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/go-openapi/spec"
|
||||
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-openapi/pkg/aggregator"
|
||||
"k8s.io/kube-openapi/pkg/builder"
|
||||
"k8s.io/kube-openapi/pkg/common"
|
||||
"k8s.io/kube-openapi/pkg/handler"
|
||||
)
|
||||
|
||||
const (
|
||||
aggregatorUser = "system:aggregator"
|
||||
specDownloadTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
type openAPIAggregator struct {
|
||||
// Map of API Services' OpenAPI specs by their name
|
||||
openAPISpecs map[string]*openAPISpecInfo
|
||||
|
||||
// provided for dynamic OpenAPI spec
|
||||
openAPIService *handler.OpenAPIService
|
||||
|
||||
// Aggregator's OpenAPI spec (holds apiregistration group).
|
||||
aggregatorOpenAPISpec *spec.Swagger
|
||||
|
||||
// Local (in process) delegate's OpenAPI spec.
|
||||
inProcessDelegatesOpenAPISpec *spec.Swagger
|
||||
|
||||
contextMapper request.RequestContextMapper
|
||||
}
|
||||
|
||||
func buildAndRegisterOpenAPIAggregator(delegateHandler http.Handler, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler, contextMapper request.RequestContextMapper) (s *openAPIAggregator, err error) {
|
||||
s = &openAPIAggregator{
|
||||
openAPISpecs: map[string]*openAPISpecInfo{},
|
||||
contextMapper: contextMapper,
|
||||
}
|
||||
|
||||
// Get Local delegate's Spec
|
||||
s.inProcessDelegatesOpenAPISpec, err = s.downloadOpenAPISpec(delegateHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Build Aggregator's spec
|
||||
s.aggregatorOpenAPISpec, err = builder.BuildOpenAPISpec(
|
||||
webServices, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Remove any non-API endpoints from aggregator's spec. aggregatorOpenAPISpec
|
||||
// is the source of truth for all non-api endpoints.
|
||||
aggregator.FilterSpecByPaths(s.aggregatorOpenAPISpec, []string{"/apis/"})
|
||||
|
||||
// Build initial spec to serve.
|
||||
specToServe, err := s.buildOpenAPISpec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Install handler
|
||||
s.openAPIService, err = handler.RegisterOpenAPIService(
|
||||
specToServe, "/swagger.json", pathHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// openAPISpecInfo is used to store OpenAPI spec with its priority.
|
||||
// It can be used to sort specs with their priorities.
|
||||
type openAPISpecInfo struct {
|
||||
apiService apiregistration.APIService
|
||||
spec *spec.Swagger
|
||||
}
|
||||
|
||||
// byPriority can be used in sort.Sort to sort specs with their priorities.
|
||||
type byPriority struct {
|
||||
specs []openAPISpecInfo
|
||||
groupPriorities map[string]int32
|
||||
}
|
||||
|
||||
func (a byPriority) Len() int { return len(a.specs) }
|
||||
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
|
||||
func (a byPriority) Less(i, j int) bool {
|
||||
var iPriority, jPriority int32
|
||||
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
|
||||
iPriority = a.specs[i].apiService.Spec.VersionPriority
|
||||
jPriority = a.specs[i].apiService.Spec.VersionPriority
|
||||
} else {
|
||||
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
|
||||
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
|
||||
}
|
||||
if iPriority != jPriority {
|
||||
// Sort by priority, higher first
|
||||
return iPriority > jPriority
|
||||
}
|
||||
// Sort by service name.
|
||||
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
|
||||
}
|
||||
|
||||
func sortByPriority(specs []openAPISpecInfo) {
|
||||
b := byPriority{
|
||||
specs: specs,
|
||||
groupPriorities: map[string]int32{},
|
||||
}
|
||||
for _, spec := range specs {
|
||||
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
|
||||
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
|
||||
}
|
||||
}
|
||||
sort.Sort(b)
|
||||
}
|
||||
|
||||
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
|
||||
func (s *openAPIAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
|
||||
specToReturn, err = aggregator.CloneSpec(s.inProcessDelegatesOpenAPISpec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := aggregator.MergeSpecs(specToReturn, s.aggregatorOpenAPISpec); err != nil {
|
||||
return nil, fmt.Errorf("cannot merge local delegate spec with aggregator spec: %s", err.Error())
|
||||
}
|
||||
specs := []openAPISpecInfo{}
|
||||
for _, specInfo := range s.openAPISpecs {
|
||||
specs = append(specs, openAPISpecInfo{specInfo.apiService, specInfo.spec})
|
||||
}
|
||||
sortByPriority(specs)
|
||||
for _, specInfo := range specs {
|
||||
if err := aggregator.MergeSpecs(specToReturn, specInfo.spec); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return specToReturn, nil
|
||||
}
|
||||
|
||||
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
|
||||
func (s *openAPIAggregator) updateOpenAPISpec() error {
|
||||
if s.openAPIService == nil {
|
||||
return nil
|
||||
}
|
||||
specToServe, err := s.buildOpenAPISpec()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.openAPIService.UpdateSpec(specToServe)
|
||||
}
|
||||
|
||||
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
|
||||
type inMemoryResponseWriter struct {
|
||||
header http.Header
|
||||
respCode int
|
||||
data []byte
|
||||
}
|
||||
|
||||
func newInMemoryResponseWriter() *inMemoryResponseWriter {
|
||||
return &inMemoryResponseWriter{header: http.Header{}}
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) Header() http.Header {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) WriteHeader(code int) {
|
||||
r.respCode = code
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
|
||||
r.data = append(r.data, in...)
|
||||
return len(in), nil
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) String() string {
|
||||
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
|
||||
if r.data != nil {
|
||||
s += fmt.Sprintf(", Body: %s", string(r.data))
|
||||
}
|
||||
if r.header != nil {
|
||||
s += fmt.Sprintf(", Header: %s", r.header)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *openAPIAggregator) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
if ctx, ok := s.contextMapper.Get(req); ok {
|
||||
s.contextMapper.Update(req, request.WithUser(ctx, info))
|
||||
}
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
||||
|
||||
// downloadOpenAPISpec downloads openAPI spec from /swagger.json endpoint of the given handler.
|
||||
func (s *openAPIAggregator) downloadOpenAPISpec(handler http.Handler) (*spec.Swagger, error) {
|
||||
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
|
||||
handler = request.WithRequestContext(handler, s.contextMapper)
|
||||
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
|
||||
|
||||
req, err := http.NewRequest("GET", "/swagger.json", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := newInMemoryResponseWriter()
|
||||
handler.ServeHTTP(writer, req)
|
||||
|
||||
switch writer.respCode {
|
||||
case http.StatusOK:
|
||||
openApiSpec := &spec.Swagger{}
|
||||
if err := json.Unmarshal(writer.data, openApiSpec); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return openApiSpec, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("failed to retrive openAPI spec, http error: %s", writer.String())
|
||||
}
|
||||
}
|
||||
|
||||
// loadApiServiceSpec loads OpenAPI spec for the given API Service and then updates aggregator's spec.
|
||||
func (s *openAPIAggregator) loadApiServiceSpec(handler http.Handler, apiService *apiregistration.APIService) error {
|
||||
|
||||
// Ignore local services
|
||||
if apiService.Spec.Service == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
openApiSpec, err := s.downloadOpenAPISpec(handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aggregator.FilterSpecByPaths(openApiSpec, []string{"/apis/" + apiService.Spec.Group + "/"})
|
||||
|
||||
s.openAPISpecs[apiService.Name] = &openAPISpecInfo{
|
||||
apiService: *apiService,
|
||||
spec: openApiSpec,
|
||||
}
|
||||
|
||||
err = s.updateOpenAPISpec()
|
||||
if err != nil {
|
||||
delete(s.openAPISpecs, apiService.Name)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -1,68 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/go-openapi/spec"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
)
|
||||
|
||||
func newApiServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
|
||||
r := apiregistration.APIService{}
|
||||
r.Spec.Group = group
|
||||
r.Spec.GroupPriorityMinimum = minGroupPriority
|
||||
r.Spec.VersionPriority = versionPriority
|
||||
r.Name = name
|
||||
return r
|
||||
}
|
||||
|
||||
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
|
||||
actualNames := []string{}
|
||||
for _, a := range actual {
|
||||
actualNames = append(actualNames, a.apiService.Name)
|
||||
}
|
||||
if !reflect.DeepEqual(actualNames, expectedNames) {
|
||||
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApiServiceSort(t *testing.T) {
|
||||
list := []openAPISpecInfo{
|
||||
{
|
||||
apiService: newApiServiceForTest("FirstService", "Group1", 10, 5),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newApiServiceForTest("SecondService", "Group2", 15, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newApiServiceForTest("FirstServiceInternal", "Group1", 16, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newApiServiceForTest("ThirdService", "Group3", 15, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
}
|
||||
sortByPriority(list)
|
||||
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
|
||||
}
|
@@ -27,6 +27,7 @@ filegroup(
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister:all-srcs",
|
||||
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:all-srcs",
|
||||
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
|
@@ -0,0 +1,53 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"aggregator.go",
|
||||
"controller.go",
|
||||
"downloader.go",
|
||||
],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||
"//vendor/github.com/go-openapi/spec:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["aggregator_test.go"],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//vendor/github.com/go-openapi/spec:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@@ -0,0 +1,318 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/go-openapi/spec"
|
||||
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-openapi/pkg/aggregator"
|
||||
"k8s.io/kube-openapi/pkg/builder"
|
||||
"k8s.io/kube-openapi/pkg/common"
|
||||
"k8s.io/kube-openapi/pkg/handler"
|
||||
)
|
||||
|
||||
const (
|
||||
aggregatorUser = "system:aggregator"
|
||||
specDownloadTimeout = 60 * time.Second
|
||||
localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
|
||||
|
||||
// A randomly generated UUID to differentiate local and remote eTags.
|
||||
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
|
||||
)
|
||||
|
||||
type specAggregator struct {
|
||||
// mutex protects all members of this struct.
|
||||
rwMutex sync.RWMutex
|
||||
|
||||
// Map of API Services' OpenAPI specs by their name
|
||||
openAPISpecs map[string]*openAPISpecInfo
|
||||
|
||||
// provided for dynamic OpenAPI spec
|
||||
openAPIService *handler.OpenAPIService
|
||||
}
|
||||
|
||||
var _ AggregationManager = &specAggregator{}
|
||||
|
||||
// This function is not thread safe as it only being called on startup.
|
||||
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
|
||||
localAPIService := apiregistration.APIService{}
|
||||
localAPIService.Name = name
|
||||
s.openAPISpecs[name] = &openAPISpecInfo{
|
||||
etag: etag,
|
||||
apiService: localAPIService,
|
||||
handler: localHandler,
|
||||
spec: spec,
|
||||
}
|
||||
}
|
||||
|
||||
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
|
||||
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
|
||||
config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) {
|
||||
s := &specAggregator{
|
||||
openAPISpecs: map[string]*openAPISpecInfo{},
|
||||
}
|
||||
|
||||
i := 0
|
||||
// Build Aggregator's spec
|
||||
aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec(
|
||||
webServices, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reserving non-name spec for aggregator's Spec.
|
||||
s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "")
|
||||
i++
|
||||
for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
|
||||
handler := delegate.UnprotectedHandler()
|
||||
if handler == nil {
|
||||
continue
|
||||
}
|
||||
delegateSpec, etag, _, err := downloader.Download(handler, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if delegateSpec == nil {
|
||||
continue
|
||||
}
|
||||
s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag)
|
||||
i++
|
||||
}
|
||||
|
||||
// Build initial spec to serve.
|
||||
specToServe, err := s.buildOpenAPISpec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Install handler
|
||||
s.openAPIService, err = handler.RegisterOpenAPIService(
|
||||
specToServe, "/swagger.json", pathHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// openAPISpecInfo is used to store OpenAPI spec with its priority.
|
||||
// It can be used to sort specs with their priorities.
|
||||
type openAPISpecInfo struct {
|
||||
apiService apiregistration.APIService
|
||||
|
||||
// Specification of this API Service. If null then the spec is not loaded yet.
|
||||
spec *spec.Swagger
|
||||
handler http.Handler
|
||||
etag string
|
||||
}
|
||||
|
||||
// byPriority can be used in sort.Sort to sort specs with their priorities.
|
||||
type byPriority struct {
|
||||
specs []openAPISpecInfo
|
||||
groupPriorities map[string]int32
|
||||
}
|
||||
|
||||
func (a byPriority) Len() int { return len(a.specs) }
|
||||
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
|
||||
func (a byPriority) Less(i, j int) bool {
|
||||
// All local specs will come first
|
||||
// WARNING: This will result in not following priorities for local APIServices.
|
||||
if a.specs[i].apiService.Spec.Service == nil {
|
||||
// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
|
||||
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
|
||||
}
|
||||
var iPriority, jPriority int32
|
||||
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
|
||||
iPriority = a.specs[i].apiService.Spec.VersionPriority
|
||||
jPriority = a.specs[i].apiService.Spec.VersionPriority
|
||||
} else {
|
||||
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
|
||||
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
|
||||
}
|
||||
if iPriority != jPriority {
|
||||
// Sort by priority, higher first
|
||||
return iPriority > jPriority
|
||||
}
|
||||
// Sort by service name.
|
||||
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
|
||||
}
|
||||
|
||||
func sortByPriority(specs []openAPISpecInfo) {
|
||||
b := byPriority{
|
||||
specs: specs,
|
||||
groupPriorities: map[string]int32{},
|
||||
}
|
||||
for _, spec := range specs {
|
||||
if spec.apiService.Spec.Service == nil {
|
||||
continue
|
||||
}
|
||||
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
|
||||
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
|
||||
}
|
||||
}
|
||||
sort.Sort(b)
|
||||
}
|
||||
|
||||
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
|
||||
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
|
||||
specs := []openAPISpecInfo{}
|
||||
for _, specInfo := range s.openAPISpecs {
|
||||
if specInfo.spec == nil {
|
||||
continue
|
||||
}
|
||||
specs = append(specs, *specInfo)
|
||||
}
|
||||
if len(specs) == 0 {
|
||||
return &spec.Swagger{}, nil
|
||||
}
|
||||
sortByPriority(specs)
|
||||
for _, specInfo := range specs {
|
||||
// TODO: Make kube-openapi.MergeSpec(s) accept nil or empty spec as destination and just clone the spec in that case.
|
||||
if specToReturn == nil {
|
||||
specToReturn, err = aggregator.CloneSpec(specInfo.spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := aggregator.MergeSpecsIgnorePathConflict(specToReturn, specInfo.spec); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return specToReturn, nil
|
||||
}
|
||||
|
||||
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
|
||||
func (s *specAggregator) updateOpenAPISpec() error {
|
||||
if s.openAPIService == nil {
|
||||
return nil
|
||||
}
|
||||
specToServe, err := s.buildOpenAPISpec()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.openAPIService.UpdateSpec(specToServe)
|
||||
}
|
||||
|
||||
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
|
||||
// if the update fails.
|
||||
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
|
||||
orgSpecInfo, exists := s.openAPISpecs[specInfo.apiService.Name]
|
||||
s.openAPISpecs[specInfo.apiService.Name] = specInfo
|
||||
if err := s.updateOpenAPISpec(); err != nil {
|
||||
if exists {
|
||||
s.openAPISpecs[specInfo.apiService.Name] = orgSpecInfo
|
||||
} else {
|
||||
delete(s.openAPISpecs, specInfo.apiService.Name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact
|
||||
// if the update fails.
|
||||
func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error {
|
||||
orgSpecInfo, exists := s.openAPISpecs[apiServiceName]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
delete(s.openAPISpecs, apiServiceName)
|
||||
if err := s.updateOpenAPISpec(); err != nil {
|
||||
s.openAPISpecs[apiServiceName] = orgSpecInfo
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe.
|
||||
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error {
|
||||
s.rwMutex.Lock()
|
||||
defer s.rwMutex.Unlock()
|
||||
|
||||
specInfo, existingService := s.openAPISpecs[apiServiceName]
|
||||
if !existingService {
|
||||
return fmt.Errorf("APIService %q does not exists", apiServiceName)
|
||||
}
|
||||
|
||||
// For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths
|
||||
// proxy handler delegates.
|
||||
if specInfo.apiService.Spec.Service != nil {
|
||||
aggregator.FilterSpecByPaths(spec, []string{"/apis/"})
|
||||
}
|
||||
|
||||
return s.tryUpdatingServiceSpecs(&openAPISpecInfo{
|
||||
apiService: specInfo.apiService,
|
||||
spec: spec,
|
||||
handler: specInfo.handler,
|
||||
etag: etag,
|
||||
})
|
||||
}
|
||||
|
||||
// AddUpdateAPIService adds or updates the api service. It is thread safe.
|
||||
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error {
|
||||
s.rwMutex.Lock()
|
||||
defer s.rwMutex.Unlock()
|
||||
|
||||
if apiService.Spec.Service == nil {
|
||||
// All local specs should be already aggregated using local delegate chain
|
||||
return nil
|
||||
}
|
||||
|
||||
newSpec := &openAPISpecInfo{
|
||||
apiService: *apiService,
|
||||
handler: handler,
|
||||
}
|
||||
if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService {
|
||||
newSpec.etag = specInfo.etag
|
||||
newSpec.spec = specInfo.spec
|
||||
}
|
||||
return s.tryUpdatingServiceSpecs(newSpec)
|
||||
}
|
||||
|
||||
// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
|
||||
// It is thread safe.
|
||||
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error {
|
||||
s.rwMutex.Lock()
|
||||
defer s.rwMutex.Unlock()
|
||||
|
||||
if _, existingService := s.openAPISpecs[apiServiceName]; !existingService {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.tryDeleteServiceSpecs(apiServiceName)
|
||||
}
|
||||
|
||||
// GetAPIServiceSpec returns api service spec info
|
||||
func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) {
|
||||
s.rwMutex.RLock()
|
||||
defer s.rwMutex.RUnlock()
|
||||
|
||||
if info, existingService := s.openAPISpecs[apiServiceName]; existingService {
|
||||
return info.handler, info.etag, true
|
||||
}
|
||||
return nil, "", false
|
||||
}
|
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/go-openapi/spec"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
)
|
||||
|
||||
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
|
||||
r := apiregistration.APIService{}
|
||||
r.Spec.Group = group
|
||||
r.Spec.GroupPriorityMinimum = minGroupPriority
|
||||
r.Spec.VersionPriority = versionPriority
|
||||
r.Spec.Service = &apiregistration.ServiceReference{}
|
||||
r.Name = name
|
||||
return r
|
||||
}
|
||||
|
||||
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
|
||||
actualNames := []string{}
|
||||
for _, a := range actual {
|
||||
actualNames = append(actualNames, a.apiService.Name)
|
||||
}
|
||||
if !reflect.DeepEqual(actualNames, expectedNames) {
|
||||
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPIServiceSort(t *testing.T) {
|
||||
list := []openAPISpecInfo{
|
||||
{
|
||||
apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
{
|
||||
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3),
|
||||
spec: &spec.Swagger{},
|
||||
},
|
||||
}
|
||||
sortByPriority(list)
|
||||
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
|
||||
}
|
||||
|
||||
type handlerTest struct {
|
||||
etag string
|
||||
data []byte
|
||||
}
|
||||
|
||||
var _ http.Handler = handlerTest{}
|
||||
|
||||
func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if len(h.etag) > 0 {
|
||||
w.Header().Add("Etag", h.etag)
|
||||
}
|
||||
ifNoneMatches := r.Header["If-None-Match"]
|
||||
for _, match := range ifNoneMatches {
|
||||
if match == h.etag {
|
||||
w.WriteHeader(http.StatusNotModified)
|
||||
return
|
||||
}
|
||||
}
|
||||
w.Write(h.data)
|
||||
}
|
||||
|
||||
func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error,
|
||||
expectedSpecID string, expectedEtag string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("downloadOpenAPISpec failed : %s", err)
|
||||
}
|
||||
if expectedSpecID == "" && actualSpec != nil {
|
||||
return fmt.Errorf("expected Not Modified, actual ID %s", actualSpec.ID)
|
||||
}
|
||||
if actualSpec != nil && actualSpec.ID != expectedSpecID {
|
||||
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, actualSpec.ID)
|
||||
}
|
||||
if actualEtag != expectedEtag {
|
||||
return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, actualEtag)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDownloadOpenAPISpec(t *testing.T) {
|
||||
|
||||
s := Downloader{contextMapper: request.NewRequestContextMapper()}
|
||||
|
||||
// Test with no eTag
|
||||
actualSpec, actualEtag, _, err := s.Download(handlerTest{data: []byte("{\"id\": \"test\"}")}, "")
|
||||
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "\"6E8F849B434D4B98A569B9D7718876E9-356ECAB19D7FBE1336BABB1E70F8F3025050DE218BE78256BE81620681CFC9A268508E542B8B55974E17B2184BBFC8FFFAA577E51BE195D32B3CA2547818ABE4\""))
|
||||
|
||||
// Test with eTag
|
||||
actualSpec, actualEtag, _, err = s.Download(
|
||||
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "")
|
||||
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test"))
|
||||
|
||||
// Test not modified
|
||||
actualSpec, actualEtag, _, err = s.Download(
|
||||
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "etag_test")
|
||||
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "", "etag_test"))
|
||||
|
||||
// Test different eTags
|
||||
actualSpec, actualEtag, _, err = s.Download(
|
||||
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test1"}, "etag_test2")
|
||||
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test1"))
|
||||
}
|
@@ -0,0 +1,186 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/spec"
|
||||
"github.com/golang/glog"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
)
|
||||
|
||||
const (
|
||||
successfulUpdateDelay = time.Minute
|
||||
failedUpdateMaxExpDelay = time.Hour
|
||||
)
|
||||
|
||||
type syncAction int
|
||||
|
||||
const (
|
||||
syncRequeue syncAction = iota
|
||||
syncRequeueRateLimited
|
||||
syncNothing
|
||||
)
|
||||
|
||||
// AggregationManager is the interface between this controller and OpenAPI Aggregator service.
|
||||
type AggregationManager interface {
|
||||
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
|
||||
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
|
||||
RemoveAPIServiceSpec(apiServiceName string) error
|
||||
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
|
||||
}
|
||||
|
||||
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
|
||||
// them if necessary.
|
||||
type AggregationController struct {
|
||||
openAPIAggregationManager AggregationManager
|
||||
queue workqueue.RateLimitingInterface
|
||||
downloader *Downloader
|
||||
|
||||
// To allow injection for testing.
|
||||
syncHandler func(key string) (syncAction, error)
|
||||
}
|
||||
|
||||
// NewAggregationController creates new OpenAPI aggregation controller.
|
||||
func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController {
|
||||
c := &AggregationController{
|
||||
openAPIAggregationManager: openAPIAggregationManager,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(
|
||||
workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay), "APIServiceOpenAPIAggregationControllerQueue1"),
|
||||
downloader: downloader,
|
||||
}
|
||||
|
||||
c.syncHandler = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Run starts OpenAPI AggregationController
|
||||
func (c *AggregationController) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting OpenAPI AggregationController")
|
||||
defer glog.Infof("Shutting down OpenAPI AggregationController")
|
||||
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *AggregationController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *AggregationController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
defer c.queue.Done(key)
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
||||
glog.Infof("OpenAPI AggregationController: Processing item %s", key)
|
||||
|
||||
action, err := c.syncHandler(key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
} else {
|
||||
utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
|
||||
}
|
||||
|
||||
switch action {
|
||||
case syncRequeue:
|
||||
glog.Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
|
||||
c.queue.AddAfter(key, successfulUpdateDelay)
|
||||
case syncRequeueRateLimited:
|
||||
glog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
|
||||
c.queue.AddRateLimited(key)
|
||||
case syncNothing:
|
||||
glog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *AggregationController) sync(key string) (syncAction, error) {
|
||||
handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key)
|
||||
if !exists || handler == nil {
|
||||
return syncNothing, nil
|
||||
}
|
||||
returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag)
|
||||
switch {
|
||||
case err != nil:
|
||||
return syncRequeueRateLimited, err
|
||||
case httpStatus == http.StatusNotModified:
|
||||
case httpStatus == http.StatusNotFound || returnSpec == nil:
|
||||
return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exists")
|
||||
case httpStatus == http.StatusOK:
|
||||
if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil {
|
||||
return syncRequeueRateLimited, err
|
||||
}
|
||||
}
|
||||
return syncRequeue, nil
|
||||
}
|
||||
|
||||
// AddAPIService adds a new API Service to OpenAPI Aggregation.
|
||||
func (c *AggregationController) AddAPIService(handler http.Handler, apiService *apiregistration.APIService) {
|
||||
if apiService.Spec.Service == nil {
|
||||
return
|
||||
}
|
||||
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
|
||||
}
|
||||
c.queue.AddAfter(apiService.Name, time.Second)
|
||||
}
|
||||
|
||||
// UpdateAPIService updates API Service's info and handler.
|
||||
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) {
|
||||
if apiService.Spec.Service == nil {
|
||||
return
|
||||
}
|
||||
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err))
|
||||
}
|
||||
key := apiService.Name
|
||||
if c.queue.NumRequeues(key) > 0 {
|
||||
// The item has failed before. Remove it from failure queue and
|
||||
// update it in a second
|
||||
c.queue.Forget(key)
|
||||
c.queue.AddAfter(key, time.Second)
|
||||
}
|
||||
// Else: The item has been succeeded before and it will be updated soon (after successfulUpdateDelay)
|
||||
// we don't add it again as it will cause a duplication of items.
|
||||
}
|
||||
|
||||
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
|
||||
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
|
||||
if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
|
||||
}
|
||||
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
|
||||
// and will not add it again to the queue.
|
||||
c.queue.Forget(apiServiceName)
|
||||
}
|
@@ -0,0 +1,146 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openapi
|
||||
|
||||
import (
|
||||
"crypto/sha512"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/go-openapi/spec"
|
||||
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
// Downloader is the OpenAPI downloader type. It will try to download spec from /swagger.json endpoint.
|
||||
type Downloader struct {
|
||||
contextMapper request.RequestContextMapper
|
||||
}
|
||||
|
||||
// NewDownloader creates a new OpenAPI Downloader.
|
||||
func NewDownloader(contextMapper request.RequestContextMapper) Downloader {
|
||||
return Downloader{contextMapper}
|
||||
}
|
||||
|
||||
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
|
||||
type inMemoryResponseWriter struct {
|
||||
writeHeaderCalled bool
|
||||
header http.Header
|
||||
respCode int
|
||||
data []byte
|
||||
}
|
||||
|
||||
func newInMemoryResponseWriter() *inMemoryResponseWriter {
|
||||
return &inMemoryResponseWriter{header: http.Header{}}
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) Header() http.Header {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) WriteHeader(code int) {
|
||||
r.writeHeaderCalled = true
|
||||
r.respCode = code
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
|
||||
if !r.writeHeaderCalled {
|
||||
r.WriteHeader(http.StatusOK)
|
||||
}
|
||||
r.data = append(r.data, in...)
|
||||
return len(in), nil
|
||||
}
|
||||
|
||||
func (r *inMemoryResponseWriter) String() string {
|
||||
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
|
||||
if r.data != nil {
|
||||
s += fmt.Sprintf(", Body: %s", string(r.data))
|
||||
}
|
||||
if r.header != nil {
|
||||
s += fmt.Sprintf(", Header: %s", r.header)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
if ctx, ok := s.contextMapper.Get(req); ok {
|
||||
s.contextMapper.Update(req, request.WithUser(ctx, info))
|
||||
}
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
||||
|
||||
func etagFor(data []byte) string {
|
||||
return fmt.Sprintf("%s%X\"", locallyGeneratedEtagPrefix, sha512.Sum512(data))
|
||||
}
|
||||
|
||||
// Download downloads openAPI spec from /swagger.json endpoint of the given handler.
|
||||
// httpStatus is only valid if err == nil
|
||||
func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *spec.Swagger, newEtag string, httpStatus int, err error) {
|
||||
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
|
||||
handler = request.WithRequestContext(handler, s.contextMapper)
|
||||
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
|
||||
|
||||
req, err := http.NewRequest("GET", "/swagger.json", nil)
|
||||
if err != nil {
|
||||
return nil, "", 0, err
|
||||
}
|
||||
|
||||
// Only pass eTag if it is not generated locally
|
||||
if len(etag) > 0 && !strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
|
||||
req.Header.Add("If-None-Match", etag)
|
||||
}
|
||||
|
||||
writer := newInMemoryResponseWriter()
|
||||
handler.ServeHTTP(writer, req)
|
||||
|
||||
switch writer.respCode {
|
||||
case http.StatusNotModified:
|
||||
if len(etag) == 0 {
|
||||
return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag")
|
||||
}
|
||||
return nil, etag, http.StatusNotModified, nil
|
||||
case http.StatusNotFound:
|
||||
// Gracefully skip 404, assuming the server won't provide any spec
|
||||
return nil, "", http.StatusNotFound, nil
|
||||
case http.StatusOK:
|
||||
openAPISpec := &spec.Swagger{}
|
||||
if err := json.Unmarshal(writer.data, openAPISpec); err != nil {
|
||||
return nil, "", 0, err
|
||||
}
|
||||
newEtag = writer.Header().Get("Etag")
|
||||
if len(newEtag) == 0 {
|
||||
newEtag = etagFor(writer.data)
|
||||
if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
|
||||
// The function call with an etag and server does not report an etag.
|
||||
// That means this server does not support etag and the etag that passed
|
||||
// to the function generated previously by us. Just compare etags and
|
||||
// return StatusNotModified if they are the same.
|
||||
if etag == newEtag {
|
||||
return nil, etag, http.StatusNotModified, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return openAPISpec, newEtag, http.StatusOK, nil
|
||||
default:
|
||||
return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
|
||||
}
|
||||
}
|
187
vendor/k8s.io/kube-openapi/pkg/aggregator/aggregator.go
generated
vendored
187
vendor/k8s.io/kube-openapi/pkg/aggregator/aggregator.go
generated
vendored
@@ -154,8 +154,28 @@ func (s *referenceWalker) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
// FilterSpecByPaths remove unnecessary paths and unused definitions.
|
||||
// usedDefinitionForSpec returns a map with all used definition in the provided spec as keys and true as values.
|
||||
func usedDefinitionForSpec(sp *spec.Swagger) map[string]bool {
|
||||
usedDefinitions := map[string]bool{}
|
||||
walkOnAllReferences(func(ref spec.Ref) spec.Ref {
|
||||
if refStr := ref.String(); refStr != "" && strings.HasPrefix(refStr, definitionPrefix) {
|
||||
usedDefinitions[refStr[len(definitionPrefix):]] = true
|
||||
}
|
||||
return ref
|
||||
}, sp)
|
||||
return usedDefinitions
|
||||
}
|
||||
|
||||
// FilterSpecByPaths removes unnecessary paths and definitions used by those paths.
|
||||
// i.e. if a Path removed by this function, all definition used by it and not used
|
||||
// anywhere else will also be removed.
|
||||
func FilterSpecByPaths(sp *spec.Swagger, keepPathPrefixes []string) {
|
||||
// Walk all references to find all used definitions. This function
|
||||
// want to only deal with unused definitions resulted from filtering paths.
|
||||
// Thus a definition will be removed only if it has been used before but
|
||||
// it is unused because of a path prune.
|
||||
initialUsedDefinitions := usedDefinitionForSpec(sp)
|
||||
|
||||
// First remove unwanted paths
|
||||
prefixes := util.NewTrie(keepPathPrefixes)
|
||||
orgPaths := sp.Paths
|
||||
@@ -174,34 +194,24 @@ func FilterSpecByPaths(sp *spec.Swagger, keepPathPrefixes []string) {
|
||||
}
|
||||
|
||||
// Walk all references to find all definition references.
|
||||
usedDefinitions := map[string]bool{}
|
||||
|
||||
walkOnAllReferences(func(ref spec.Ref) spec.Ref {
|
||||
if ref.String() != "" {
|
||||
refStr := ref.String()
|
||||
if strings.HasPrefix(refStr, definitionPrefix) {
|
||||
usedDefinitions[refStr[len(definitionPrefix):]] = true
|
||||
}
|
||||
}
|
||||
return ref
|
||||
}, sp)
|
||||
usedDefinitions := usedDefinitionForSpec(sp)
|
||||
|
||||
// Remove unused definitions
|
||||
orgDefinitions := sp.Definitions
|
||||
sp.Definitions = spec.Definitions{}
|
||||
for k, v := range orgDefinitions {
|
||||
if usedDefinitions[k] {
|
||||
if usedDefinitions[k] || !initialUsedDefinitions[k] {
|
||||
sp.Definitions[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func renameDefinition(s *spec.Swagger, old, new string) {
|
||||
old_ref := definitionPrefix + old
|
||||
new_ref := definitionPrefix + new
|
||||
oldRef := definitionPrefix + old
|
||||
newRef := definitionPrefix + new
|
||||
walkOnAllReferences(func(ref spec.Ref) spec.Ref {
|
||||
if ref.String() == old_ref {
|
||||
return spec.MustCreateRef(new_ref)
|
||||
if ref.String() == oldRef {
|
||||
return spec.MustCreateRef(newRef)
|
||||
}
|
||||
return ref
|
||||
}, s)
|
||||
@@ -209,58 +219,117 @@ func renameDefinition(s *spec.Swagger, old, new string) {
|
||||
delete(s.Definitions, old)
|
||||
}
|
||||
|
||||
// Copy paths and definitions from source to dest, rename definitions if needed.
|
||||
// dest will be mutated, and source will not be changed.
|
||||
// MergeSpecsIgnorePathConflict is the same as MergeSpecs except it will ignore any path
|
||||
// conflicts by keeping the paths of destination. It will rename definition conflicts.
|
||||
func MergeSpecsIgnorePathConflict(dest, source *spec.Swagger) error {
|
||||
return mergeSpecs(dest, source, true, true)
|
||||
}
|
||||
|
||||
// MergeSpecsFailOnDefinitionConflict is differ from MergeSpecs as it fails if there is
|
||||
// a definition conflict.
|
||||
func MergeSpecsFailOnDefinitionConflict(dest, source *spec.Swagger) error {
|
||||
return mergeSpecs(dest, source, false, false)
|
||||
}
|
||||
|
||||
// MergeSpecs copies paths and definitions from source to dest, rename definitions if needed.
|
||||
// dest will be mutated, and source will not be changed. It will fail on path conflicts.
|
||||
func MergeSpecs(dest, source *spec.Swagger) error {
|
||||
sourceCopy, err := CloneSpec(source)
|
||||
if err != nil {
|
||||
return err
|
||||
return mergeSpecs(dest, source, true, false)
|
||||
}
|
||||
|
||||
func mergeSpecs(dest, source *spec.Swagger, renameModelConflicts, ignorePathConflicts bool) (err error) {
|
||||
specCloned := false
|
||||
if ignorePathConflicts {
|
||||
keepPaths := []string{}
|
||||
hasConflictingPath := false
|
||||
for k := range source.Paths.Paths {
|
||||
if _, found := dest.Paths.Paths[k]; !found {
|
||||
keepPaths = append(keepPaths, k)
|
||||
} else {
|
||||
hasConflictingPath = true
|
||||
}
|
||||
}
|
||||
if len(keepPaths) == 0 {
|
||||
// There is nothing to merge. All paths are conflicting.
|
||||
return nil
|
||||
}
|
||||
if hasConflictingPath {
|
||||
source, err = CloneSpec(source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
specCloned = true
|
||||
FilterSpecByPaths(source, keepPaths)
|
||||
}
|
||||
}
|
||||
for k, v := range sourceCopy.Paths.Paths {
|
||||
// Check for model conflicts
|
||||
conflicts := false
|
||||
for k, v := range source.Definitions {
|
||||
v2, found := dest.Definitions[k]
|
||||
if found && !reflect.DeepEqual(v, v2) {
|
||||
if !renameModelConflicts {
|
||||
return fmt.Errorf("model name conflict in merging OpenAPI spec: %s", k)
|
||||
}
|
||||
conflicts = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if conflicts {
|
||||
if !specCloned {
|
||||
source, err = CloneSpec(source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
specCloned = true
|
||||
usedNames := map[string]bool{}
|
||||
for k := range dest.Definitions {
|
||||
usedNames[k] = true
|
||||
}
|
||||
type Rename struct {
|
||||
from, to string
|
||||
}
|
||||
renames := []Rename{}
|
||||
for k, v := range source.Definitions {
|
||||
if usedNames[k] {
|
||||
v2, found := dest.Definitions[k]
|
||||
// Reuse model iff they are exactly the same.
|
||||
if found && reflect.DeepEqual(v, v2) {
|
||||
continue
|
||||
}
|
||||
i := 2
|
||||
newName := fmt.Sprintf("%s_v%d", k, i)
|
||||
_, foundInSource := source.Definitions[newName]
|
||||
for usedNames[newName] || foundInSource {
|
||||
i++
|
||||
newName = fmt.Sprintf("%s_v%d", k, i)
|
||||
_, foundInSource = source.Definitions[newName]
|
||||
}
|
||||
renames = append(renames, Rename{from: k, to: newName})
|
||||
usedNames[newName] = true
|
||||
}
|
||||
}
|
||||
for _, r := range renames {
|
||||
renameDefinition(source, r.from, r.to)
|
||||
}
|
||||
}
|
||||
for k, v := range source.Definitions {
|
||||
if _, found := dest.Definitions[k]; !found {
|
||||
dest.Definitions[k] = v
|
||||
}
|
||||
}
|
||||
// Check for path conflicts
|
||||
for k, v := range source.Paths.Paths {
|
||||
if _, found := dest.Paths.Paths[k]; found {
|
||||
return fmt.Errorf("unable to merge: duplicated path %s", k)
|
||||
}
|
||||
dest.Paths.Paths[k] = v
|
||||
}
|
||||
usedNames := map[string]bool{}
|
||||
for k := range dest.Definitions {
|
||||
usedNames[k] = true
|
||||
}
|
||||
type Rename struct {
|
||||
from, to string
|
||||
}
|
||||
renames := []Rename{}
|
||||
for k, v := range sourceCopy.Definitions {
|
||||
if usedNames[k] {
|
||||
v2, found := dest.Definitions[k]
|
||||
// Reuse model iff they are exactly the same.
|
||||
if found && reflect.DeepEqual(v, v2) {
|
||||
continue
|
||||
}
|
||||
i := 2
|
||||
newName := fmt.Sprintf("%s_v%d", k, i)
|
||||
_, foundInSource := sourceCopy.Definitions[newName]
|
||||
for usedNames[newName] || foundInSource {
|
||||
i += 1
|
||||
newName = fmt.Sprintf("%s_v%d", k, i)
|
||||
_, foundInSource = sourceCopy.Definitions[newName]
|
||||
}
|
||||
renames = append(renames, Rename{from: k, to: newName})
|
||||
usedNames[newName] = true
|
||||
}
|
||||
}
|
||||
for _, r := range renames {
|
||||
renameDefinition(sourceCopy, r.from, r.to)
|
||||
}
|
||||
for k, v := range sourceCopy.Definitions {
|
||||
if _, found := dest.Definitions[k]; !found {
|
||||
dest.Definitions[k] = v
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clone OpenAPI spec
|
||||
// CloneSpec clones OpenAPI spec
|
||||
func CloneSpec(source *spec.Swagger) (*spec.Swagger, error) {
|
||||
// TODO(mehdy): Find a faster way to clone an spec
|
||||
bytes, err := json.Marshal(source)
|
||||
|
1
vendor/k8s.io/kube-openapi/pkg/generators/openapi.go
generated
vendored
1
vendor/k8s.io/kube-openapi/pkg/generators/openapi.go
generated
vendored
@@ -328,6 +328,7 @@ func (g openAPITypeWriter) generateMembers(t *types.Type, required []string) ([]
|
||||
required = append(required, name)
|
||||
}
|
||||
if err = g.generateProperty(&m, t); err != nil {
|
||||
glog.Errorf("Error when generating: %v, %v\n", name, m)
|
||||
return required, err
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user