diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go index 519a7900e30..05b1c5f62d8 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go @@ -17,8 +17,11 @@ limitations under the License. package aggregator import ( + "bytes" + "encoding/json" "fmt" "net/http" + "sort" "strings" "sync" "time" @@ -26,15 +29,10 @@ import ( "k8s.io/apiserver/pkg/server" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-openapi/pkg/common" - "k8s.io/kube-openapi/pkg/handler3" - "k8s.io/kube-openapi/pkg/spec3" ) -// SpecAggregator calls out to http handlers of APIServices and caches specs. It keeps state of the last -// known specs including the http etag. -// TODO(jefftree): remove the downloading and caching and proxy directly to the APIServices. This is possible because we -// don't have to merge here, which is cpu intensive in v2 -type SpecAggregator interface { +// SpecProxier proxies OpenAPI V3 requests to their respective APIService +type SpecProxier interface { AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) UpdateAPIServiceSpec(apiServiceName string) error RemoveAPIServiceSpec(apiServiceName string) @@ -53,37 +51,27 @@ func IsLocalAPIService(apiServiceName string) bool { return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) } -// GetAPIServicesName returns the names of APIServices recorded in openAPIV3Specs. +// GetAPIServicesName returns the names of APIServices recorded in apiServiceInfo. // We use this function to pass the names of local APIServices to the controller in this package, // so that the controller can periodically sync the OpenAPI spec from delegation API servers. -func (s *specAggregator) GetAPIServiceNames() []string { - s.rwMutex.Lock() - defer s.rwMutex.Unlock() +func (s *specProxier) GetAPIServiceNames() []string { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() - names := make([]string, len(s.openAPIV3Specs)) - for key := range s.openAPIV3Specs { + names := make([]string, len(s.apiServiceInfo)) + for key := range s.apiServiceInfo { names = append(names, key) } return names } // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. -func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecAggregator, error) { - var err error - s := &specAggregator{ - openAPIV3Specs: map[string]*openAPIV3APIServiceInfo{}, +func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecProxier, error) { + s := &specProxier{ + apiServiceInfo: map[string]*openAPIV3APIServiceInfo{}, downloader: downloader, } - s.openAPIV3VersionedService, err = handler3.NewOpenAPIService(nil) - if err != nil { - return nil, err - } - err = s.openAPIV3VersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", pathHandler) - if err != nil { - return nil, err - } - i := 1 for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { handler := delegate.UnprotectedHandler() @@ -98,109 +86,126 @@ func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.D s.UpdateAPIServiceSpec(apiServiceName) i++ } + s.register(pathHandler) return s, nil } // AddUpdateAPIService adds or updates the api service. It is thread safe. -func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { +func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { s.rwMutex.Lock() defer s.rwMutex.Unlock() // If the APIService is being updated, use the existing struct. - if apiServiceInfo, ok := s.openAPIV3Specs[apiservice.Name]; ok { + if apiServiceInfo, ok := s.apiServiceInfo[apiservice.Name]; ok { apiServiceInfo.apiService = *apiservice apiServiceInfo.handler = handler } - s.openAPIV3Specs[apiservice.Name] = &openAPIV3APIServiceInfo{ + s.apiServiceInfo[apiservice.Name] = &openAPIV3APIServiceInfo{ apiService: *apiservice, handler: handler, - specs: make(map[string]*openAPIV3SpecInfo), } } // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. // It is thread safe. -func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error { +func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error { s.rwMutex.Lock() defer s.rwMutex.Unlock() - apiService, exists := s.openAPIV3Specs[apiServiceName] + apiService, exists := s.apiServiceInfo[apiServiceName] if !exists { return fmt.Errorf("APIService %s does not exist for update", apiServiceName) } - // Pass a list of old etags to the Downloader to prevent transfers if etags match - etagList := make(map[string]string) - for gv, specInfo := range apiService.specs { - etagList[gv] = specInfo.etag - } - groups, err := s.downloader.Download(apiService.handler, etagList) + gv, err := s.downloader.OpenAPIV3Root(apiService.handler) if err != nil { return err } - - // Remove any groups that do not exist anymore - for group := range s.openAPIV3Specs[apiServiceName].specs { - if _, exists := groups[group]; !exists { - s.openAPIV3VersionedService.DeleteGroupVersion(group) - delete(s.openAPIV3Specs[apiServiceName].specs, group) - } - } - - for group, info := range groups { - if info.spec == nil { - continue - } - - // If ETag has not changed, no update is necessary - oldInfo, exists := s.openAPIV3Specs[apiServiceName].specs[group] - if exists && oldInfo.etag == info.etag { - continue - } - s.openAPIV3Specs[apiServiceName].specs[group] = &openAPIV3SpecInfo{ - spec: info.spec, - etag: info.etag, - } - s.openAPIV3VersionedService.UpdateGroupVersion(group, info.spec) - } + s.apiServiceInfo[apiServiceName].gvList = gv return nil } -type specAggregator struct { +type specProxier struct { // mutex protects all members of this struct. rwMutex sync.RWMutex // OpenAPI V3 specs by APIService name - openAPIV3Specs map[string]*openAPIV3APIServiceInfo - // provided for dynamic OpenAPI spec - openAPIV3VersionedService *handler3.OpenAPIService + apiServiceInfo map[string]*openAPIV3APIServiceInfo // For downloading the OpenAPI v3 specs from apiservices downloader Downloader } -var _ SpecAggregator = &specAggregator{} +var _ SpecProxier = &specProxier{} type openAPIV3APIServiceInfo struct { apiService v1.APIService handler http.Handler - specs map[string]*openAPIV3SpecInfo -} - -type openAPIV3SpecInfo struct { - spec *spec3.OpenAPI - etag string + gvList []string } // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. // It is thread safe. -func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) { +func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) { s.rwMutex.Lock() defer s.rwMutex.Unlock() - if apiServiceInfo, ok := s.openAPIV3Specs[apiServiceName]; ok { - for gv := range apiServiceInfo.specs { - s.openAPIV3VersionedService.DeleteGroupVersion(gv) - } - delete(s.openAPIV3Specs, apiServiceName) + if _, ok := s.apiServiceInfo[apiServiceName]; ok { + delete(s.apiServiceInfo, apiServiceName) } } + +// handleDiscovery is the handler for OpenAPI V3 Discovery +func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + + gvList := make(map[string]bool) + for _, apiServiceInfo := range s.apiServiceInfo { + for _, gv := range apiServiceInfo.gvList { + gvList[gv] = true + } + } + + keys := make([]string, 0, len(gvList)) + for k := range gvList { + keys = append(keys, k) + } + + sort.Strings(keys) + output := map[string][]string{"Paths": keys} + j, err := json.Marshal(output) + if err != nil { + return + } + + http.ServeContent(w, r, "/openapi/v3", time.Now(), bytes.NewReader(j)) +} + +// handleGroupVersion is the OpenAPI V3 handler for a specified group/version +func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request) { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + + // TODO: Import this logic from kube-openapi instead of duplicating + // URLs for OpenAPI V3 have the format /openapi/v3/ + // SplitAfterN with 4 yields ["", "openapi", "v3", ] + url := strings.SplitAfterN(r.URL.Path, "/", 4) + targetGV := url[3] + + for _, apiServiceInfo := range s.apiServiceInfo { + for _, gv := range apiServiceInfo.gvList { + if targetGV == gv { + apiServiceInfo.handler.ServeHTTP(w, r) + return + } + } + } + // No group-versions match the desired request + w.WriteHeader(404) +} + +// Register registers the OpenAPI V3 Discovery and GroupVersion handlers +func (s *specProxier) register(handler common.PathHandlerByGroupVersion) { + handler.Handle("/openapi/v3", http.HandlerFunc(s.handleDiscovery)) + handler.HandlePrefix("/openapi/v3/", http.HandlerFunc(s.handleGroupVersion)) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go index b91f9e64aa3..f53fd9f75c2 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go @@ -54,6 +54,34 @@ type SpecETag struct { etag string } +// OpenAPIV3Root downloads the OpenAPI V3 root document from an APIService +func (s *Downloader) OpenAPIV3Root(handler http.Handler) ([]string, error) { + handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) + handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") + + req, err := http.NewRequest("GET", "/openapi/v3", nil) + if err != nil { + return nil, err + } + req.Header.Add("Accept", "application/json") + + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + switch writer.respCode { + case http.StatusNotFound: + // TODO: For APIServices, download the V2 spec and convert to V3 + return nil, nil + case http.StatusOK: + groups := gvList{} + if err := json.Unmarshal(writer.data, &groups); err != nil { + return nil, err + } + return groups.Paths, nil + } + return nil, fmt.Errorf("Error, could not get list of group versions for APIService") +} + // Download downloads OpenAPI v3 for all groups of a given handler func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { // TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go index 7e5043bbe52..7efd0d90c84 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go @@ -32,10 +32,12 @@ type handlerTest struct { var _ http.Handler = handlerTest{} +var groupList = []string{"apis/group/version"} + func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Create an APIService with a handler for one group/version group := make(map[string][]string) - group["Paths"] = []string{"apis/group/version"} + group["Paths"] = groupList j, _ := json.Marshal(group) if r.URL.Path == "/openapi/v3" { w.Write(j) @@ -85,6 +87,11 @@ func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID func TestDownloadOpenAPISpec(t *testing.T) { s := Downloader{} + groups, err := s.OpenAPIV3Root( + handlerTest{data: []byte(""), etag: ""}) + assert.NoError(t, err) + assert.Equal(t, groups, groupList) + // Test with eTag gvSpec, err := s.Download( handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go index 7e981ca1da6..ed69afe40c1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go @@ -43,10 +43,9 @@ const ( syncNothing ) -// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove -// them if necessary. +// AggregationController periodically checks the list of group-versions handled by each APIService and updates the discovery page periodically type AggregationController struct { - openAPIAggregationManager aggregator.SpecAggregator + openAPIAggregationManager aggregator.SpecProxier queue workqueue.RateLimitingInterface // To allow injection for testing. @@ -54,7 +53,7 @@ type AggregationController struct { } // NewAggregationController creates new OpenAPI aggregation controller. -func NewAggregationController(openAPIAggregationManager aggregator.SpecAggregator) *AggregationController { +func NewAggregationController(openAPIAggregationManager aggregator.SpecProxier) *AggregationController { c := &AggregationController{ openAPIAggregationManager: openAPIAggregationManager, queue: workqueue.NewNamedRateLimitingQueue(