Aggregate OpenAPI spec

This commit is contained in:
mbohlool 2017-07-19 15:39:46 -07:00
parent 400b77b48f
commit 8c0580d89b
5 changed files with 367 additions and 8 deletions

View File

@ -51,7 +51,6 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
// the aggregator doesn't wire these up. It just delegates them to the kubeapiserver
genericConfig.EnableSwaggerUI = false
genericConfig.OpenAPIConfig = nil
genericConfig.SwaggerConfig = nil
// copy the etcd options so we don't mutate originals.

View File

@ -116,6 +116,8 @@ type APIAggregator struct {
// Information needed to determine routing for the aggregator
serviceResolver ServiceResolver
openAPIAggregator *openAPIAggregator
}
type completedConfig struct {
@ -142,6 +144,11 @@ func (c *Config) SkipComplete() completedConfig {
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// Prevent generic API server to install OpenAPI handler. Aggregator server
// has its own customized OpenAPI handler.
openApiConfig := c.Config.GenericConfig.OpenAPIConfig
c.Config.GenericConfig.OpenAPIConfig = nil
genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
@ -212,17 +219,29 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil
})
if openApiConfig != nil {
s.openAPIAggregator, err = buildAndRegisterOpenAPIAggregator(
s.delegateHandler,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
openApiConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux,
s.contextMapper)
if err != nil {
return nil, err
}
}
return s, nil
}
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) error {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService)
return
return s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService)
}
proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
@ -241,18 +260,21 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
serviceResolver: s.serviceResolver,
}
proxyHandler.updateAPIService(apiService)
if err := s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService); err != nil {
return err
}
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
// if we're dealing with the legacy group, we're done here
if apiService.Name == legacyAPIServiceName {
return
return nil
}
// if we've already registered the path with the handler, we don't want to do it again.
if s.handledGroups.Has(apiService.Spec.Group) {
return
return nil
}
// it's time to register the group aggregation endpoint
@ -268,6 +290,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
s.handledGroups.Insert(apiService.Spec.Group)
return nil
}
// RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please.

View File

@ -39,7 +39,7 @@ import (
)
type APIHandlerManager interface {
AddAPIService(apiService *apiregistration.APIService)
AddAPIService(apiService *apiregistration.APIService) error
RemoveAPIService(apiServiceName string)
}
@ -102,8 +102,7 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return nil
}
c.apiHandlerManager.AddAPIService(apiService)
return nil
return c.apiHandlerManager.AddAPIService(apiService)
}
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {

View File

@ -0,0 +1,270 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"encoding/json"
"fmt"
"net/http"
"sort"
"time"
"github.com/emicklei/go-restful"
"github.com/go-openapi/spec"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
)
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
)
type openAPIAggregator struct {
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIService *handler.OpenAPIService
// Aggregator's OpenAPI spec (holds apiregistration group).
aggregatorOpenAPISpec *spec.Swagger
// Local (in process) delegate's OpenAPI spec.
inProcessDelegatesOpenAPISpec *spec.Swagger
contextMapper request.RequestContextMapper
}
func buildAndRegisterOpenAPIAggregator(delegateHandler http.Handler, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler, contextMapper request.RequestContextMapper) (s *openAPIAggregator, err error) {
s = &openAPIAggregator{
openAPISpecs: map[string]*openAPISpecInfo{},
contextMapper: contextMapper,
}
// Get Local delegate's Spec
s.inProcessDelegatesOpenAPISpec, err = s.downloadOpenAPISpec(delegateHandler)
if err != nil {
return nil, err
}
// Build Aggregator's spec
s.aggregatorOpenAPISpec, err = builder.BuildOpenAPISpec(
webServices, config)
if err != nil {
return nil, err
}
// Remove any non-API endpoints from aggregator's spec. aggregatorOpenAPISpec
// is the source of truth for all non-api endpoints.
aggregator.FilterSpecByPaths(s.aggregatorOpenAPISpec, []string{"/apis/"})
// Build initial spec to serve.
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return nil, err
}
// Install handler
s.openAPIService, err = handler.RegisterOpenAPIService(
specToServe, "/swagger.json", pathHandler)
if err != nil {
return nil, err
}
return s, nil
}
// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService apiregistration.APIService
spec *spec.Swagger
}
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specToReturn, err = aggregator.CloneSpec(s.inProcessDelegatesOpenAPISpec)
if err != nil {
return nil, err
}
if err := aggregator.MergeSpecs(specToReturn, s.aggregatorOpenAPISpec); err != nil {
return nil, fmt.Errorf("cannot merge local delegate spec with aggregator spec: %s", err.Error())
}
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
specs = append(specs, openAPISpecInfo{specInfo.apiService, specInfo.spec})
}
sortByPriority(specs)
for _, specInfo := range specs {
if err := aggregator.MergeSpecs(specToReturn, specInfo.spec); err != nil {
return nil, err
}
}
return specToReturn, nil
}
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) updateOpenAPISpec() error {
if s.openAPIService == nil {
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
return s.openAPIService.UpdateSpec(specToServe)
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}
func (s *openAPIAggregator) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if ctx, ok := s.contextMapper.Get(req); ok {
s.contextMapper.Update(req, request.WithUser(ctx, info))
}
handler.ServeHTTP(w, req)
})
}
// downloadOpenAPISpec downloads openAPI spec from /swagger.json endpoint of the given handler.
func (s *openAPIAggregator) downloadOpenAPISpec(handler http.Handler) (*spec.Swagger, error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = request.WithRequestContext(handler, s.contextMapper)
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/swagger.json", nil)
if err != nil {
return nil, err
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusOK:
openApiSpec := &spec.Swagger{}
if err := json.Unmarshal(writer.data, openApiSpec); err != nil {
return nil, err
}
return openApiSpec, nil
default:
return nil, fmt.Errorf("failed to retrive openAPI spec, http error: %s", writer.String())
}
}
// loadApiServiceSpec loads OpenAPI spec for the given API Service and then updates aggregator's spec.
func (s *openAPIAggregator) loadApiServiceSpec(handler http.Handler, apiService *apiregistration.APIService) error {
// Ignore local services
if apiService.Spec.Service == nil {
return nil
}
openApiSpec, err := s.downloadOpenAPISpec(handler)
if err != nil {
return err
}
aggregator.FilterSpecByPaths(openApiSpec, []string{"/apis/" + apiService.Spec.Group + "/"})
s.openAPISpecs[apiService.Name] = &openAPISpecInfo{
apiService: *apiService,
spec: openApiSpec,
}
err = s.updateOpenAPISpec()
if err != nil {
delete(s.openAPISpecs, apiService.Name)
return err
}
return nil
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"reflect"
"testing"
"github.com/go-openapi/spec"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func newApiServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
}
}
func TestApiServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newApiServiceForTest("FirstService", "Group1", 10, 5),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("SecondService", "Group2", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("FirstServiceInternal", "Group1", 16, 3),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{},
},
}
sortByPriority(list)
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
}