Update OpenAPI Aggregator

This commit is contained in:
Jefftree 2023-05-23 21:39:11 +00:00
parent b2a9c06b2e
commit ea23e13463
6 changed files with 628 additions and 353 deletions

View File

@ -17,331 +17,240 @@ limitations under the License.
package aggregator
import (
"crypto/sha256"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
restful "github.com/emicklei/go-restful/v3"
"k8s.io/klog/v2"
"k8s.io/apiserver/pkg/server"
"k8s.io/klog/v2"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/cached"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/common/restfuladapter"
"k8s.io/kube-openapi/pkg/handler"
"k8s.io/kube-openapi/pkg/validation/spec"
)
var ErrAPIServiceNotFound = errors.New("resource not found")
// SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last
// known specs including the http etag.
type SpecAggregator interface {
AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
GetAPIServiceNames() []string
AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error
UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIService(apiServiceName string) error
}
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_"
localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
specDownloadTimeout = time.Minute
localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
// A randomly generated UUID to differentiate local and remote eTags.
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
)
// IsLocalAPIService returns true for local specs from delegates.
func IsLocalAPIService(apiServiceName string) bool {
return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
// openAPISpecInfo is used to store OpenAPI specs.
// The apiService object is used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService v1.APIService
// spec is the cached OpenAPI spec
spec cached.Replaceable[*spec.Swagger]
// The downloader is used only for non-local apiservices to
// re-update the spec every so often.
downloader cached.Data[*spec.Swagger]
}
// GetAPIServiceNames returns the names of APIServices recorded in specAggregator.openAPISpecs.
// We use this function to pass the names of local APIServices to the controller in this package,
// so that the controller can periodically sync the OpenAPI spec from delegation API servers.
func (s *specAggregator) GetAPIServiceNames() []string {
names := make([]string, 0, len(s.openAPISpecs))
for key := range s.openAPISpecs {
names = append(names, key)
type specAggregator struct {
// mutex protects the specsByAPIServiceName map and its contents.
mutex sync.Mutex
// Map of API Services' OpenAPI specs by their name
specsByAPIServiceName map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
downloader *Downloader
}
func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggregatorSpec *spec.Swagger, delegationHandlers []http.Handler, pathHandler common.PathHandler) *specAggregator {
s := &specAggregator{
downloader: downloader,
specsByAPIServiceName: map[string]*openAPISpecInfo{},
}
return names
cachedAggregatorSpec := cached.NewResultOK(aggregatorSpec, "never-changes")
s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec)
for i, handler := range delegationHandlers {
name := fmt.Sprintf(localDelegateChainNamePattern, i+1)
spec := NewCacheableDownloader(downloader, handler)
spec = decorateError(name, spec)
s.addLocalSpec(name, spec)
}
s.openAPIVersionedService = handler.NewOpenAPIServiceLazy(s.buildMergeSpecLocked())
s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
return s
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
s := &specAggregator{
openAPISpecs: map[string]*openAPISpecInfo{},
}
i := 0
// Build Aggregator's spec
aggregatorOpenAPISpec, err := builder.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(webServices), config)
if err != nil {
return nil, err
}
aggregatorOpenAPISpec.Definitions = handler.PruneDefaults(aggregatorOpenAPISpec.Definitions)
var delegationHandlers []http.Handler
// Reserving non-name spec for aggregator's Spec.
s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "")
i++
for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
handler := delegate.UnprotectedHandler()
if handler == nil {
continue
}
delegateSpec, etag, _, err := downloader.Download(handler, "")
if err != nil {
// ignore errors for the empty delegate we attach at the end the chain
// atm the empty delegate returns 503 when the server hasn't been fully initialized
// and the spec downloader only silences 404s
if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
continue
}
return nil, err
}
if delegateSpec == nil {
// ignore errors for the empty delegate we attach at the end the chain
// atm the empty delegate returns 503 when the server hasn't been fully initialized
// and the spec downloader only silences 404s
if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
continue
}
s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag)
i++
delegationHandlers = append(delegationHandlers, handler)
}
// Build initial spec to serve.
klog.V(2).Infof("Building initial OpenAPI spec")
defer func(start time.Time) {
duration := time.Since(start)
klog.V(2).Infof("Finished initial OpenAPI spec generation after %v", duration)
regenerationCounter.With(map[string]string{"apiservice": "*", "reason": "startup"})
regenerationDurationGauge.With(map[string]string{"reason": "startup"}).Set(duration.Seconds())
}(time.Now())
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return nil, err
}
// Install handler
s.openAPIVersionedService = handler.NewOpenAPIService(specToServe)
s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
s := buildAndRegisterSpecAggregatorForLocalServices(downloader, aggregatorOpenAPISpec, delegationHandlers, pathHandler)
return s, nil
}
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIVersionedService *handler.OpenAPIService
}
var _ SpecAggregator = &specAggregator{}
// This function is not thread safe as it only being called on startup.
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
localAPIService := v1.APIService{}
localAPIService.Name = name
s.openAPISpecs[name] = &openAPISpecInfo{
etag: etag,
apiService: localAPIService,
handler: localHandler,
spec: spec,
func (s *specAggregator) addLocalSpec(name string, spec cached.Data[*spec.Swagger]) {
service := v1.APIService{}
service.Name = name
info := &openAPISpecInfo{
apiService: service,
}
info.spec.Replace(spec)
s.specsByAPIServiceName[name] = info
}
// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
apiService v1.APIService
// buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs.
func (s *specAggregator) buildMergeSpecLocked() cached.Data[*spec.Swagger] {
apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName))
for k := range s.specsByAPIServiceName {
apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService)
}
sortByPriority(apiServices)
caches := make([]cached.Data[*spec.Swagger], len(apiServices))
for i, apiService := range apiServices {
caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec)
}
// Specification of this API Service. If null then the spec is not loaded yet.
spec *spec.Swagger
handler http.Handler
etag string
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
if specInfo.spec == nil {
continue
return cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
var merged *spec.Swagger
etags := make([]string, 0, len(results))
for _, specInfo := range results {
result := specInfo.Get()
if result.Err != nil {
// APIService name and err message will be included in
// the error message as part of decorateError
klog.Warning(result.Err)
continue
}
if merged == nil {
merged = &spec.Swagger{}
*merged = *result.Data
// Paths, Definitions and parameters are set by
// MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters
merged.Paths = nil
merged.Definitions = nil
merged.Parameters = nil
}
etags = append(etags, result.Etag)
if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result.Data); err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to build merge specs: %v", err))
}
}
// Copy the spec before removing the defaults.
localSpec := *specInfo.spec
localSpecInfo := *specInfo
localSpecInfo.spec = &localSpec
localSpecInfo.spec.Definitions = handler.PruneDefaults(specInfo.spec.Definitions)
specs = append(specs, localSpecInfo)
}
if len(specs) == 0 {
return &spec.Swagger{}, nil
}
sortByPriority(specs)
for _, specInfo := range specs {
if specToReturn == nil {
specToReturn = &spec.Swagger{}
*specToReturn = *specInfo.spec
// Paths, Definitions and parameters are set by MergeSpecsIgnorePathConflict
specToReturn.Paths = nil
specToReturn.Definitions = nil
specToReturn.Parameters = nil
}
if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(specToReturn, specInfo.spec); err != nil {
return nil, err
}
}
return specToReturn, nil
// Printing the etags list is stable because it is sorted.
return cached.NewResultOK(merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))))
}, caches)
}
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) updateOpenAPISpec() error {
if s.openAPIVersionedService == nil {
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
return s.openAPIVersionedService.UpdateSpec(specToServe)
}
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
if specInfo == nil {
return fmt.Errorf("invalid input: specInfo must be non-nil")
}
_, updated := s.openAPISpecs[specInfo.apiService.Name]
origSpecInfo, existedBefore := s.openAPISpecs[specInfo.apiService.Name]
s.openAPISpecs[specInfo.apiService.Name] = specInfo
// Skip aggregation if OpenAPI spec didn't change
if existedBefore && origSpecInfo != nil && origSpecInfo.etag == specInfo.etag {
return nil
}
klog.V(2).Infof("Updating OpenAPI spec because %s is updated", specInfo.apiService.Name)
defer func(start time.Time) {
duration := time.Since(start)
klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration)
reason := "add"
if updated {
reason = "update"
}
regenerationCounter.With(map[string]string{"apiservice": specInfo.apiService.Name, "reason": reason})
regenerationDurationGauge.With(map[string]string{"reason": reason}).Set(duration.Seconds())
}(time.Now())
if err := s.updateOpenAPISpec(); err != nil {
if existedBefore {
s.openAPISpecs[specInfo.apiService.Name] = origSpecInfo
} else {
delete(s.openAPISpecs, specInfo.apiService.Name)
}
return err
}
return nil
}
// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error {
orgSpecInfo, exists := s.openAPISpecs[apiServiceName]
// updateServiceLocked updates the spec cache by downloading the latest
// version of the spec.
func (s *specAggregator) updateServiceLocked(name string) error {
specInfo, exists := s.specsByAPIServiceName[name]
if !exists {
return ErrAPIServiceNotFound
}
result := specInfo.downloader.Get()
filteredResult := cached.NewTransformer[*spec.Swagger](func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
if result.Err != nil {
return result
}
return cached.NewResultOK(aggregator.FilterSpecByPathsWithoutSideEffects(result.Data, []string{"/apis/"}), result.Etag)
}, result)
specInfo.spec.Replace(filteredResult)
return result.Err
}
// UpdateAPIServiceSpec updates the api service. It is thread safe.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.updateServiceLocked(apiServiceName)
}
// AddUpdateAPIService adds the api service. It is thread safe. If the
// apiservice already exists, it will be updated.
func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error {
if apiService.Spec.Service == nil {
return nil
}
delete(s.openAPISpecs, apiServiceName)
klog.V(2).Infof("Updating OpenAPI spec because %s is removed", apiServiceName)
defer func(start time.Time) {
duration := time.Since(start)
klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration)
s.mutex.Lock()
defer s.mutex.Unlock()
regenerationCounter.With(map[string]string{"apiservice": apiServiceName, "reason": "delete"})
regenerationDurationGauge.With(map[string]string{"reason": "delete"}).Set(duration.Seconds())
}(time.Now())
if err := s.updateOpenAPISpec(); err != nil {
s.openAPISpecs[apiServiceName] = orgSpecInfo
return err
_, exists := s.specsByAPIServiceName[apiService.Name]
if !exists {
s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{
apiService: *apiService,
downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)),
}
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
}
return s.updateServiceLocked(apiService.Name)
}
// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe.
func (s *specAggregator) RemoveAPIService(apiServiceName string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
return ErrAPIServiceNotFound
}
delete(s.specsByAPIServiceName, apiServiceName)
// Re-create the mergeSpec for the new list of apiservices
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
return nil
}
// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
specInfo, existingService := s.openAPISpecs[apiServiceName]
if !existingService {
return fmt.Errorf("APIService %q does not exists", apiServiceName)
}
// For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths
// proxy handler delegates.
if specInfo.apiService.Spec.Service != nil {
spec = aggregator.FilterSpecByPathsWithoutSideEffects(spec, []string{"/apis/"})
}
return s.tryUpdatingServiceSpecs(&openAPISpecInfo{
apiService: specInfo.apiService,
spec: spec,
handler: specInfo.handler,
etag: etag,
})
}
// AddUpdateAPIService adds or updates the api service. It is thread safe.
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if apiService.Spec.Service == nil {
// All local specs should be already aggregated using local delegate chain
return nil
}
newSpec := &openAPISpecInfo{
apiService: *apiService,
handler: handler,
}
if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService {
newSpec.etag = specInfo.etag
newSpec.spec = specInfo.spec
}
return s.tryUpdatingServiceSpecs(newSpec)
}
// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe.
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if _, existingService := s.openAPISpecs[apiServiceName]; !existingService {
return nil
}
return s.tryDeleteServiceSpecs(apiServiceName)
}
// GetAPIServiceSpec returns api service spec info
func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
if info, existingService := s.openAPISpecs[apiServiceName]; existingService {
return info.handler, info.etag, true
}
return nil, "", false
// decorateError creates a new cache that wraps a downloader
// cache the name of the apiservice to help with debugging.
func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] {
return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
if result.Err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err))
}
return result
}, cache)
}

