kube-aggregator: split openapi spec aggregator from controller code

This commit is contained in:
Dr. Stefan Schimanski 2019-02-12 12:25:06 +01:00
parent 3c2a4f0362
commit 749d98c2a8
8 changed files with 131 additions and 96 deletions

View File

@ -78,6 +78,7 @@ go_library(
"//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",

View File

@ -35,6 +35,7 @@ import (
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi" openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
) )
@ -206,8 +207,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
}) })
if openAPIConfig != nil { if openAPIConfig != nil {
specDownloader := openapicontroller.NewDownloader() specDownloader := openapiaggregator.NewDownloader()
openAPIAggregator, err := openapicontroller.BuildAndRegisterAggregator( openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
&specDownloader, &specDownloader,
delegationTarget, delegationTarget,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),

View File

@ -1,41 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["controller.go"],
"aggregator.go",
"controller.go",
"downloader.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi", importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi",
importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi", importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["aggregator_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
], ],
) )
@ -48,7 +25,10 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )

View File

@ -0,0 +1,49 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"downloader.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator",
importpath = "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/aggregator:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/builder:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/handler:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["aggregator_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"fmt" "fmt"
@ -34,6 +34,15 @@ import (
"k8s.io/kube-openapi/pkg/handler" "k8s.io/kube-openapi/pkg/handler"
) )
// SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last
// known specs including the http etag.
type SpecAggregator interface {
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
}
const ( const (
aggregatorUser = "system:aggregator" aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second specDownloadTimeout = 60 * time.Second
@ -43,42 +52,16 @@ const (
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
) )
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
}
var _ AggregationManager = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := apiregistration.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
}
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
config *common.Config, pathHandler common.PathHandler) (AggregationManager, error) { config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
s := &specAggregator{ s := &specAggregator{
openAPISpecs: map[string]*openAPISpecInfo{}, openAPISpecs: map[string]*openAPISpecInfo{},
} }
i := 0 i := 0
// Build Aggregator's spec // Build Aggregator's spec
aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec( aggregatorOpenAPISpec, err := builder.BuildOpenAPISpec(webServices, config)
webServices, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -118,6 +101,31 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.
return s, nil return s, nil
} }
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
}
var _ SpecAggregator = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := apiregistration.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
}
}
// openAPISpecInfo is used to store OpenAPI spec with its priority. // openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities. // It can be used to sort specs with their priorities.
type openAPISpecInfo struct { type openAPISpecInfo struct {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"fmt" "fmt"
@ -38,15 +38,6 @@ func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority
return r return r
} }
func newLocalAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32) apiregistration.APIService {
r := apiregistration.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Name = name
return r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
actualNames := []string{} actualNames := []string{}
for _, a := range actual { for _, a := range actual {
@ -75,21 +66,9 @@ func TestAPIServiceSort(t *testing.T) {
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3), apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3),
spec: &spec.Swagger{}, spec: &spec.Swagger{},
}, },
{
apiService: newLocalAPIServiceForTest("FirstLocalSpec", "Group1", 15, 5),
spec: &spec.Swagger{},
},
{
apiService: newLocalAPIServiceForTest("SecondLocalSpec", "Group2", 14, 6),
spec: &spec.Swagger{},
},
{
apiService: newLocalAPIServiceForTest("ThirdLocalSpec", "Group3", 16, 3),
spec: &spec.Swagger{},
},
} }
sortByPriority(list) sortByPriority(list)
assertSortedServices(t, list, []string{"FirstLocalSpec", "SecondLocalSpec", "ThirdLocalSpec", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) assertSortedServices(t, list, []string{"FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
} }
type handlerTest struct { type handlerTest struct {
@ -113,6 +92,32 @@ func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(h.data) w.Write(h.data)
} }
type handlerDeprecatedTest struct {
etag string
data []byte
}
var _ http.Handler = handlerDeprecatedTest{}
func (h handlerDeprecatedTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// old server returns 403 on new endpoint
if r.URL.Path == "/openapi/v2" {
w.WriteHeader(http.StatusForbidden)
return
}
if len(h.etag) > 0 {
w.Header().Add("Etag", h.etag)
}
ifNoneMatches := r.Header["If-None-Match"]
for _, match := range ifNoneMatches {
if match == h.etag {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Write(h.data)
}
func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error, func assertDownloadedSpec(actualSpec *spec.Swagger, actualEtag string, err error,
expectedSpecID string, expectedEtag string) error { expectedSpecID string, expectedEtag string) error {
if err != nil { if err != nil {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package openapi package aggregator
import ( import (
"crypto/sha512" "crypto/sha512"

View File

@ -21,13 +21,12 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/go-openapi/spec"
"k8s.io/klog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
) )
const ( const (
@ -43,27 +42,19 @@ const (
syncNothing syncNothing
) )
// AggregationManager is the interface between this controller and OpenAPI Aggregator service.
type AggregationManager interface {
AddUpdateAPIService(handler http.Handler, apiService *apiregistration.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
}
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove // AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
// them if necessary. // them if necessary.
type AggregationController struct { type AggregationController struct {
openAPIAggregationManager AggregationManager openAPIAggregationManager aggregator.SpecAggregator
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
downloader *Downloader downloader *aggregator.Downloader
// To allow injection for testing. // To allow injection for testing.
syncHandler func(key string) (syncAction, error) syncHandler func(key string) (syncAction, error)
} }
// NewAggregationController creates new OpenAPI aggregation controller. // NewAggregationController creates new OpenAPI aggregation controller.
func NewAggregationController(downloader *Downloader, openAPIAggregationManager AggregationManager) *AggregationController { func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregationManager aggregator.SpecAggregator) *AggregationController {
c := &AggregationController{ c := &AggregationController{
openAPIAggregationManager: openAPIAggregationManager, openAPIAggregationManager: openAPIAggregationManager,
queue: workqueue.NewNamedRateLimitingQueue( queue: workqueue.NewNamedRateLimitingQueue(