Consolidate local OpenAPI specs and APIServices' spec into one data structure

Remove APIService OpenAPI spec when it is deleted

Add eTag support and returning httpStatus to OpenAPI spec downloader

Update aggregated OpenAPI spec periodically

Use delegate chain

Refactor OpenAPI aggregator to have separate controller and aggregation function

Enable OpenAPI spec for extensions api server

Do not filter paths. higher priority specs wins the conflicting paths

Move OpenAPI aggregation controller to pkg/controller/openapi
This commit is contained in:
mbohlool 2017-08-17 12:49:19 -07:00
parent 7cbdb90890
commit 76e24f216f
9 changed files with 811 additions and 348 deletions

View File

@ -165,6 +165,9 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
// This will wire up openapi for extension api server
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
if err != nil {

View File

@ -20,4 +20,5 @@ limitations under the License.
// Package v1beta1 is the v1beta1 version of the API.
// +groupName=apiextensions.k8s.io
// +k8s:openapi-gen=true
package v1beta1 // import "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"

View File

@ -17,7 +17,6 @@ limitations under the License.
package apiserver
import (
"fmt"
"net/http"
"time"
@ -27,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
@ -41,6 +39,7 @@ import (
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd"
)
@ -119,7 +118,7 @@ type APIAggregator struct {
// Information needed to determine routing for the aggregator
serviceResolver ServiceResolver
openAPIAggregator *openAPIAggregator
openAPIAggregationController *openapicontroller.AggregationController
}
type completedConfig struct {
@ -222,15 +221,22 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
})
if openApiConfig != nil {
s.openAPIAggregator, err = buildAndRegisterOpenAPIAggregator(
s.delegateHandler,
specDownloader := openapicontroller.NewDownloader(s.contextMapper)
openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator(
&specDownloader,
delegationTarget,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
openApiConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux,
s.contextMapper)
s.GenericAPIServer.Handler.NonGoRestfulMux)
if err != nil {
return nil, err
}
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
s.GenericAPIServer.AddPostStartHook("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
return nil
})
}
return s, nil
@ -243,7 +249,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService)
return s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService)
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
}
return nil
}
proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
@ -262,8 +271,8 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er
serviceResolver: s.serviceResolver,
}
proxyHandler.updateAPIService(apiService)
if err := s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to load OpenAPI spec for API service %s: %v", apiService.Name, err))
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
}
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
@ -307,6 +316,9 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.RemoveAPIService(apiServiceName)
}
delete(s.proxyHandlers, apiServiceName)
// TODO unregister group level discovery when there are no more versions for the group

View File

@ -1,270 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"encoding/json"
"fmt"
"net/http"
"sort"
"time"
"github.com/emicklei/go-restful"
"github.com/go-openapi/spec"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
)
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
)
type openAPIAggregator struct {
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIService *handler.OpenAPIService
// Aggregator's OpenAPI spec (holds apiregistration group).
aggregatorOpenAPISpec *spec.Swagger
// Local (in process) delegate's OpenAPI spec.
inProcessDelegatesOpenAPISpec *spec.Swagger
contextMapper request.RequestContextMapper
}
func buildAndRegisterOpenAPIAggregator(delegateHandler http.Handler, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler, contextMapper request.RequestContextMapper) (s *openAPIAggregator, err error) {
s = &openAPIAggregator{
openAPISpecs: map[string]*openAPISpecInfo{},
contextMapper: contextMapper,
}
// Get Local delegate's Spec
s.inProcessDelegatesOpenAPISpec, err = s.downloadOpenAPISpec(delegateHandler)
if err != nil {
return nil, err
}
// Build Aggregator's spec
s.aggregatorOpenAPISpec, err = builder.BuildOpenAPISpec(
webServices, config)
if err != nil {
return nil, err
}
// Remove any non-API endpoints from aggregator's spec. aggregatorOpenAPISpec
// is the source of truth for all non-api endpoints.
aggregator.FilterSpecByPaths(s.aggregatorOpenAPISpec, []string{"/apis/"})
// Build initial spec to serve.
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return nil, err
}
// Install handler
s.openAPIService, err = handler.RegisterOpenAPIService(
specToServe, "/swagger.json", pathHandler)
if err != nil {
return nil, err
}
return s, nil
}
// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService apiregistration.APIService
spec *spec.Swagger
}
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specToReturn, err = aggregator.CloneSpec(s.inProcessDelegatesOpenAPISpec)
if err != nil {
return nil, err
}
if err := aggregator.MergeSpecs(specToReturn, s.aggregatorOpenAPISpec); err != nil {
return nil, fmt.Errorf("cannot merge local delegate spec with aggregator spec: %s", err.Error())
}
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
specs = append(specs, openAPISpecInfo{specInfo.apiService, specInfo.spec})
}
sortByPriority(specs)
for _, specInfo := range specs {
if err := aggregator.MergeSpecs(specToReturn, specInfo.spec); err != nil {
return nil, err
}
}
return specToReturn, nil
}
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe.
func (s *openAPIAggregator) updateOpenAPISpec() error {
if s.openAPIService == nil {
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
return s.openAPIService.UpdateSpec(specToServe)
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}
func (s *openAPIAggregator) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if ctx, ok := s.contextMapper.Get(req); ok {
s.contextMapper.Update(req, request.WithUser(ctx, info))
}
handler.ServeHTTP(w, req)
})
}
// downloadOpenAPISpec downloads openAPI spec from /swagger.json endpoint of the given handler.
func (s *openAPIAggregator) downloadOpenAPISpec(handler http.Handler) (*spec.Swagger, error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = request.WithRequestContext(handler, s.contextMapper)
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/swagger.json", nil)
if err != nil {
return nil, err
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusOK:
openApiSpec := &spec.Swagger{}
if err := json.Unmarshal(writer.data, openApiSpec); err != nil {
return nil, err
}
return openApiSpec, nil
default:
return nil, fmt.Errorf("failed to retrive openAPI spec, http error: %s", writer.String())
}
}
// loadApiServiceSpec loads OpenAPI spec for the given API Service and then updates aggregator's spec.
func (s *openAPIAggregator) loadApiServiceSpec(handler http.Handler, apiService *apiregistration.APIService) error {
// Ignore local services
if apiService.Spec.Service == nil {
return nil
}
openApiSpec, err := s.downloadOpenAPISpec(handler)
if err != nil {
return err
}
aggregator.FilterSpecByPaths(openApiSpec, []string{"/apis/" + apiService.Spec.Group + "/"})
s.openAPISpecs[apiService.Name] = &openAPISpecInfo{
apiService: *apiService,
spec: openApiSpec,
}
err = s.updateOpenAPISpec()
if err != nil {
delete(s.openAPISpecs, apiService.Name)
return err
}
return nil
}

