Merge pull request #73953 from sttts/sttts-simplify-kube-aggregator-openapi

kube-aggregator: split openapi spec aggregator from controller code
This commit is contained in:
Kubernetes Prow Robot 2019-02-13 07:18:42 -08:00 committed by GitHub
commit 7944fed44b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 317 additions and 236 deletions

View File

@ -78,6 +78,7 @@ go_library(
"//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",

View File

@ -35,6 +35,7 @@ import (
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
) )
@ -206,8 +207,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
}) })
if openAPIConfig != nil { if openAPIConfig != nil {
specDownloader := openapicontroller.NewDownloader() specDownloader := openapiaggregator.NewDownloader()
openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator( openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
&specDownloader, &specDownloader,
delegationTarget, delegationTarget,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),

View File

@ -1,41 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["controller.go"],
"aggregator.go",
"controller.go",
"downloader.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi", importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi",
importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi", importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["aggregator_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
], ],
) )
@ -48,7 +25,10 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )

View File

@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"downloader.go",
"priority.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator",
importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"downloader_test.go",
"priority_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sort"
"sync" "sync"
"time" "time"
@ -34,6 +33,15 @@ import (
"k8s.io/kube-openapi/pkg/handler" "k8s.io/kube-openapi/pkg/handler"
) )
// SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last
// known specs including the http etag.
type SpecAggregator interface {
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
}
const ( const (
aggregatorUser = "system:aggregator" aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second specDownloadTimeout = 60 * time.Second
@ -43,42 +51,16 @@ const (
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
) )
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
}
var _ AggregationManager = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := apiregistration.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
}
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) { config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
s := &specAggregator{ s := &specAggregator{
openAPISpecs: map[string]*openAPISpecInfo{}, openAPISpecs: map[string]*openAPISpecInfo{},
} }
i := 0 i := 0
// Build Aggregator's spec // Build Aggregator's spec
aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec( aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec(webServices, config)
webServices, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -118,6 +100,31 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.
return s, nil return s, nil
} }
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
}
var _ SpecAggregator = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := apiregistration.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
}
}
// openAPISpecInfo is used to store OpenAPI spec with its priority. // openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities. // It can be used to sort specs with their priorities.
type openAPISpecInfo struct { type openAPISpecInfo struct {
@ -129,59 +136,6 @@ type openAPISpecInfo struct {
etag string etag string
} }
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
// All local specs will come first
if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil {
return true
}
if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil {
return false
}
// WARNING: This will result in not following priorities for local APIServices.
if a.specs[i].apiService.Spec.Service == nil {
// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if spec.apiService.Spec.Service == nil {
continue
}
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. // buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specs := []openAPISpecInfo{} specs := []openAPISpecInfo{}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"crypto/sha512" "crypto/sha512"
@ -38,46 +38,6 @@ func NewDownloader() Downloader {
return Downloader{} return Downloader{}
} }
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}
func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler { func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(request.WithUser(req.Context(), info)) req = req.WithContext(request.WithUser(req.Context(), info))
@ -141,3 +101,43 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp
return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
} }
} }
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -14,84 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"reflect"
"testing" "testing"
"github.com/go-openapi/spec" "github.com/go-openapi/spec"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
) )
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Spec.Service = &apiregistration.ServiceReference{}
r.Name = name
return r
}
func newLocalAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
}
}
func TestAPIServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newLocalAPIServiceForTest("FirstLocalSpec", "Group1", 15, 5),
spec: &spec.Swagger{},
},
{
apiService: newLocalAPIServiceForTest("SecondLocalSpec", "Group2", 14, 6),
spec: &spec.Swagger{},
},
{
apiService: newLocalAPIServiceForTest("ThirdLocalSpec", "Group3", 16, 3),
spec: &spec.Swagger{},
},
}
sortByPriority(list)
assertSortedServices(t, list, []string{"FirstLocalSpec", "SecondLocalSpec", "ThirdLocalSpec", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
}
type handlerTest struct { type handlerTest struct {
etag string etag string
data []byte data []byte
@ -113,6 +46,32 @@ func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(h.data) w.Write(h.data)
} }
type handlerDeprecatedTest struct {
etag string
data []byte
}
var _ http.Handler = handlerDeprecatedTest{}
func (h handlerDeprecatedTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// old server returns 403 on new endpoint
if r.URL.Path == "/openapi/v2" {
w.WriteHeader(http.StatusForbidden)
return
}
if len(h.etag) > 0 {
w.Header().Add("Etag", h.etag)
}
ifNoneMatches := r.Header["If-None-Match"]
for _, match := range ifNoneMatches {
if match == h.etag {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Write(h.data)
}
func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error, func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error,
expectedSpecID string, expectedEtag string) error { expectedSpecID string, expectedEtag string) error {
if err != nil { if err != nil {
@ -131,7 +90,6 @@ func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error
} }
func TestDownloadOpenAPISpec(t *testing.T) { func TestDownloadOpenAPISpec(t *testing.T) {
s := Downloader{} s := Downloader{}
// Test with no eTag // Test with no eTag

View File

@ -0,0 +1,74 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"sort"
)
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Less(i, j int) bool {
// All local specs will come first
if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil {
return true
}
if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil {
return false
}
// WARNING: This will result in not following priorities for local APIServices.
if a.specs[i].apiService.Spec.Service == nil {
// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
}
func sortByPriority(specs []openAPISpecInfo) {
b := byPriority{
specs: specs,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if spec.apiService.Spec.Service == nil {
continue
}
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"reflect"
"testing"
"github.com/go-openapi/spec"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Spec.Service = &apiregistration.ServiceReference{}
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
}
}
func TestAPIServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{},
},
}
sortByPriority(list)
assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
}

View File

@ -21,13 +21,12 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/go-openapi/spec"
"k8s.io/klog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
) )
const ( const (
@ -43,27 +42,19 @@ const (
syncNothing syncNothing
) )
// AggregationManager is the interface between this controller and OpenAPI Aggregator service.
type AggregationManager interface {
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
}
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove // AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
// them if necessary. // them if necessary.
type AggregationController struct { type AggregationController struct {
openAPIAggregationManager AggregationManager openAPIAggregationManager aggregator.SpecAggregator
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
downloader *Downloader downloader *aggregator.Downloader
// To allow injection for testing. // To allow injection for testing.
syncHandler func(key string) (syncAction, error) syncHandler func(key string) (syncAction, error)
} }
// NewAggregationController creates new OpenAPI aggregation controller. // NewAggregationController creates new OpenAPI aggregation controller.
func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController { func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregationManager aggregator.SpecAggregator) *AggregationController {
c := &AggregationController{ c := &AggregationController{
openAPIAggregationManager: openAPIAggregationManager, openAPIAggregationManager: openAPIAggregationManager,
queue: workqueue.NewNamedRateLimitingQueue( queue: workqueue.NewNamedRateLimitingQueue(