View File

@ -0,0 +1,372 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"bytes"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
)
func TestBasicPathsMerged(t *testing.T) {
mux := http.NewServeMux()
delegationHandlers := []http.Handler{
&openAPIHandler{
openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
},
},
}
buildAndRegisterSpecAggregator(delegationHandlers, mux)
swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/foo/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
}
func TestAddUpdateAPIService(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)
s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiService.Name = "apiservice"
handler := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiservicegroup/v1": {},
},
},
},
}}
if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
t.Log("Update APIService OpenAPI")
handler.openapi = &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiservicegroup/v2": {},
},
},
},
}
s.UpdateAPIServiceSpec(apiService.Name)
swagger, err = fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
// Ensure that the if the APIService OpenAPI is updated, the
// aggregated OpenAPI is also updated.
expectPath(t, swagger, "/apis/apiservicegroup/v2")
expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
}
func TestAddRemoveAPIService(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)
s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiService.Name = "apiservice"
handler := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiservicegroup/v1": {},
},
},
},
}}
if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
t.Logf("Remove APIService %s", apiService.Name)
s.RemoveAPIService(apiService.Name)
swagger, err = fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
// Ensure that the if the APIService is added then removed, the OpenAPI disappears from the aggregated OpenAPI as well.
expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
}
func TestFailingAPIServiceSkippedAggregation(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)
s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
apiServiceFailed := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiServiceFailed.Name = "apiserviceFailed"
handlerFailed := &openAPIHandler{
returnErr: true,
openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/failed/v1": {},
},
},
},
},
}
apiServiceSuccess := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy2"},
},
}
apiServiceSuccess.Name = "apiserviceSuccess"
handlerSuccess := &openAPIHandler{
openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/success/v1": {},
},
},
},
},
}
s.AddUpdateAPIService(apiServiceFailed, handlerFailed)
s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess)
swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/foo/v1")
expectNoPath(t, swagger, "/apis/failed/v1")
expectPath(t, swagger, "/apis/success/v1")
}
func TestAPIServiceFailSuccessTransition(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)
s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiService.Name = "apiservice"
handler := &openAPIHandler{
returnErr: true,
openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiservicegroup/v1": {},
},
},
},
},
}
s.AddUpdateAPIService(apiService, handler)
swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/foo/v1")
expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
t.Log("Transition APIService to not return error")
handler.returnErr = false
err = s.UpdateAPIServiceSpec(apiService.Name)
if err != nil {
t.Error(err)
}
swagger, err = fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/foo/v1")
expectPath(t, swagger, "/apis/apiservicegroup/v1")
}
type openAPIHandler struct {
openapi *spec.Swagger
returnErr bool
}
func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if o.returnErr {
w.WriteHeader(500)
return
}
data, err := json.Marshal(o.openapi)
if err != nil {
panic(err)
}
http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data))
return
}
func fetchOpenAPI(mux *http.ServeMux) (*spec.Swagger, error) {
server := httptest.NewServer(mux)
defer server.Close()
client := server.Client()
req, err := http.NewRequest("GET", server.URL+"/openapi/v2", nil)
if err != nil {
return nil, err
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
body, err := io.ReadAll(resp.Body)
swagger := &spec.Swagger{}
if err := swagger.UnmarshalJSON(body); err != nil {
return nil, err
}
return swagger, err
}
func buildAndRegisterSpecAggregator(delegationHandlers []http.Handler, mux common.PathHandler) *specAggregator {
downloader := NewDownloader()
aggregatorSpec := &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiregistration.k8s.io/v1": {},
},
},
},
}
s := buildAndRegisterSpecAggregatorForLocalServices(&downloader, aggregatorSpec, delegationHandlers, mux)
return s
}
func expectPath(t *testing.T, swagger *spec.Swagger, path string) {
if _, ok := swagger.Paths.Paths[path]; !ok {
t.Errorf("Expected path %s to exist in aggregated paths", path)
}
}
func expectNoPath(t *testing.T, swagger *spec.Swagger, path string) {
if _, ok := swagger.Paths.Paths[path]; ok {
t.Errorf("Expected path %s to be omitted in aggregated paths", path)
}
}