View File

@ -1,68 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"reflect"
"testing"
"github.com/go-openapi/spec"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func newApiServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
}
}
func TestApiServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newApiServiceForTest("FirstService", "Group1", 10, 5),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("SecondService", "Group2", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("FirstServiceInternal", "Group1", 16, 3),
spec: &spec.Swagger{},
},
{
apiService: newApiServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{},
},
}
sortByPriority(list)
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
}

View File

@ -0,0 +1,318 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"net/http"
"sort"
"sync"
"time"
"github.com/emicklei/go-restful"
"github.com/go-openapi/spec"
"k8s.io/apiserver/pkg/server"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
)
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
// A randomly generated UUID to differentiate local and remote eTags.
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
)
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIService *handler.OpenAPIService
}
var _ AggregationManager = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := apiregistration.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
}
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) {
s := &specAggregator{
openAPISpecs: map[string]*openAPISpecInfo{},
}
i := 0
// Build Aggregator's spec
aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec(
webServices, config)
if err != nil {
return nil, err
}
// Reserving non-name spec for aggregator's Spec.
s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "")
i++
for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
handler := delegate.UnprotectedHandler()
if handler == nil {
continue
}
delegateSpec, etag, _, err := downloader.Download(handler, "")
if err != nil {
return nil, err
}
if delegateSpec == nil {
continue
}
s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag)
i++
}
// Build initial spec to serve.
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return nil, err
}
// Install handler
s.openAPIService, err = handler.RegisterOpenAPIService(
specToServe, "/swagger.json", pathHandler)
if err != nil {
return nil, err
}
return s, nil
}
// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService apiregistration.APIService
// Specification of this API Service. If null then the spec is not loaded yet.
spec *spec.Swagger
handler http.Handler
etag string
}
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
// All local specs will come first
// WARNING: This will result in not following priorities for local APIServices.
if a.specs[i].apiService.Spec.Service == nil {
// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if spec.apiService.Spec.Service == nil {
continue
}
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
if specInfo.spec == nil {
continue
}
specs = append(specs, *specInfo)
}
if len(specs) == 0 {
return &spec.Swagger{}, nil
}
sortByPriority(specs)
for _, specInfo := range specs {
// TODO: Make kube-openapi.MergeSpec(s) accept nil or empty spec as destination and just clone the spec in that case.
if specToReturn == nil {
specToReturn, err = aggregator.CloneSpec(specInfo.spec)
if err != nil {
return nil, err
}
continue
}
if err := aggregator.MergeSpecsIgnorePathConflict(specToReturn, specInfo.spec); err != nil {
return nil, err
}
}
return specToReturn, nil
}
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) updateOpenAPISpec() error {
if s.openAPIService == nil {
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
return s.openAPIService.UpdateSpec(specToServe)
}
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
orgSpecInfo, exists := s.openAPISpecs[specInfo.apiService.Name]
s.openAPISpecs[specInfo.apiService.Name] = specInfo
if err := s.updateOpenAPISpec(); err != nil {
if exists {
s.openAPISpecs[specInfo.apiService.Name] = orgSpecInfo
} else {
delete(s.openAPISpecs, specInfo.apiService.Name)
}
return err
}
return nil
}
// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error {
orgSpecInfo, exists := s.openAPISpecs[apiServiceName]
if !exists {
return nil
}
delete(s.openAPISpecs, apiServiceName)
if err := s.updateOpenAPISpec(); err != nil {
s.openAPISpecs[apiServiceName] = orgSpecInfo
return err
}
return nil
}
// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
specInfo, existingService := s.openAPISpecs[apiServiceName]
if !existingService {
return fmt.Errorf("APIService %q does not exists", apiServiceName)
}
// For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths
// proxy handler delegates.
if specInfo.apiService.Spec.Service != nil {
aggregator.FilterSpecByPaths(spec, []string{"/apis/"})
}
return s.tryUpdatingServiceSpecs(&openAPISpecInfo{
apiService: specInfo.apiService,
spec: spec,
handler: specInfo.handler,
etag: etag,
})
}
// AddUpdateAPIService adds or updates the api service. It is thread safe.
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if apiService.Spec.Service == nil {
// All local specs should be already aggregated using local delegate chain
return nil
}
newSpec := &openAPISpecInfo{
apiService: *apiService,
handler: handler,
}
if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService {
newSpec.etag = specInfo.etag
newSpec.spec = specInfo.spec
}
return s.tryUpdatingServiceSpecs(newSpec)
}
// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe.
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if _, existingService := s.openAPISpecs[apiServiceName]; !existingService {
return nil
}
return s.tryDeleteServiceSpecs(apiServiceName)
}
// GetAPIServiceSpec returns api service spec info
func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
if info, existingService := s.openAPISpecs[apiServiceName]; existingService {
return info.handler, info.etag, true
}
return nil, "", false
}

