diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index aa5763bd528..a931e6fc793 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -78,6 +78,7 @@ go_library( "//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library", + "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index e1be1847c56..e31f0e39b66 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -35,6 +35,7 @@ import ( informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" + openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" ) @@ -206,8 +207,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg }) if openAPIConfig != nil { - specDownloader := openapicontroller.NewDownloader() - openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator( + specDownloader := openapiaggregator.NewDownloader() + openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator( &specDownloader, delegationTarget, s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/BUILD index 9a4bef259a2..6115841cb9a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/BUILD @@ -1,41 +1,18 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = [ - "aggregator.go", - "controller.go", - "downloader.go", - ], + srcs = ["controller.go"], importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi", importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi", visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", - "//vendor/github.com/emicklei/go-restful:go_default_library", - "//vendor/github.com/go-openapi/spec:go_default_library", + "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library", "//vendor/k8s.io/klog:go_default_library", - "//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library", - "//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library", - "//vendor/k8s.io/kube-openapi/pkg/common:go_default_library", - "//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["aggregator_test.go"], - embed = [":go_default_library"], - deps = [ - "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", - "//vendor/github.com/go-openapi/spec:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", ], ) @@ -48,7 +25,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/BUILD new file mode 100644 index 00000000000..01e155402ff --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/BUILD @@ -0,0 +1,53 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "aggregator.go", + "downloader.go", + "priority.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator", + importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", + "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", + "//vendor/github.com/emicklei/go-restful:go_default_library", + "//vendor/github.com/go-openapi/spec:go_default_library", + "//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library", + "//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library", + "//vendor/k8s.io/kube-openapi/pkg/common:go_default_library", + "//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "downloader_test.go", + "priority_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", + "//vendor/github.com/go-openapi/spec:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go similarity index 78% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 4dadf71e980..ca36b8e7902 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -14,12 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package openapi +package aggregator import ( "fmt" "net/http" - "sort" "sync" "time" @@ -34,6 +33,15 @@ import ( "k8s.io/kube-openapi/pkg/handler" ) +// SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last +// known specs including the http etag. +type SpecAggregator interface { + AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error + UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error + RemoveAPIServiceSpec(apiServiceName string) error + GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) +} + const ( aggregatorUser = "system:aggregator" specDownloadTimeout = 60 * time.Second @@ -43,42 +51,16 @@ const ( locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" ) -type specAggregator struct { - // mutex protects all members of this struct. - rwMutex sync.RWMutex - - // Map of API Services' OpenAPI specs by their name - openAPISpecs map[string]*openAPISpecInfo - - // provided for dynamic OpenAPI spec - openAPIVersionedService *handler.OpenAPIService -} - -var _ AggregationManager = &specAggregator{} - -// This function is not thread safe as it only being called on startup. -func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) { - localAPIService := apiregistration.APIService{} - localAPIService.Name = name - s.openAPISpecs[name] = &openAPISpecInfo{ - etag: etag, - apiService: localAPIService, - handler: localHandler, - spec: spec, - } -} - // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, - config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) { + config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) { s := &specAggregator{ openAPISpecs: map[string]*openAPISpecInfo{}, } i := 0 // Build Aggregator's spec - aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec( - webServices, config) + aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec(webServices, config) if err != nil { return nil, err } @@ -118,6 +100,31 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server. return s, nil } +type specAggregator struct { + // mutex protects all members of this struct. + rwMutex sync.RWMutex + + // Map of API Services' OpenAPI specs by their name + openAPISpecs map[string]*openAPISpecInfo + + // provided for dynamic OpenAPI spec + openAPIVersionedService *handler.OpenAPIService +} + +var _ SpecAggregator = &specAggregator{} + +// This function is not thread safe as it only being called on startup. +func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) { + localAPIService := apiregistration.APIService{} + localAPIService.Name = name + s.openAPISpecs[name] = &openAPISpecInfo{ + etag: etag, + apiService: localAPIService, + handler: localHandler, + spec: spec, + } +} + // openAPISpecInfo is used to store OpenAPI spec with its priority. // It can be used to sort specs with their priorities. type openAPISpecInfo struct { @@ -129,59 +136,6 @@ type openAPISpecInfo struct { etag string } -// byPriority can be used in sort.Sort to sort specs with their priorities. -type byPriority struct { - specs []openAPISpecInfo - groupPriorities map[string]int32 -} - -func (a byPriority) Len() int { return len(a.specs) } -func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] } -func (a byPriority) Less(i, j int) bool { - // All local specs will come first - if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil { - return true - } - if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil { - return false - } - // WARNING: This will result in not following priorities for local APIServices. - if a.specs[i].apiService.Spec.Service == nil { - // Sort local specs with their name. This is the order in the delegation chain (aggregator first). - return a.specs[i].apiService.Name < a.specs[j].apiService.Name - } - var iPriority, jPriority int32 - if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group { - iPriority = a.specs[i].apiService.Spec.VersionPriority - jPriority = a.specs[i].apiService.Spec.VersionPriority - } else { - iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group] - jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group] - } - if iPriority != jPriority { - // Sort by priority, higher first - return iPriority > jPriority - } - // Sort by service name. - return a.specs[i].apiService.Name < a.specs[j].apiService.Name -} - -func sortByPriority(specs []openAPISpecInfo) { - b := byPriority{ - specs: specs, - groupPriorities: map[string]int32{}, - } - for _, spec := range specs { - if spec.apiService.Spec.Service == nil { - continue - } - if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr { - b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum - } - } - sort.Sort(b) -} - // buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { specs := []openAPISpecInfo{} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go similarity index 99% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index f5ffc0b7bd5..cb9deb0cbd6 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package openapi +package aggregator import ( "crypto/sha512" @@ -38,46 +38,6 @@ func NewDownloader() Downloader { return Downloader{} } -// inMemoryResponseWriter is a http.Writer that keep the response in memory. -type inMemoryResponseWriter struct { - writeHeaderCalled bool - header http.Header - respCode int - data []byte -} - -func newInMemoryResponseWriter() *inMemoryResponseWriter { - return &inMemoryResponseWriter{header: http.Header{}} -} - -func (r *inMemoryResponseWriter) Header() http.Header { - return r.header -} - -func (r *inMemoryResponseWriter) WriteHeader(code int) { - r.writeHeaderCalled = true - r.respCode = code -} - -func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { - if !r.writeHeaderCalled { - r.WriteHeader(http.StatusOK) - } - r.data = append(r.data, in...) - return len(in), nil -} - -func (r *inMemoryResponseWriter) String() string { - s := fmt.Sprintf("ResponseCode: %d", r.respCode) - if r.data != nil { - s += fmt.Sprintf(", Body: %s", string(r.data)) - } - if r.header != nil { - s += fmt.Sprintf(", Header: %s", r.header) - } - return s -} - func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { req = req.WithContext(request.WithUser(req.Context(), info)) @@ -141,3 +101,43 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) } } + +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func newInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader_test.go similarity index 57% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader_test.go index 3312951d709..6d530d6154f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader_test.go @@ -14,84 +14,17 @@ See the License for the specific language governing permissions and limitations under the License. */ -package openapi +package aggregator import ( "fmt" "net/http" - "reflect" "testing" "github.com/go-openapi/spec" "github.com/stretchr/testify/assert" - - "k8s.io/kube-aggregator/pkg/apis/apiregistration" ) -func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService { - r := apiregistration.APIService{} - r.Spec.Group = group - r.Spec.GroupPriorityMinimum = minGroupPriority - r.Spec.VersionPriority = versionPriority - r.Spec.Service = &apiregistration.ServiceReference{} - r.Name = name - return r -} - -func newLocalAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService { - r := apiregistration.APIService{} - r.Spec.Group = group - r.Spec.GroupPriorityMinimum = minGroupPriority - r.Spec.VersionPriority = versionPriority - r.Name = name - return r -} - -func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { - actualNames := []string{} - for _, a := range actual { - actualNames = append(actualNames, a.apiService.Name) - } - if !reflect.DeepEqual(actualNames, expectedNames) { - t.Errorf("Expected %s got %s.", expectedNames, actualNames) - } -} - -func TestAPIServiceSort(t *testing.T) { - list := []openAPISpecInfo{ - { - apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3), - spec: &spec.Swagger{}, - }, - { - apiService: newLocalAPIServiceForTest("FirstLocalSpec", "Group1", 15, 5), - spec: &spec.Swagger{}, - }, - { - apiService: newLocalAPIServiceForTest("SecondLocalSpec", "Group2", 14, 6), - spec: &spec.Swagger{}, - }, - { - apiService: newLocalAPIServiceForTest("ThirdLocalSpec", "Group3", 16, 3), - spec: &spec.Swagger{}, - }, - } - sortByPriority(list) - assertSortedServices(t, list, []string{"FirstLocalSpec", "SecondLocalSpec", "ThirdLocalSpec", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) -} - type handlerTest struct { etag string data []byte @@ -113,6 +46,32 @@ func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(h.data) } +type handlerDeprecatedTest struct { + etag string + data []byte +} + +var _ http.Handler = handlerDeprecatedTest{} + +func (h handlerDeprecatedTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // old server returns 403 on new endpoint + if r.URL.Path == "/openapi/v2" { + w.WriteHeader(http.StatusForbidden) + return + } + if len(h.etag) > 0 { + w.Header().Add("Etag", h.etag) + } + ifNoneMatches := r.Header["If-None-Match"] + for _, match := range ifNoneMatches { + if match == h.etag { + w.WriteHeader(http.StatusNotModified) + return + } + } + w.Write(h.data) +} + func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error, expectedSpecID string, expectedEtag string) error { if err != nil { @@ -131,7 +90,6 @@ func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error } func TestDownloadOpenAPISpec(t *testing.T) { - s := Downloader{} // Test with no eTag diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go new file mode 100644 index 00000000000..9847de3c6c6 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go @@ -0,0 +1,74 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "sort" +) + +// byPriority can be used in sort.Sort to sort specs with their priorities. +type byPriority struct { + specs []openAPISpecInfo + groupPriorities map[string]int32 +} + +func (a byPriority) Len() int { return len(a.specs) } +func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] } +func (a byPriority) Less(i, j int) bool { + // All local specs will come first + if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil { + return true + } + if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil { + return false + } + // WARNING: This will result in not following priorities for local APIServices. + if a.specs[i].apiService.Spec.Service == nil { + // Sort local specs with their name. This is the order in the delegation chain (aggregator first). + return a.specs[i].apiService.Name < a.specs[j].apiService.Name + } + var iPriority, jPriority int32 + if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group { + iPriority = a.specs[i].apiService.Spec.VersionPriority + jPriority = a.specs[i].apiService.Spec.VersionPriority + } else { + iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group] + jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group] + } + if iPriority != jPriority { + // Sort by priority, higher first + return iPriority > jPriority + } + // Sort by service name. + return a.specs[i].apiService.Name < a.specs[j].apiService.Name +} + +func sortByPriority(specs []openAPISpecInfo) { + b := byPriority{ + specs: specs, + groupPriorities: map[string]int32{}, + } + for _, spec := range specs { + if spec.apiService.Spec.Service == nil { + continue + } + if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr { + b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum + } + } + sort.Sort(b) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go new file mode 100644 index 00000000000..085ed15fd61 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "reflect" + "testing" + + "github.com/go-openapi/spec" + + "k8s.io/kube-aggregator/pkg/apis/apiregistration" +) + +func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService { + r := apiregistration.APIService{} + r.Spec.Group = group + r.Spec.GroupPriorityMinimum = minGroupPriority + r.Spec.VersionPriority = versionPriority + r.Spec.Service = &apiregistration.ServiceReference{} + r.Name = name + return r +} + +func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { + actualNames := []string{} + for _, a := range actual { + actualNames = append(actualNames, a.apiService.Name) + } + if !reflect.DeepEqual(actualNames, expectedNames) { + t.Errorf("Expected %s got %s.", expectedNames, actualNames) + } +} + +func TestAPIServiceSort(t *testing.T) { + list := []openAPISpecInfo{ + { + apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3), + spec: &spec.Swagger{}, + }, + { + apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3), + spec: &spec.Swagger{}, + }, + } + sortByPriority(list) + assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go index 49d190e9020..713a06979d2 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -21,13 +21,12 @@ import ( "net/http" "time" - "github.com/go-openapi/spec" - "k8s.io/klog" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "k8s.io/klog" "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" ) const ( @@ -43,27 +42,19 @@ const ( syncNothing ) -// AggregationManager is the interface between this controller and OpenAPI Aggregator service. -type AggregationManager interface { - AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error - UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error - RemoveAPIServiceSpec(apiServiceName string) error - GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) -} - // AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove // them if necessary. type AggregationController struct { - openAPIAggregationManager AggregationManager + openAPIAggregationManager aggregator.SpecAggregator queue workqueue.RateLimitingInterface - downloader *Downloader + downloader *aggregator.Downloader // To allow injection for testing. syncHandler func(key string) (syncAction, error) } // NewAggregationController creates new OpenAPI aggregation controller. -func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController { +func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregationManager aggregator.SpecAggregator) *AggregationController { c := &AggregationController{ openAPIAggregationManager: openAPIAggregationManager, queue: workqueue.NewNamedRateLimitingQueue(