View File

@ -24,9 +24,50 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-openapi/pkg/cached"
"k8s.io/kube-openapi/pkg/validation/spec"
)
// cacheableDownloader is a downloader that will always return the data
// and the etag.
type cacheableDownloader struct {
downloader *Downloader
handler http.Handler
etag string
spec *spec.Swagger
}
// Creates a downloader that also returns the etag, making it useful to use as a cached dependency.
func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] {
return &cacheableDownloader{
downloader: downloader,
handler: handler,
}
}
func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] {
swagger, etag, status, err := d.downloader.Download(d.handler, d.etag)
if err != nil {
return cached.NewResultErr[*spec.Swagger](err)
}
switch status {
case http.StatusNotModified:
// Nothing has changed, do nothing.
case http.StatusOK:
if swagger != nil {
d.etag = etag
d.spec = swagger
break
}
fallthrough
case http.StatusNotFound:
return cached.NewResultErr[*spec.Swagger](ErrAPIServiceNotFound)
default:
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("invalid status code: %v", status))
}
return cached.NewResultOK(d.spec, d.etag)
}
// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint.
type Downloader struct {
}

View File

@ -18,56 +18,60 @@ package aggregator
import (
"sort"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
// byPriority can be used in sort.Sort to sort specs with their priorities.
type byPriority struct {
specs []openAPISpecInfo
apiServices []*apiregistrationv1.APIService
groupPriorities map[string]int32
}
func (a byPriority) Len() int { return len(a.specs) }
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
func (a byPriority) Len() int { return len(a.apiServices) }
func (a byPriority) Swap(i, j int) {
a.apiServices[i], a.apiServices[j] = a.apiServices[j], a.apiServices[i]
}
func (a byPriority) Less(i, j int) bool {
// All local specs will come first
if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil {
if a.apiServices[i].Spec.Service == nil && a.apiServices[j].Spec.Service != nil {
return true
}
if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil {
if a.apiServices[i].Spec.Service != nil && a.apiServices[j].Spec.Service == nil {
return false
}
// WARNING: This will result in not following priorities for local APIServices.
if a.specs[i].apiService.Spec.Service == nil {
if a.apiServices[i].Spec.Service == nil {
// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
return a.apiServices[i].Name < a.apiServices[j].Name
}
var iPriority, jPriority int32
if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
iPriority = a.specs[i].apiService.Spec.VersionPriority
jPriority = a.specs[i].apiService.Spec.VersionPriority
if a.apiServices[i].Spec.Group == a.apiServices[j].Spec.Group {
iPriority = a.apiServices[i].Spec.VersionPriority
jPriority = a.apiServices[i].Spec.VersionPriority
} else {
iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
iPriority = a.groupPriorities[a.apiServices[i].Spec.Group]
jPriority = a.groupPriorities[a.apiServices[j].Spec.Group]
}
if iPriority != jPriority {
// Sort by priority, higher first
return iPriority > jPriority
}
// Sort by service name.
return a.specs[i].apiService.Name < a.specs[j].apiService.Name
return a.apiServices[i].Name < a.apiServices[j].Name
}
func sortByPriority(specs []openAPISpecInfo) {
func sortByPriority(apiServices []*apiregistrationv1.APIService) {
b := byPriority{
specs: specs,
apiServices: apiServices,
groupPriorities: map[string]int32{},
}
for _, spec := range specs {
if spec.apiService.Spec.Service == nil {
for _, apiService := range apiServices {
if apiService.Spec.Service == nil {
continue
}
if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
if pr, found := b.groupPriorities[apiService.Spec.Group]; !found || apiService.Spec.GroupPriorityMinimum > pr {
b.groupPriorities[apiService.Spec.Group] = apiService.Spec.GroupPriorityMinimum
}
}
sort.Sort(b)

View File

@ -21,23 +21,22 @@ import (
"testing"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/validation/spec"
)
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) apiregistrationv1.APIService {
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) *apiregistrationv1.APIService {
r := apiregistrationv1.APIService{}
r.Spec.Group = group
r.Spec.GroupPriorityMinimum = minGroupPriority
r.Spec.VersionPriority = versionPriority
r.Spec.Service = svc
r.Name = name
return r
return &r
}
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
func assertSortedServices(t *testing.T, actual []*apiregistrationv1.APIService, expectedNames []string) {
actualNames := []string{}
for _, a := range actual {
actualNames = append(actualNames, a.apiService.Name)
actualNames = append(actualNames, a.Name)
}
if !reflect.DeepEqual(actualNames, expectedNames) {
t.Errorf("Expected %s got %s.", expectedNames, actualNames)
@ -45,32 +44,14 @@ func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames
}
func TestAPIServiceSort(t *testing.T) {
list := []openAPISpecInfo{
{
apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}),
spec: &spec.Swagger{},
},
{
apiService: newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil),
},
{
apiService: newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil),
},
{
apiService: newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil),
},
list := []*apiregistrationv1.APIService{
newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}),
newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}),
newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}),
newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}),
newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil),
newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil),
newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil),
}
sortByPriority(list)
assertSortedServices(t, list, []string{"local_service_1", "local_service_2", "local_service_3", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})