View File

@ -0,0 +1,135 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"net/http"
"reflect"
"testing"
"github.com/go-openapi/spec"
"github.com/stretchr/testify/assert"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Spec.Service = &apiregistration.ServiceReference{}
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
}
}
func TestAPIServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{},
},
}
sortByPriority(list)
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
}
type handlerTest struct {
etag string
data []byte
}
var _ http.Handler = handlerTest{}
func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(h.etag) > 0 {
w.Header().Add("Etag", h.etag)
}
ifNoneMatches := r.Header["If-None-Match"]
for _, match := range ifNoneMatches {
if match == h.etag {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Write(h.data)
}
func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error,
expectedSpecID string, expectedEtag string) error {
if err != nil {
return fmt.Errorf("downloadOpenAPISpec failed : %s", err)
}
if expectedSpecID == "" && actualSpec != nil {
return fmt.Errorf("expected Not Modified, actual ID %s", actualSpec.ID)
}
if actualSpec != nil && actualSpec.ID != expectedSpecID {
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, actualSpec.ID)
}
if actualEtag != expectedEtag {
return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, actualEtag)
}
return nil
}
func TestDownloadOpenAPISpec(t *testing.T) {
s := Downloader{contextMapper: request.NewRequestContextMapper()}
// Test with no eTag
actualSpec, actualEtag, _, err := s.Download(handlerTest{data: []byte("{\"id\": \"test\"}")}, "")
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "\"6E8F849B434D4B98A569B9D7718876E9-356ECAB19D7FBE1336BABB1E70F8F3025050DE218BE78256BE81620681CFC9A268508E542B8B55974E17B2184BBFC8FFFAA577E51BE195D32B3CA2547818ABE4\""))
// Test with eTag
actualSpec, actualEtag, _, err = s.Download(
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "")
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test"))
// Test not modified
actualSpec, actualEtag, _, err = s.Download(
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "etag_test")
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "", "etag_test"))
// Test different eTags
actualSpec, actualEtag, _, err = s.Download(
handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test1"}, "etag_test2")
assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test1"))
}

View File

