From ea23e13463f48b788b9c9d0ef0d88a4ee77deda7 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Tue, 23 May 2023 21:39:11 +0000 Subject: [PATCH] Update OpenAPI Aggregator --- .../openapi/aggregator/aggregator.go | 425 +++++++----------- .../openapi/aggregator/aggregator_test.go | 372 +++++++++++++++ .../openapi/aggregator/downloader.go | 41 ++ .../openapi/aggregator/priority.go | 42 +- .../openapi/aggregator/priority_test.go | 43 +- .../pkg/controllers/openapi/controller.go | 58 +-- 6 files changed, 628 insertions(+), 353 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 6764edd328d..22a312eff01 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -17,331 +17,240 @@ limitations under the License. package aggregator import ( + "crypto/sha256" + "errors" "fmt" "net/http" - "strings" "sync" "time" restful "github.com/emicklei/go-restful/v3" - "k8s.io/klog/v2" - "k8s.io/apiserver/pkg/server" + "k8s.io/klog/v2" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-openapi/pkg/aggregator" "k8s.io/kube-openapi/pkg/builder" + "k8s.io/kube-openapi/pkg/cached" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/common/restfuladapter" "k8s.io/kube-openapi/pkg/handler" "k8s.io/kube-openapi/pkg/validation/spec" ) +var ErrAPIServiceNotFound = errors.New("resource not found") + // SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last // known specs including the http etag. type SpecAggregator interface { - AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error - UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error - RemoveAPIServiceSpec(apiServiceName string) error - GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) - GetAPIServiceNames() []string + AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error + UpdateAPIServiceSpec(apiServiceName string) error + RemoveAPIService(apiServiceName string) error } const ( aggregatorUser = "system:aggregator" - specDownloadTimeout = 60 * time.Second - localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_" - localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d" + specDownloadTimeout = time.Minute + localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d" // A randomly generated UUID to differentiate local and remote eTags. locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" ) -// IsLocalAPIService returns true for local specs from delegates. -func IsLocalAPIService(apiServiceName string) bool { - return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) +// openAPISpecInfo is used to store OpenAPI specs. +// The apiService object is used to sort specs with their priorities. +type openAPISpecInfo struct { + apiService v1.APIService + // spec is the cached OpenAPI spec + spec cached.Replaceable[*spec.Swagger] + + // The downloader is used only for non-local apiservices to + // re-update the spec every so often. + downloader cached.Data[*spec.Swagger] } -// GetAPIServiceNames returns the names of APIServices recorded in specAggregator.openAPISpecs. -// We use this function to pass the names of local APIServices to the controller in this package, -// so that the controller can periodically sync the OpenAPI spec from delegation API servers. -func (s *specAggregator) GetAPIServiceNames() []string { - names := make([]string, 0, len(s.openAPISpecs)) - for key := range s.openAPISpecs { - names = append(names, key) +type specAggregator struct { + // mutex protects the specsByAPIServiceName map and its contents. + mutex sync.Mutex + + // Map of API Services' OpenAPI specs by their name + specsByAPIServiceName map[string]*openAPISpecInfo + + // provided for dynamic OpenAPI spec + openAPIVersionedService *handler.OpenAPIService + + downloader *Downloader +} + +func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggregatorSpec *spec.Swagger, delegationHandlers []http.Handler, pathHandler common.PathHandler) *specAggregator { + s := &specAggregator{ + downloader: downloader, + specsByAPIServiceName: map[string]*openAPISpecInfo{}, } - return names + cachedAggregatorSpec := cached.NewResultOK(aggregatorSpec, "never-changes") + s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec) + for i, handler := range delegationHandlers { + name := fmt.Sprintf(localDelegateChainNamePattern, i+1) + + spec := NewCacheableDownloader(downloader, handler) + spec = decorateError(name, spec) + s.addLocalSpec(name, spec) + } + + s.openAPIVersionedService = handler.NewOpenAPIServiceLazy(s.buildMergeSpecLocked()) + s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler) + return s } // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) { - s := &specAggregator{ - openAPISpecs: map[string]*openAPISpecInfo{}, - } - i := 0 - // Build Aggregator's spec aggregatorOpenAPISpec, err := builder.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(webServices), config) if err != nil { return nil, err } + aggregatorOpenAPISpec.Definitions = handler.PruneDefaults(aggregatorOpenAPISpec.Definitions) + + var delegationHandlers []http.Handler - // Reserving non-name spec for aggregator's Spec. - s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "") - i++ for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { handler := delegate.UnprotectedHandler() if handler == nil { continue } - delegateSpec, etag, _, err := downloader.Download(handler, "") - if err != nil { - // ignore errors for the empty delegate we attach at the end the chain - // atm the empty delegate returns 503 when the server hasn't been fully initialized - // and the spec downloader only silences 404s - if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil { - continue - } - return nil, err - } - if delegateSpec == nil { + // ignore errors for the empty delegate we attach at the end the chain + // atm the empty delegate returns 503 when the server hasn't been fully initialized + // and the spec downloader only silences 404s + if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil { continue } - s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag) - i++ + delegationHandlers = append(delegationHandlers, handler) } - - // Build initial spec to serve. - klog.V(2).Infof("Building initial OpenAPI spec") - defer func(start time.Time) { - duration := time.Since(start) - klog.V(2).Infof("Finished initial OpenAPI spec generation after %v", duration) - - regenerationCounter.With(map[string]string{"apiservice": "*", "reason": "startup"}) - regenerationDurationGauge.With(map[string]string{"reason": "startup"}).Set(duration.Seconds()) - }(time.Now()) - specToServe, err := s.buildOpenAPISpec() - if err != nil { - return nil, err - } - - // Install handler - s.openAPIVersionedService = handler.NewOpenAPIService(specToServe) - s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler) - + s := buildAndRegisterSpecAggregatorForLocalServices(downloader, aggregatorOpenAPISpec, delegationHandlers, pathHandler) return s, nil } -type specAggregator struct { - // mutex protects all members of this struct. - rwMutex sync.RWMutex - - // Map of API Services' OpenAPI specs by their name - openAPISpecs map[string]*openAPISpecInfo - - // provided for dynamic OpenAPI spec - openAPIVersionedService *handler.OpenAPIService -} - -var _ SpecAggregator = &specAggregator{} - -// This function is not thread safe as it only being called on startup. -func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) { - localAPIService := v1.APIService{} - localAPIService.Name = name - s.openAPISpecs[name] = &openAPISpecInfo{ - etag: etag, - apiService: localAPIService, - handler: localHandler, - spec: spec, +func (s *specAggregator) addLocalSpec(name string, spec cached.Data[*spec.Swagger]) { + service := v1.APIService{} + service.Name = name + info := &openAPISpecInfo{ + apiService: service, } + info.spec.Replace(spec) + s.specsByAPIServiceName[name] = info } -// openAPISpecInfo is used to store OpenAPI spec with its priority. -// It can be used to sort specs with their priorities. -type openAPISpecInfo struct { - apiService v1.APIService +// buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs. +func (s *specAggregator) buildMergeSpecLocked() cached.Data[*spec.Swagger] { + apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName)) + for k := range s.specsByAPIServiceName { + apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService) + } + sortByPriority(apiServices) + caches := make([]cached.Data[*spec.Swagger], len(apiServices)) + for i, apiService := range apiServices { + caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec) + } - // Specification of this API Service. If null then the spec is not loaded yet. - spec *spec.Swagger - handler http.Handler - etag string -} - -// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. -func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) { - specs := []openAPISpecInfo{} - for _, specInfo := range s.openAPISpecs { - if specInfo.spec == nil { - continue + return cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { + var merged *spec.Swagger + etags := make([]string, 0, len(results)) + for _, specInfo := range results { + result := specInfo.Get() + if result.Err != nil { + // APIService name and err message will be included in + // the error message as part of decorateError + klog.Warning(result.Err) + continue + } + if merged == nil { + merged = &spec.Swagger{} + *merged = *result.Data + // Paths, Definitions and parameters are set by + // MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters + merged.Paths = nil + merged.Definitions = nil + merged.Parameters = nil + } + etags = append(etags, result.Etag) + if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result.Data); err != nil { + return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to build merge specs: %v", err)) + } } - // Copy the spec before removing the defaults. - localSpec := *specInfo.spec - localSpecInfo := *specInfo - localSpecInfo.spec = &localSpec - localSpecInfo.spec.Definitions = handler.PruneDefaults(specInfo.spec.Definitions) - specs = append(specs, localSpecInfo) - } - if len(specs) == 0 { - return &spec.Swagger{}, nil - } - sortByPriority(specs) - for _, specInfo := range specs { - if specToReturn == nil { - specToReturn = &spec.Swagger{} - *specToReturn = *specInfo.spec - // Paths, Definitions and parameters are set by MergeSpecsIgnorePathConflict - specToReturn.Paths = nil - specToReturn.Definitions = nil - specToReturn.Parameters = nil - } - if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(specToReturn, specInfo.spec); err != nil { - return nil, err - } - } - - return specToReturn, nil + // Printing the etags list is stable because it is sorted. + return cached.NewResultOK(merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags))))) + }, caches) } -// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks. -func (s *specAggregator) updateOpenAPISpec() error { - if s.openAPIVersionedService == nil { - return nil - } - specToServe, err := s.buildOpenAPISpec() - if err != nil { - return err - } - return s.openAPIVersionedService.UpdateSpec(specToServe) -} - -// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact -// if the update fails. -func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error { - if specInfo == nil { - return fmt.Errorf("invalid input: specInfo must be non-nil") - } - _, updated := s.openAPISpecs[specInfo.apiService.Name] - origSpecInfo, existedBefore := s.openAPISpecs[specInfo.apiService.Name] - s.openAPISpecs[specInfo.apiService.Name] = specInfo - - // Skip aggregation if OpenAPI spec didn't change - if existedBefore && origSpecInfo != nil && origSpecInfo.etag == specInfo.etag { - return nil - } - klog.V(2).Infof("Updating OpenAPI spec because %s is updated", specInfo.apiService.Name) - defer func(start time.Time) { - duration := time.Since(start) - klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration) - - reason := "add" - if updated { - reason = "update" - } - - regenerationCounter.With(map[string]string{"apiservice": specInfo.apiService.Name, "reason": reason}) - regenerationDurationGauge.With(map[string]string{"reason": reason}).Set(duration.Seconds()) - }(time.Now()) - if err := s.updateOpenAPISpec(); err != nil { - if existedBefore { - s.openAPISpecs[specInfo.apiService.Name] = origSpecInfo - } else { - delete(s.openAPISpecs, specInfo.apiService.Name) - } - return err - } - return nil -} - -// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact -// if the update fails. -func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error { - orgSpecInfo, exists := s.openAPISpecs[apiServiceName] +// updateServiceLocked updates the spec cache by downloading the latest +// version of the spec. +func (s *specAggregator) updateServiceLocked(name string) error { + specInfo, exists := s.specsByAPIServiceName[name] if !exists { + return ErrAPIServiceNotFound + } + result := specInfo.downloader.Get() + filteredResult := cached.NewTransformer[*spec.Swagger](func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { + if result.Err != nil { + return result + } + return cached.NewResultOK(aggregator.FilterSpecByPathsWithoutSideEffects(result.Data, []string{"/apis/"}), result.Etag) + }, result) + specInfo.spec.Replace(filteredResult) + return result.Err +} + +// UpdateAPIServiceSpec updates the api service. It is thread safe. +func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.updateServiceLocked(apiServiceName) +} + +// AddUpdateAPIService adds the api service. It is thread safe. If the +// apiservice already exists, it will be updated. +func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error { + if apiService.Spec.Service == nil { return nil } - delete(s.openAPISpecs, apiServiceName) - klog.V(2).Infof("Updating OpenAPI spec because %s is removed", apiServiceName) - defer func(start time.Time) { - duration := time.Since(start) - klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration) + s.mutex.Lock() + defer s.mutex.Unlock() - regenerationCounter.With(map[string]string{"apiservice": apiServiceName, "reason": "delete"}) - regenerationDurationGauge.With(map[string]string{"reason": "delete"}).Set(duration.Seconds()) - }(time.Now()) - if err := s.updateOpenAPISpec(); err != nil { - s.openAPISpecs[apiServiceName] = orgSpecInfo - return err + _, exists := s.specsByAPIServiceName[apiService.Name] + if !exists { + s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{ + apiService: *apiService, + downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)), + } + s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) } + + return s.updateServiceLocked(apiService.Name) +} + +// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. +// It is thread safe. +func (s *specAggregator) RemoveAPIService(apiServiceName string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists { + return ErrAPIServiceNotFound + } + delete(s.specsByAPIServiceName, apiServiceName) + // Re-create the mergeSpec for the new list of apiservices + s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) return nil } -// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe. -func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error { - s.rwMutex.Lock() - defer s.rwMutex.Unlock() - - specInfo, existingService := s.openAPISpecs[apiServiceName] - if !existingService { - return fmt.Errorf("APIService %q does not exists", apiServiceName) - } - - // For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths - // proxy handler delegates. - if specInfo.apiService.Spec.Service != nil { - spec = aggregator.FilterSpecByPathsWithoutSideEffects(spec, []string{"/apis/"}) - } - - return s.tryUpdatingServiceSpecs(&openAPISpecInfo{ - apiService: specInfo.apiService, - spec: spec, - handler: specInfo.handler, - etag: etag, - }) -} - -// AddUpdateAPIService adds or updates the api service. It is thread safe. -func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error { - s.rwMutex.Lock() - defer s.rwMutex.Unlock() - - if apiService.Spec.Service == nil { - // All local specs should be already aggregated using local delegate chain - return nil - } - - newSpec := &openAPISpecInfo{ - apiService: *apiService, - handler: handler, - } - if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService { - newSpec.etag = specInfo.etag - newSpec.spec = specInfo.spec - } - return s.tryUpdatingServiceSpecs(newSpec) -} - -// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. -// It is thread safe. -func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error { - s.rwMutex.Lock() - defer s.rwMutex.Unlock() - - if _, existingService := s.openAPISpecs[apiServiceName]; !existingService { - return nil - } - - return s.tryDeleteServiceSpecs(apiServiceName) -} - -// GetAPIServiceSpec returns api service spec info -func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) { - s.rwMutex.RLock() - defer s.rwMutex.RUnlock() - - if info, existingService := s.openAPISpecs[apiServiceName]; existingService { - return info.handler, info.etag, true - } - return nil, "", false +// decorateError creates a new cache that wraps a downloader +// cache the name of the apiservice to help with debugging. +func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] { + return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { + if result.Err != nil { + return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err)) + } + return result + }, cache) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go new file mode 100644 index 00000000000..c5ad4e1598f --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator_test.go @@ -0,0 +1,372 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "bytes" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/validation/spec" +) + +func TestBasicPathsMerged(t *testing.T) { + mux := http.NewServeMux() + delegationHandlers := []http.Handler{ + &openAPIHandler{ + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }, + }, + } + buildAndRegisterSpecAggregator(delegationHandlers, mux) + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") +} + +func TestAddUpdateAPIService(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiService.Name = "apiservice" + + handler := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v1": {}, + }, + }, + }, + }} + + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + + expectPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") + + t.Log("Update APIService OpenAPI") + handler.openapi = &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v2": {}, + }, + }, + }, + } + s.UpdateAPIServiceSpec(apiService.Name) + + swagger, err = fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + // Ensure that the if the APIService OpenAPI is updated, the + // aggregated OpenAPI is also updated. + expectPath(t, swagger, "/apis/apiservicegroup/v2") + expectNoPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") +} + +func TestAddRemoveAPIService(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiService.Name = "apiservice" + + handler := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v1": {}, + }, + }, + }, + }} + + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") + + t.Logf("Remove APIService %s", apiService.Name) + s.RemoveAPIService(apiService.Name) + + swagger, err = fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + // Ensure that the if the APIService is added then removed, the OpenAPI disappears from the aggregated OpenAPI as well. + expectNoPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") +} + +func TestFailingAPIServiceSkippedAggregation(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiServiceFailed := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiServiceFailed.Name = "apiserviceFailed" + + handlerFailed := &openAPIHandler{ + returnErr: true, + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/failed/v1": {}, + }, + }, + }, + }, + } + + apiServiceSuccess := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy2"}, + }, + } + apiServiceSuccess.Name = "apiserviceSuccess" + + handlerSuccess := &openAPIHandler{ + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/success/v1": {}, + }, + }, + }, + }, + } + + s.AddUpdateAPIService(apiServiceFailed, handlerFailed) + s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess) + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectNoPath(t, swagger, "/apis/failed/v1") + expectPath(t, swagger, "/apis/success/v1") +} + +func TestAPIServiceFailSuccessTransition(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiService.Name = "apiservice" + + handler := &openAPIHandler{ + returnErr: true, + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v1": {}, + }, + }, + }, + }, + } + + s.AddUpdateAPIService(apiService, handler) + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectNoPath(t, swagger, "/apis/apiservicegroup/v1") + + t.Log("Transition APIService to not return error") + handler.returnErr = false + err = s.UpdateAPIServiceSpec(apiService.Name) + if err != nil { + t.Error(err) + } + swagger, err = fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectPath(t, swagger, "/apis/apiservicegroup/v1") +} + +type openAPIHandler struct { + openapi *spec.Swagger + returnErr bool +} + +func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if o.returnErr { + w.WriteHeader(500) + return + } + data, err := json.Marshal(o.openapi) + if err != nil { + panic(err) + } + http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data)) + return +} + +func fetchOpenAPI(mux *http.ServeMux) (*spec.Swagger, error) { + server := httptest.NewServer(mux) + defer server.Close() + client := server.Client() + + req, err := http.NewRequest("GET", server.URL+"/openapi/v2", nil) + if err != nil { + return nil, err + } + resp, err := client.Do(req) + if err != nil { + return nil, err + } + body, err := io.ReadAll(resp.Body) + + swagger := &spec.Swagger{} + if err := swagger.UnmarshalJSON(body); err != nil { + return nil, err + } + return swagger, err +} + +func buildAndRegisterSpecAggregator(delegationHandlers []http.Handler, mux common.PathHandler) *specAggregator { + downloader := NewDownloader() + aggregatorSpec := &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiregistration.k8s.io/v1": {}, + }, + }, + }, + } + s := buildAndRegisterSpecAggregatorForLocalServices(&downloader, aggregatorSpec, delegationHandlers, mux) + return s +} + +func expectPath(t *testing.T, swagger *spec.Swagger, path string) { + if _, ok := swagger.Paths.Paths[path]; !ok { + t.Errorf("Expected path %s to exist in aggregated paths", path) + } +} + +func expectNoPath(t *testing.T, swagger *spec.Swagger, path string) { + if _, ok := swagger.Paths.Paths[path]; ok { + t.Errorf("Expected path %s to be omitted in aggregated paths", path) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index 0a32b169ed4..3098f593e24 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -24,9 +24,50 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/kube-openapi/pkg/cached" "k8s.io/kube-openapi/pkg/validation/spec" ) +// cacheableDownloader is a downloader that will always return the data +// and the etag. +type cacheableDownloader struct { + downloader *Downloader + handler http.Handler + etag string + spec *spec.Swagger +} + +// Creates a downloader that also returns the etag, making it useful to use as a cached dependency. +func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] { + return &cacheableDownloader{ + downloader: downloader, + handler: handler, + } +} + +func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] { + swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) + if err != nil { + return cached.NewResultErr[*spec.Swagger](err) + } + switch status { + case http.StatusNotModified: + // Nothing has changed, do nothing. + case http.StatusOK: + if swagger != nil { + d.etag = etag + d.spec = swagger + break + } + fallthrough + case http.StatusNotFound: + return cached.NewResultErr[*spec.Swagger](ErrAPIServiceNotFound) + default: + return cached.NewResultErr[*spec.Swagger](fmt.Errorf("invalid status code: %v", status)) + } + return cached.NewResultOK(d.spec, d.etag) +} + // Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint. type Downloader struct { } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go index 9847de3c6c6..27f2851ccaf 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority.go @@ -18,56 +18,60 @@ package aggregator import ( "sort" + + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" ) // byPriority can be used in sort.Sort to sort specs with their priorities. type byPriority struct { - specs []openAPISpecInfo + apiServices []*apiregistrationv1.APIService groupPriorities map[string]int32 } -func (a byPriority) Len() int { return len(a.specs) } -func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] } +func (a byPriority) Len() int { return len(a.apiServices) } +func (a byPriority) Swap(i, j int) { + a.apiServices[i], a.apiServices[j] = a.apiServices[j], a.apiServices[i] +} func (a byPriority) Less(i, j int) bool { // All local specs will come first - if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil { + if a.apiServices[i].Spec.Service == nil && a.apiServices[j].Spec.Service != nil { return true } - if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil { + if a.apiServices[i].Spec.Service != nil && a.apiServices[j].Spec.Service == nil { return false } // WARNING: This will result in not following priorities for local APIServices. - if a.specs[i].apiService.Spec.Service == nil { + if a.apiServices[i].Spec.Service == nil { // Sort local specs with their name. This is the order in the delegation chain (aggregator first). - return a.specs[i].apiService.Name < a.specs[j].apiService.Name + return a.apiServices[i].Name < a.apiServices[j].Name } var iPriority, jPriority int32 - if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group { - iPriority = a.specs[i].apiService.Spec.VersionPriority - jPriority = a.specs[i].apiService.Spec.VersionPriority + if a.apiServices[i].Spec.Group == a.apiServices[j].Spec.Group { + iPriority = a.apiServices[i].Spec.VersionPriority + jPriority = a.apiServices[i].Spec.VersionPriority } else { - iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group] - jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group] + iPriority = a.groupPriorities[a.apiServices[i].Spec.Group] + jPriority = a.groupPriorities[a.apiServices[j].Spec.Group] } if iPriority != jPriority { // Sort by priority, higher first return iPriority > jPriority } // Sort by service name. - return a.specs[i].apiService.Name < a.specs[j].apiService.Name + return a.apiServices[i].Name < a.apiServices[j].Name } -func sortByPriority(specs []openAPISpecInfo) { +func sortByPriority(apiServices []*apiregistrationv1.APIService) { b := byPriority{ - specs: specs, + apiServices: apiServices, groupPriorities: map[string]int32{}, } - for _, spec := range specs { - if spec.apiService.Spec.Service == nil { + for _, apiService := range apiServices { + if apiService.Spec.Service == nil { continue } - if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr { - b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum + if pr, found := b.groupPriorities[apiService.Spec.Group]; !found || apiService.Spec.GroupPriorityMinimum > pr { + b.groupPriorities[apiService.Spec.Group] = apiService.Spec.GroupPriorityMinimum } } sort.Sort(b) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go index da5cb3e8e7d..cc20bbb37fb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/priority_test.go @@ -21,23 +21,22 @@ import ( "testing" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - "k8s.io/kube-openapi/pkg/validation/spec" ) -func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) apiregistrationv1.APIService { +func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) *apiregistrationv1.APIService { r := apiregistrationv1.APIService{} r.Spec.Group = group r.Spec.GroupPriorityMinimum = minGroupPriority r.Spec.VersionPriority = versionPriority r.Spec.Service = svc r.Name = name - return r + return &r } -func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) { +func assertSortedServices(t *testing.T, actual []*apiregistrationv1.APIService, expectedNames []string) { actualNames := []string{} for _, a := range actual { - actualNames = append(actualNames, a.apiService.Name) + actualNames = append(actualNames, a.Name) } if !reflect.DeepEqual(actualNames, expectedNames) { t.Errorf("Expected %s got %s.", expectedNames, actualNames) @@ -45,32 +44,14 @@ func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames } func TestAPIServiceSort(t *testing.T) { - list := []openAPISpecInfo{ - { - apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}), - spec: &spec.Swagger{}, - }, - { - apiService: newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil), - }, - { - apiService: newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil), - }, - { - apiService: newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil), - }, + list := []*apiregistrationv1.APIService{ + newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}), + newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}), + newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}), + newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}), + newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil), + newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil), + newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil), } sortByPriority(list) assertSortedServices(t, list, []string{"local_service_1", "local_service_2", "local_service_3", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"}) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go index 98e6ecc6dad..780eb2090bb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" ) @@ -67,11 +67,6 @@ func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregat c.syncHandler = c.sync - // update each service at least once, also those which are not coming from APIServices, namely local services - for _, name := range openAPIAggregationManager.GetAPIServiceNames() { - c.queue.AddAfter(name, time.Second) - } - return c } @@ -100,55 +95,31 @@ func (c *AggregationController) processNextWorkItem() bool { if quit { return false } - - if aggregator.IsLocalAPIService(key.(string)) { - // for local delegation targets that are aggregated once per second, log at - // higher level to avoid flooding the log - klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key) - } else { - klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key) - } + klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key) action, err := c.syncHandler(key.(string)) - if err == nil { - c.queue.Forget(key) - } else { + if err != nil { utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err)) } switch action { case syncRequeue: - if aggregator.IsLocalAPIService(key.(string)) { - klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal) - c.queue.AddAfter(key, successfulUpdateDelayLocal) - } else { - klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key) - c.queue.AddAfter(key, successfulUpdateDelay) - } + c.queue.AddAfter(key, successfulUpdateDelay) case syncRequeueRateLimited: klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key) c.queue.AddRateLimited(key) case syncNothing: - klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key) + c.queue.Forget(key) } return true } func (c *AggregationController) sync(key string) (syncAction, error) { - handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key) - if !exists || handler == nil { - return syncNothing, nil - } - returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag) - switch { - case err != nil: - return syncRequeueRateLimited, err - case httpStatus == http.StatusNotModified: - case httpStatus == http.StatusNotFound || returnSpec == nil: - return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exist") - case httpStatus == http.StatusOK: - if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil { + if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil { + if err == aggregator.ErrAPIServiceNotFound { + return syncNothing, nil + } else { return syncRequeueRateLimited, err } } @@ -160,7 +131,7 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService * if apiService.Spec.Service == nil { return } - if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil { + if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil { utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err)) } c.queue.AddAfter(apiService.Name, time.Second) @@ -168,11 +139,8 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService * // UpdateAPIService updates API Service's info and handler. func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) { - if apiService.Spec.Service == nil { - return - } - if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil { - utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err)) + if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil { + utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err)) } key := apiService.Name if c.queue.NumRequeues(key) > 0 { @@ -187,7 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic // RemoveAPIService removes API Service from OpenAPI Aggregation Controller. func (c *AggregationController) RemoveAPIService(apiServiceName string) { - if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil { + if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil { utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err)) } // This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out