View File

@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
)
@ -67,11 +67,6 @@ func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregat
c.syncHandler = c.sync
// update each service at least once, also those which are not coming from APIServices, namely local services
for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
c.queue.AddAfter(name, time.Second)
}
return c
}
@ -100,55 +95,31 @@ func (c *AggregationController) processNextWorkItem() bool {
if quit {
return false
}
if aggregator.IsLocalAPIService(key.(string)) {
// for local delegation targets that are aggregated once per second, log at
// higher level to avoid flooding the log
klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key)
} else {
klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
}
klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
action, err := c.syncHandler(key.(string))
if err == nil {
c.queue.Forget(key)
} else {
if err != nil {
utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
}
switch action {
case syncRequeue:
if aggregator.IsLocalAPIService(key.(string)) {
klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
c.queue.AddAfter(key, successfulUpdateDelayLocal)
} else {
klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
c.queue.AddAfter(key, successfulUpdateDelay)
}
c.queue.AddAfter(key, successfulUpdateDelay)
case syncRequeueRateLimited:
klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
c.queue.AddRateLimited(key)
case syncNothing:
klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
c.queue.Forget(key)
}
return true
}
func (c *AggregationController) sync(key string) (syncAction, error) {
handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key)
if !exists || handler == nil {
return syncNothing, nil
}
returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag)
switch {
case err != nil:
return syncRequeueRateLimited, err
case httpStatus == http.StatusNotModified:
case httpStatus == http.StatusNotFound || returnSpec == nil:
return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exist")
case httpStatus == http.StatusOK:
if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil {
if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
if err == aggregator.ErrAPIServiceNotFound {
return syncNothing, nil
} else {
return syncRequeueRateLimited, err
}
}
@ -160,7 +131,7 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService *
if apiService.Spec.Service == nil {
return
}
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
}
c.queue.AddAfter(apiService.Name, time.Second)
@ -168,11 +139,8 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService *
// UpdateAPIService updates API Service's info and handler.
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
if apiService.Spec.Service == nil {
return
}
if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err))
if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err))
}
key := apiService.Name
if c.queue.NumRequeues(key) > 0 {
@ -187,7 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil {
if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil {
utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
}
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out