@ -0,0 +1,186 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"net/http"
"time"
"github.com/go-openapi/spec"
"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
const (
successfulUpdateDelay = time.Minute
failedUpdateMaxExpDelay = time.Hour
)
type syncAction int
const (
syncRequeue syncAction = iota
syncRequeueRateLimited
syncNothing
)
// AggregationManager is the interface between this controller and OpenAPI Aggregator service.
type AggregationManager interface {
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
}
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
// them if necessary.
type AggregationController struct {
openAPIAggregationManager AggregationManager
queue workqueue.RateLimitingInterface
downloader *Downloader
// To allow injection for testing.
syncHandler func(key string) (syncAction, error)
}
// NewAggregationController creates new OpenAPI aggregation controller.
func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController {
c := &AggregationController{
openAPIAggregationManager: openAPIAggregationManager,
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay), "APIServiceOpenAPIAggregationControllerQueue1"),
downloader: downloader,
}
c.syncHandler = c.sync
return c
}
// Run starts OpenAPI AggregationController
func (c *AggregationController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
glog.Infof("Starting OpenAPI AggregationController")
defer glog.Infof("Shutting down OpenAPI AggregationController")
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}
func (c *AggregationController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *AggregationController) processNextWorkItem() bool {
key, quit := c.queue.Get()
defer c.queue.Done(key)
if quit {
return false
}
glog.Infof("OpenAPI AggregationController: Processing item %s", key)
action, err := c.syncHandler(key.(string))
if err == nil {
c.queue.Forget(key)
} else {
utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
}
switch action {
case syncRequeue:
glog.Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
c.queue.AddAfter(key, successfulUpdateDelay)
case syncRequeueRateLimited:
glog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
c.queue.AddRateLimited(key)
case syncNothing:
glog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
}
return true
}
func (c *AggregationController) sync(key string) (syncAction, error) {
handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key)
if !exists || handler == nil {
return syncNothing, nil
}
returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag)
switch {
case err != nil:
return syncRequeueRateLimited, err
case httpStatus == http.StatusNotModified:
case httpStatus == http.StatusNotFound || returnSpec == nil:
return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exists")
case httpStatus == http.StatusOK:
if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil {
return syncRequeueRateLimited, err
}
}
return syncRequeue, nil
}
// AddAPIService adds a new API Service to OpenAPI Aggregation.
func (c *AggregationController) AddAPIService(handler http.Handler, apiService *apiregistration.APIService) {
if apiService.Spec.Service == nil {
return
}
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
}
c.queue.AddAfter(apiService.Name, time.Second)
}
// UpdateAPIService updates API Service's info and handler.
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) {
if apiService.Spec.Service == nil {
return
}
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err))
}
key := apiService.Name
if c.queue.NumRequeues(key) > 0 {
// The item has failed before. Remove it from failure queue and
// update it in a second
c.queue.Forget(key)
c.queue.AddAfter(key, time.Second)
}
// Else: The item has been succeeded before and it will be updated soon (after successfulUpdateDelay)
// we don't add it again as it will cause a duplication of items.
}
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil {
utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
}
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
// and will not add it again to the queue.
c.queue.Forget(apiServiceName)
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"crypto/sha512"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/go-openapi/spec"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Downloader is the OpenAPI downloader type. It will try to download spec from /swagger.json endpoint.
type Downloader struct {
contextMapper request.RequestContextMapper
}
// NewDownloader creates a new OpenAPI Downloader.
func NewDownloader(contextMapper request.RequestContextMapper) Downloader {
return Downloader{contextMapper}
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}
func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if ctx, ok := s.contextMapper.Get(req); ok {
s.contextMapper.Update(req, request.WithUser(ctx, info))
}
handler.ServeHTTP(w, req)
})
}
func etagFor(data []byte) string {
return fmt.Sprintf("%s%X\"", locallyGeneratedEtagPrefix, sha512.Sum512(data))
}
// Download downloads openAPI spec from /swagger.json endpoint of the given handler.
// httpStatus is only valid if err == nil
func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *spec.Swagger, newEtag string, httpStatus int, err error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = request.WithRequestContext(handler, s.contextMapper)
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/swagger.json", nil)
if err != nil {
return nil, "", 0, err
}
// Only pass eTag if it is not generated locally
if len(etag) > 0 && !strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
req.Header.Add("If-None-Match", etag)
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotModified:
if len(etag) == 0 {
return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag")
}
return nil, etag, http.StatusNotModified, nil
case http.StatusNotFound:
// Gracefully skip 404, assuming the server won't provide any spec
return nil, "", http.StatusNotFound, nil
case http.StatusOK:
openAPISpec := &spec.Swagger{}
if err := json.Unmarshal(writer.data, openAPISpec); err != nil {
return nil, "", 0, err
}
newEtag = writer.Header().Get("Etag")
if len(newEtag) == 0 {
newEtag = etagFor(writer.data)
if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
// The function call with an etag and server does not report an etag.
// That means this server does not support etag and the etag that passed
// to the function generated previously by us. Just compare etags and
// return StatusNotModified if they are the same.
if etag == newEtag {
return nil, etag, http.StatusNotModified, nil
}
}
}
return openAPISpec, newEtag, http.StatusOK, nil
default:
return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
}
}