diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 837799fd2f8..53d8b1f415b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -165,6 +165,9 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc // this wires up openapi kubeAPIServer.GenericAPIServer.PrepareRun() + // This will wire up openapi for extension api server + apiExtensionsServer.GenericAPIServer.PrepareRun() + // aggregator comes last in the chain aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport) if err != nil { diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1/doc.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1/doc.go index 3976988bf8f..5ef43c18457 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1/doc.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1/doc.go @@ -20,4 +20,5 @@ limitations under the License. // Package v1beta1 is the v1beta1 version of the API. // +groupName=apiextensions.k8s.io +// +k8s:openapi-gen=true package v1beta1 // import "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 0d72b9e43df..65873c88e0c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "fmt" "net/http" "time" @@ -27,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -41,6 +39,7 @@ import ( "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" + openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd" ) @@ -119,7 +118,7 @@ type APIAggregator struct { // Information needed to determine routing for the aggregator serviceResolver ServiceResolver - openAPIAggregator *openAPIAggregator + openAPIAggregationController *openapicontroller.AggregationController } type completedConfig struct { @@ -222,15 +221,22 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg }) if openApiConfig != nil { - s.openAPIAggregator, err = buildAndRegisterOpenAPIAggregator( - s.delegateHandler, + specDownloader := openapicontroller.NewDownloader(s.contextMapper) + openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator( + &specDownloader, + delegationTarget, s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), openApiConfig, - s.GenericAPIServer.Handler.NonGoRestfulMux, - s.contextMapper) + s.GenericAPIServer.Handler.NonGoRestfulMux) if err != nil { return nil, err } + s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator) + + s.GenericAPIServer.AddPostStartHook("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { + go s.openAPIAggregationController.Run(context.StopCh) + return nil + }) } return s, nil @@ -243,7 +249,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er // since they are wired against listers because they require multiple resources to respond if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { proxyHandler.updateAPIService(apiService) - return s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService) + if s.openAPIAggregationController != nil { + s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService) + } + return nil } proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version @@ -262,8 +271,8 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) er serviceResolver: s.serviceResolver, } proxyHandler.updateAPIService(apiService) - if err := s.openAPIAggregator.loadApiServiceSpec(proxyHandler, apiService); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to load OpenAPI spec for API service %s: %v", apiService.Name, err)) + if s.openAPIAggregationController != nil { + s.openAPIAggregationController.AddAPIService(proxyHandler, apiService) } s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) @@ -307,6 +316,9 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) { } s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath) s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/") + if s.openAPIAggregationController != nil { + s.openAPIAggregationController.RemoveAPIService(apiServiceName) + } delete(s.proxyHandlers, apiServiceName) // TODO unregister group level discovery when there are no more versions for the group diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator.go deleted file mode 100644 index f5941ec8070..00000000000 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator.go +++ /dev/null @@ -1,270 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "encoding/json" - "fmt" - "net/http" - "sort" - "time" - - "github.com/emicklei/go-restful" - "github.com/go-openapi/spec" - - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/kube-aggregator/pkg/apis/apiregistration" - "k8s.io/kube-openapi/pkg/aggregator" - "k8s.io/kube-openapi/pkg/builder" - "k8s.io/kube-openapi/pkg/common" - "k8s.io/kube-openapi/pkg/handler" -) - -const ( - aggregatorUser = "system:aggregator" - specDownloadTimeout = 60 * time.Second -) - -type openAPIAggregator struct { - // Map of API Services' OpenAPI specs by their name - openAPISpecs map[string]*openAPISpecInfo - - // provided for dynamic OpenAPI spec - openAPIService *handler.OpenAPIService - - // Aggregator's OpenAPI spec (holds apiregistration group). - aggregatorOpenAPISpec *spec.Swagger - - // Local (in process) delegate's OpenAPI spec. - inProcessDelegatesOpenAPISpec *spec.Swagger - - contextMapper request.RequestContextMapper -} - -func buildAndRegisterOpenAPIAggregator(delegateHandler http.Handler, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler, contextMapper request.RequestContextMapper) (s *openAPIAggregator, err error) { - s = &openAPIAggregator{ - openAPISpecs: map[string]*openAPISpecInfo{}, - contextMapper: contextMapper, - } - - // Get Local delegate's Spec - s.inProcessDelegatesOpenAPISpec, err = s.downloadOpenAPISpec(delegateHandler) - if err != nil { - return nil, err - } - - // Build Aggregator's spec - s.aggregatorOpenAPISpec, err = builder.BuildOpenAPISpec( - webServices, config) - if err != nil { - return nil, err - } - // Remove any non-API endpoints from aggregator's spec. aggregatorOpenAPISpec - // is the source of truth for all non-api endpoints. - aggregator.FilterSpecByPaths(s.aggregatorOpenAPISpec, []string{"/apis/"}) - - // Build initial spec to serve. - specToServe, err := s.buildOpenAPISpec() - if err != nil { - return nil, err - } - - // Install handler - s.openAPIService, err = handler.RegisterOpenAPIService( - specToServe, "/swagger.json", pathHandler) - if err != nil { - return nil, err - } - - return s, nil -} - -// openAPISpecInfo is used to store OpenAPI spec with its priority. -// It can be used to sort specs with their priorities. -type openAPISpecInfo struct { - apiService apiregistration.APIService - spec *spec.Swagger -} - -// byPriority can be used in sort.Sort to sort specs with their priorities. -type byPriority struct { - specs []openAPISpecInfo - groupPriorities map[string]int32 -} - -func (a byPriority) Len() int { return len(a.specs) } -func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] } -func (a byPriority) Less(i, j int) bool { - var iPriority, jPriority int32 - if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group { - iPriority = a.specs[i].apiService.Spec.VersionPriority - jPriority = a.specs[i].apiService.Spec.VersionPriority - } else { - iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group] - jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group] - } - if iPriority != jPriority { - // Sort by priority, higher first - return iPriority > jPriority - } - // Sort by service name. - return a.specs[i].apiService.Name < a.specs[j].apiService.Name -} - -func sortByPriority(specs []openAPISpecInfo) { - b := byPriority{ - specs: specs, - groupPriorities: map[string]int32{}, - } - for _, spec := range specs { - if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr { - b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum - } - } - sort.Sort(b) -} - -// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. -func (s *openAPIAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { - specToReturn, err = aggregator.CloneSpec(s.inProcessDelegatesOpenAPISpec) - if err != nil { - return nil, err - } - if err := aggregator.MergeSpecs(specToReturn, s.aggregatorOpenAPISpec); err != nil { - return nil, fmt.Errorf("cannot merge local delegate spec with aggregator spec: %s", err.Error()) - } - specs := []openAPISpecInfo{} - for _, specInfo := range s.openAPISpecs { - specs = append(specs, openAPISpecInfo{specInfo.apiService, specInfo.spec}) - } - sortByPriority(specs) - for _, specInfo := range specs { - if err := aggregator.MergeSpecs(specToReturn, specInfo.spec); err != nil { - return nil, err - } - } - return specToReturn, nil -} - -// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. -func (s *openAPIAggregator) updateOpenAPISpec() error { - if s.openAPIService == nil { - return nil - } - specToServe, err := s.buildOpenAPISpec() - if err != nil { - return err - } - return s.openAPIService.UpdateSpec(specToServe) -} - -// inMemoryResponseWriter is a http.Writer that keep the response in memory. -type inMemoryResponseWriter struct { - header http.Header - respCode int - data []byte -} - -func newInMemoryResponseWriter() *inMemoryResponseWriter { - return &inMemoryResponseWriter{header: http.Header{}} -} - -func (r *inMemoryResponseWriter) Header() http.Header { - return r.header -} - -func (r *inMemoryResponseWriter) WriteHeader(code int) { - r.respCode = code -} - -func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { - r.data = append(r.data, in...) - return len(in), nil -} - -func (r *inMemoryResponseWriter) String() string { - s := fmt.Sprintf("ResponseCode: %d", r.respCode) - if r.data != nil { - s += fmt.Sprintf(", Body: %s", string(r.data)) - } - if r.header != nil { - s += fmt.Sprintf(", Header: %s", r.header) - } - return s -} - -func (s *openAPIAggregator) handlerWithUser(handler http.Handler, info user.Info) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if ctx, ok := s.contextMapper.Get(req); ok { - s.contextMapper.Update(req, request.WithUser(ctx, info)) - } - handler.ServeHTTP(w, req) - }) -} - -// downloadOpenAPISpec downloads openAPI spec from /swagger.json endpoint of the given handler. -func (s *openAPIAggregator) downloadOpenAPISpec(handler http.Handler) (*spec.Swagger, error) { - handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) - handler = request.WithRequestContext(handler, s.contextMapper) - handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") - - req, err := http.NewRequest("GET", "/swagger.json", nil) - if err != nil { - return nil, err - } - writer := newInMemoryResponseWriter() - handler.ServeHTTP(writer, req) - - switch writer.respCode { - case http.StatusOK: - openApiSpec := &spec.Swagger{} - if err := json.Unmarshal(writer.data, openApiSpec); err != nil { - return nil, err - } - return openApiSpec, nil - default: - return nil, fmt.Errorf("failed to retrive openAPI spec, http error: %s", writer.String()) - } -} - -// loadApiServiceSpec loads OpenAPI spec for the given API Service and then updates aggregator's spec. -func (s *openAPIAggregator) loadApiServiceSpec(handler http.Handler, apiService *apiregistration.APIService) error { - - // Ignore local services - if apiService.Spec.Service == nil { - return nil - } - - openApiSpec, err := s.downloadOpenAPISpec(handler) - if err != nil { - return err - } - aggregator.FilterSpecByPaths(openApiSpec, []string{"/apis/" + apiService.Spec.Group + "/"}) - - s.openAPISpecs[apiService.Name] = &openAPISpecInfo{ - apiService: *apiService, - spec: openApiSpec, - } - - err = s.updateOpenAPISpec() - if err != nil { - delete(s.openAPISpecs, apiService.Name) - return err - } - return nil -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator_test.go deleted file mode 100644 index 40130b88d80..00000000000 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/openapi_aggregator_test.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "reflect" - "testing" - - "github.com/go-openapi/spec" - - "k8s.io/kube-aggregator/pkg/apis/apiregistration" -) - -func newApiServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService { - r := apiregistration.APIService{} - r.Spec.Group = group - r.Spec.GroupPriorityMinimum = minGroupPriority - r.Spec.VersionPriority = versionPriority - r.Name = name - return r -} - -func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { - actualNames := []string{} - for _, a := range actual { - actualNames = append(actualNames, a.apiService.Name) - } - if !reflect.DeepEqual(actualNames, expectedNames) { - t.Errorf("Expected %s got %s.", expectedNames, actualNames) - } -} - -func TestApiServiceSort(t *testing.T) { - list := []openAPISpecInfo{ - { - apiService: newApiServiceForTest("FirstService", "Group1", 10, 5), - spec: &spec.Swagger{}, - }, - { - apiService: newApiServiceForTest("SecondService", "Group2", 15, 3), - spec: &spec.Swagger{}, - }, - { - apiService: newApiServiceForTest("FirstServiceInternal", "Group1", 16, 3), - spec: &spec.Swagger{}, - }, - { - apiService: newApiServiceForTest("ThirdService", "Group3", 15, 3), - spec: &spec.Swagger{}, - }, - } - sortByPriority(list) - assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go new file mode 100644 index 00000000000..c32da4cdd79 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go @@ -0,0 +1,318 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "fmt" + "net/http" + "sort" + "sync" + "time" + + "github.com/emicklei/go-restful" + "github.com/go-openapi/spec" + + "k8s.io/apiserver/pkg/server" + "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "k8s.io/kube-openapi/pkg/aggregator" + "k8s.io/kube-openapi/pkg/builder" + "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/handler" +) + +const ( + aggregatorUser = "system:aggregator" + specDownloadTimeout = 60 * time.Second + localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d" + + // A randomly generated UUID to differentiate local and remote eTags. + locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" +) + +type specAggregator struct { + // mutex protects all members of this struct. + rwMutex sync.RWMutex + + // Map of API Services' OpenAPI specs by their name + openAPISpecs map[string]*openAPISpecInfo + + // provided for dynamic OpenAPI spec + openAPIService *handler.OpenAPIService +} + +var _ AggregationManager = &specAggregator{} + +// This function is not thread safe as it only being called on startup. +func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) { + localAPIService := apiregistration.APIService{} + localAPIService.Name = name + s.openAPISpecs[name] = &openAPISpecInfo{ + etag: etag, + apiService: localAPIService, + handler: localHandler, + spec: spec, + } +} + +// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. +func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, + config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) { + s := &specAggregator{ + openAPISpecs: map[string]*openAPISpecInfo{}, + } + + i := 0 + // Build Aggregator's spec + aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec( + webServices, config) + if err != nil { + return nil, err + } + + // Reserving non-name spec for aggregator's Spec. + s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "") + i++ + for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { + handler := delegate.UnprotectedHandler() + if handler == nil { + continue + } + delegateSpec, etag, _, err := downloader.Download(handler, "") + if err != nil { + return nil, err + } + if delegateSpec == nil { + continue + } + s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag) + i++ + } + + // Build initial spec to serve. + specToServe, err := s.buildOpenAPISpec() + if err != nil { + return nil, err + } + + // Install handler + s.openAPIService, err = handler.RegisterOpenAPIService( + specToServe, "/swagger.json", pathHandler) + if err != nil { + return nil, err + } + + return s, nil +} + +// openAPISpecInfo is used to store OpenAPI spec with its priority. +// It can be used to sort specs with their priorities. +type openAPISpecInfo struct { + apiService apiregistration.APIService + + // Specification of this API Service. If null then the spec is not loaded yet. + spec *spec.Swagger + handler http.Handler + etag string +} + +// byPriority can be used in sort.Sort to sort specs with their priorities. +type byPriority struct { + specs []openAPISpecInfo + groupPriorities map[string]int32 +} + +func (a byPriority) Len() int { return len(a.specs) } +func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] } +func (a byPriority) Less(i, j int) bool { + // All local specs will come first + // WARNING: This will result in not following priorities for local APIServices. + if a.specs[i].apiService.Spec.Service == nil { + // Sort local specs with their name. This is the order in the delegation chain (aggregator first). + return a.specs[i].apiService.Name < a.specs[j].apiService.Name + } + var iPriority, jPriority int32 + if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group { + iPriority = a.specs[i].apiService.Spec.VersionPriority + jPriority = a.specs[i].apiService.Spec.VersionPriority + } else { + iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group] + jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group] + } + if iPriority != jPriority { + // Sort by priority, higher first + return iPriority > jPriority + } + // Sort by service name. + return a.specs[i].apiService.Name < a.specs[j].apiService.Name +} + +func sortByPriority(specs []openAPISpecInfo) { + b := byPriority{ + specs: specs, + groupPriorities: map[string]int32{}, + } + for _, spec := range specs { + if spec.apiService.Spec.Service == nil { + continue + } + if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr { + b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum + } + } + sort.Sort(b) +} + +// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. +func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { + specs := []openAPISpecInfo{} + for _, specInfo := range s.openAPISpecs { + if specInfo.spec == nil { + continue + } + specs = append(specs, *specInfo) + } + if len(specs) == 0 { + return &spec.Swagger{}, nil + } + sortByPriority(specs) + for _, specInfo := range specs { + // TODO: Make kube-openapi.MergeSpec(s) accept nil or empty spec as destination and just clone the spec in that case. + if specToReturn == nil { + specToReturn, err = aggregator.CloneSpec(specInfo.spec) + if err != nil { + return nil, err + } + continue + } + if err := aggregator.MergeSpecsIgnorePathConflict(specToReturn, specInfo.spec); err != nil { + return nil, err + } + } + return specToReturn, nil +} + +// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. +func (s *specAggregator) updateOpenAPISpec() error { + if s.openAPIService == nil { + return nil + } + specToServe, err := s.buildOpenAPISpec() + if err != nil { + return err + } + return s.openAPIService.UpdateSpec(specToServe) +} + +// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact +// if the update fails. +func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error { + orgSpecInfo, exists := s.openAPISpecs[specInfo.apiService.Name] + s.openAPISpecs[specInfo.apiService.Name] = specInfo + if err := s.updateOpenAPISpec(); err != nil { + if exists { + s.openAPISpecs[specInfo.apiService.Name] = orgSpecInfo + } else { + delete(s.openAPISpecs, specInfo.apiService.Name) + } + return err + } + return nil +} + +// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact +// if the update fails. +func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error { + orgSpecInfo, exists := s.openAPISpecs[apiServiceName] + if !exists { + return nil + } + delete(s.openAPISpecs, apiServiceName) + if err := s.updateOpenAPISpec(); err != nil { + s.openAPISpecs[apiServiceName] = orgSpecInfo + return err + } + return nil +} + +// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe. +func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + specInfo, existingService := s.openAPISpecs[apiServiceName] + if !existingService { + return fmt.Errorf("APIService %q does not exists", apiServiceName) + } + + // For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths + // proxy handler delegates. + if specInfo.apiService.Spec.Service != nil { + aggregator.FilterSpecByPaths(spec, []string{"/apis/"}) + } + + return s.tryUpdatingServiceSpecs(&openAPISpecInfo{ + apiService: specInfo.apiService, + spec: spec, + handler: specInfo.handler, + etag: etag, + }) +} + +// AddUpdateAPIService adds or updates the api service. It is thread safe. +func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + if apiService.Spec.Service == nil { + // All local specs should be already aggregated using local delegate chain + return nil + } + + newSpec := &openAPISpecInfo{ + apiService: *apiService, + handler: handler, + } + if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService { + newSpec.etag = specInfo.etag + newSpec.spec = specInfo.spec + } + return s.tryUpdatingServiceSpecs(newSpec) +} + +// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. +// It is thread safe. +func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + if _, existingService := s.openAPISpecs[apiServiceName]; !existingService { + return nil + } + + return s.tryDeleteServiceSpecs(apiServiceName) +} + +// GetAPIServiceSpec returns api service spec info +func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + + if info, existingService := s.openAPISpecs[apiServiceName]; existingService { + return info.handler, info.etag, true + } + return nil, "", false +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go new file mode 100644 index 00000000000..4bebfe506b6 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "fmt" + "net/http" + "reflect" + "testing" + + "github.com/go-openapi/spec" + "github.com/stretchr/testify/assert" + + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/kube-aggregator/pkg/apis/apiregistration" +) + +func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService { + r := apiregistration.APIService{} + r.Spec.Group = group + r.Spec.GroupPriorityMinimum = minGroupPriority + r.Spec.VersionPriority = versionPriority + r.Spec.Service = &apiregistration.ServiceReference{} + r.Name = name + return r +} + +func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { + actualNames := []string{} + for _, a := range actual { + actualNames = append(actualNames, a.apiService.Name) + } + if !reflect.DeepEqual(actualNames, expectedNames) { + t.Errorf("Expected %s got %s.", expectedNames, actualNames) + } +} + +func TestAPIServiceSort(t *testing.T) { + list := []openAPISpecInfo{ + { + apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3), + spec: &spec.Swagger{}, + }, + } + sortByPriority(list) + assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) +} + +type handlerTest struct { + etag string + data []byte +} + +var _ http.Handler = handlerTest{} + +func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if len(h.etag) > 0 { + w.Header().Add("Etag", h.etag) + } + ifNoneMatches := r.Header["If-None-Match"] + for _, match := range ifNoneMatches { + if match == h.etag { + w.WriteHeader(http.StatusNotModified) + return + } + } + w.Write(h.data) +} + +func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error, + expectedSpecID string, expectedEtag string) error { + if err != nil { + return fmt.Errorf("downloadOpenAPISpec failed : %s", err) + } + if expectedSpecID == "" && actualSpec != nil { + return fmt.Errorf("expected Not Modified, actual ID %s", actualSpec.ID) + } + if actualSpec != nil && actualSpec.ID != expectedSpecID { + return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, actualSpec.ID) + } + if actualEtag != expectedEtag { + return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, actualEtag) + } + return nil +} + +func TestDownloadOpenAPISpec(t *testing.T) { + + s := Downloader{contextMapper: request.NewRequestContextMapper()} + + // Test with no eTag + actualSpec, actualEtag, _, err := s.Download(handlerTest{data: []byte("{\"id\": \"test\"}")}, "") + assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "\"6E8F849B434D4B98A569B9D7718876E9-356ECAB19D7FBE1336BABB1E70F8F3025050DE218BE78256BE81620681CFC9A268508E542B8B55974E17B2184BBFC8FFFAA577E51BE195D32B3CA2547818ABE4\"")) + + // Test with eTag + actualSpec, actualEtag, _, err = s.Download( + handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "") + assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test")) + + // Test not modified + actualSpec, actualEtag, _, err = s.Download( + handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test"}, "etag_test") + assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "", "etag_test")) + + // Test different eTags + actualSpec, actualEtag, _, err = s.Download( + handlerTest{data: []byte("{\"id\": \"test\"}"), etag: "etag_test1"}, "etag_test2") + assert.NoError(t, assertDownloadedSpec(actualSpec, actualEtag, err, "test", "etag_test1")) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go new file mode 100644 index 00000000000..c38350ddbbe --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -0,0 +1,186 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "fmt" + "net/http" + "time" + + "github.com/go-openapi/spec" + "github.com/golang/glog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "k8s.io/kube-aggregator/pkg/apis/apiregistration" +) + +const ( + successfulUpdateDelay = time.Minute + failedUpdateMaxExpDelay = time.Hour +) + +type syncAction int + +const ( + syncRequeue syncAction = iota + syncRequeueRateLimited + syncNothing +) + +// AggregationManager is the interface between this controller and OpenAPI Aggregator service. +type AggregationManager interface { + AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error + UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error + RemoveAPIServiceSpec(apiServiceName string) error + GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) +} + +// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove +// them if necessary. +type AggregationController struct { + openAPIAggregationManager AggregationManager + queue workqueue.RateLimitingInterface + downloader *Downloader + + // To allow injection for testing. + syncHandler func(key string) (syncAction, error) +} + +// NewAggregationController creates new OpenAPI aggregation controller. +func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController { + c := &AggregationController{ + openAPIAggregationManager: openAPIAggregationManager, + queue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay), "APIServiceOpenAPIAggregationControllerQueue1"), + downloader: downloader, + } + + c.syncHandler = c.sync + + return c +} + +// Run starts OpenAPI AggregationController +func (c *AggregationController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting OpenAPI AggregationController") + defer glog.Infof("Shutting down OpenAPI AggregationController") + + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *AggregationController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *AggregationController) processNextWorkItem() bool { + key, quit := c.queue.Get() + defer c.queue.Done(key) + if quit { + return false + } + + glog.Infof("OpenAPI AggregationController: Processing item %s", key) + + action, err := c.syncHandler(key.(string)) + if err == nil { + c.queue.Forget(key) + } else { + utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err)) + } + + switch action { + case syncRequeue: + glog.Infof("OpenAPI AggregationController: action for item %s: Requeue.", key) + c.queue.AddAfter(key, successfulUpdateDelay) + case syncRequeueRateLimited: + glog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key) + c.queue.AddRateLimited(key) + case syncNothing: + glog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key) + } + + return true +} + +func (c *AggregationController) sync(key string) (syncAction, error) { + handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key) + if !exists || handler == nil { + return syncNothing, nil + } + returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag) + switch { + case err != nil: + return syncRequeueRateLimited, err + case httpStatus == http.StatusNotModified: + case httpStatus == http.StatusNotFound || returnSpec == nil: + return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exists") + case httpStatus == http.StatusOK: + if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil { + return syncRequeueRateLimited, err + } + } + return syncRequeue, nil +} + +// AddAPIService adds a new API Service to OpenAPI Aggregation. +func (c *AggregationController) AddAPIService(handler http.Handler, apiService *apiregistration.APIService) { + if apiService.Spec.Service == nil { + return + } + if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil { + utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err)) + } + c.queue.AddAfter(apiService.Name, time.Second) +} + +// UpdateAPIService updates API Service's info and handler. +func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) { + if apiService.Spec.Service == nil { + return + } + if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil { + utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err)) + } + key := apiService.Name + if c.queue.NumRequeues(key) > 0 { + // The item has failed before. Remove it from failure queue and + // update it in a second + c.queue.Forget(key) + c.queue.AddAfter(key, time.Second) + } + // Else: The item has been succeeded before and it will be updated soon (after successfulUpdateDelay) + // we don't add it again as it will cause a duplication of items. +} + +// RemoveAPIService removes API Service from OpenAPI Aggregation Controller. +func (c *AggregationController) RemoveAPIService(apiServiceName string) { + if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil { + utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err)) + } + // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out + // and will not add it again to the queue. + c.queue.Forget(apiServiceName) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go new file mode 100644 index 00000000000..0da4e7e99af --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go @@ -0,0 +1,146 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openapi + +import ( + "crypto/sha512" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/go-openapi/spec" + + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" +) + +// Downloader is the OpenAPI downloader type. It will try to download spec from /swagger.json endpoint. +type Downloader struct { + contextMapper request.RequestContextMapper +} + +// NewDownloader creates a new OpenAPI Downloader. +func NewDownloader(contextMapper request.RequestContextMapper) Downloader { + return Downloader{contextMapper} +} + +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func newInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} + +func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if ctx, ok := s.contextMapper.Get(req); ok { + s.contextMapper.Update(req, request.WithUser(ctx, info)) + } + handler.ServeHTTP(w, req) + }) +} + +func etagFor(data []byte) string { + return fmt.Sprintf("%s%X\"", locallyGeneratedEtagPrefix, sha512.Sum512(data)) +} + +// Download downloads openAPI spec from /swagger.json endpoint of the given handler. +// httpStatus is only valid if err == nil +func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *spec.Swagger, newEtag string, httpStatus int, err error) { + handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) + handler = request.WithRequestContext(handler, s.contextMapper) + handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") + + req, err := http.NewRequest("GET", "/swagger.json", nil) + if err != nil { + return nil, "", 0, err + } + + // Only pass eTag if it is not generated locally + if len(etag) > 0 && !strings.HasPrefix(etag, locallyGeneratedEtagPrefix) { + req.Header.Add("If-None-Match", etag) + } + + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + switch writer.respCode { + case http.StatusNotModified: + if len(etag) == 0 { + return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag") + } + return nil, etag, http.StatusNotModified, nil + case http.StatusNotFound: + // Gracefully skip 404, assuming the server won't provide any spec + return nil, "", http.StatusNotFound, nil + case http.StatusOK: + openAPISpec := &spec.Swagger{} + if err := json.Unmarshal(writer.data, openAPISpec); err != nil { + return nil, "", 0, err + } + newEtag = writer.Header().Get("Etag") + if len(newEtag) == 0 { + newEtag = etagFor(writer.data) + if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) { + // The function call with an etag and server does not report an etag. + // That means this server does not support etag and the etag that passed + // to the function generated previously by us. Just compare etags and + // return StatusNotModified if they are the same. + if etag == newEtag { + return nil, etag, http.StatusNotModified, nil + } + } + } + return openAPISpec, newEtag, http.StatusOK, nil + default: + return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) + } +}