From f0837c18d371ffbaacddb126ca066baef6aaf593 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Thu, 24 Mar 2022 22:23:50 -0700 Subject: [PATCH] Add conversion if APIService does not publish v3 --- .../openapiv3/aggregator/aggregator.go | 70 +++++++- .../openapiv3/aggregator/aggregator_test.go | 169 ++++++++++++++++++ .../openapiv3/aggregator/downloader.go | 104 ++--------- .../openapiv3/aggregator/downloader_test.go | 42 +---- 4 files changed, 248 insertions(+), 137 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go index 4be6e1b0a73..8b32ae96d60 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go @@ -26,10 +26,14 @@ import ( "time" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/mux" "k8s.io/klog/v2" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/handler3" + "k8s.io/kube-openapi/pkg/openapiconv" + + v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" ) // SpecProxier proxies OpenAPI V3 requests to their respective APIService @@ -45,6 +49,7 @@ const ( specDownloadTimeout = 60 * time.Second localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_" localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d" + openAPIV2Converter = "openapiv2converter" ) // IsLocalAPIService returns true for local specs from delegates. @@ -87,6 +92,17 @@ func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.D s.UpdateAPIServiceSpec(apiServiceName) i++ } + + handler, err := handler3.NewOpenAPIService(nil) + if err != nil { + return s, err + } + s.openAPIV2ConverterHandler = handler + openAPIV2ConverterMux := mux.NewPathRecorderMux(openAPIV2Converter) + s.openAPIV2ConverterHandler.RegisterOpenAPIV3VersionedService("/openapi/v3", openAPIV2ConverterMux) + openAPIV2ConverterAPIService := v1.APIService{} + openAPIV2ConverterAPIService.Name = openAPIV2Converter + s.AddUpdateAPIService(openAPIV2ConverterMux, &openAPIV2ConverterAPIService) s.register(pathHandler) return s, nil @@ -107,22 +123,51 @@ func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.A } } +func getGroupVersionStringFromAPIService(apiService v1.APIService) string { + if apiService.Spec.Group == "" && apiService.Spec.Version == "" { + return "" + } + return "apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version +} + // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. // It is thread safe. func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error { s.rwMutex.Lock() defer s.rwMutex.Unlock() + return s.updateAPIServiceSpecLocked(apiServiceName) +} +func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error { apiService, exists := s.apiServiceInfo[apiServiceName] if !exists { return fmt.Errorf("APIService %s does not exist for update", apiServiceName) } - gv, err := s.downloader.OpenAPIV3Root(apiService.handler) + if !apiService.isLegacyAPIService { + gv, httpStatus, err := s.downloader.OpenAPIV3Root(apiService.handler) + if err != nil { + return err + } + if httpStatus == http.StatusNotFound { + apiService.isLegacyAPIService = true + } else { + s.apiServiceInfo[apiServiceName].discovery = gv + return nil + } + } + + newDownloader := v2aggregator.Downloader{} + v2Spec, etag, httpStatus, err := newDownloader.Download(apiService.handler, apiService.etag) if err != nil { return err } - s.apiServiceInfo[apiServiceName].discovery = gv + apiService.etag = etag + if httpStatus == http.StatusOK { + v3Spec := openapiconv.ConvertV2ToV3(v2Spec) + s.openAPIV2ConverterHandler.UpdateGroupVersion(getGroupVersionStringFromAPIService(apiService.apiService), v3Spec) + s.updateAPIServiceSpecLocked(openAPIV2Converter) + } return nil } @@ -135,6 +180,8 @@ type specProxier struct { // For downloading the OpenAPI v3 specs from apiservices downloader Downloader + + openAPIV2ConverterHandler *handler3.OpenAPIService } var _ SpecProxier = &specProxier{} @@ -143,6 +190,12 @@ type openAPIV3APIServiceInfo struct { apiService v1.APIService handler http.Handler discovery *handler3.OpenAPIV3Discovery + + // These fields are only used if the /openapi/v3 endpoint is not served by an APIService + // Legacy APIService indicates that an APIService does not support OpenAPI V3, and the OpenAPI V2 + // will be downloaded, converted to V3 (lossy), and served by the aggregator + etag string + isLegacyAPIService bool } // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. @@ -150,11 +203,13 @@ type openAPIV3APIServiceInfo struct { func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) { s.rwMutex.Lock() defer s.rwMutex.Unlock() - delete(s.apiServiceInfo, apiServiceName) + if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok { + s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService)) + delete(s.apiServiceInfo, apiServiceName) + } } -// handleDiscovery is the handler for OpenAPI V3 Discovery -func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) { +func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery { s.rwMutex.RLock() defer s.rwMutex.RUnlock() @@ -171,7 +226,12 @@ func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) { merged.Paths[key] = item } } + return merged +} +// handleDiscovery is the handler for OpenAPI V3 Discovery +func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) { + merged := s.getOpenAPIV3Root() j, err := json.Marshal(&merged) if err != nil { w.WriteHeader(500) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go new file mode 100644 index 00000000000..5b018b3b1e4 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregator + +import ( + "bytes" + "encoding/json" + "net/http" + "testing" + + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/mux" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-openapi/pkg/handler3" +) + +type testV3APIService struct { + etag string + data []byte +} + +var _ http.Handler = testV3APIService{} + +func (h testV3APIService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Create an APIService with a handler for one group/version + if r.URL.Path == "/openapi/v3" { + group := &handler3.OpenAPIV3Discovery{ + Paths: map[string]handler3.OpenAPIV3DiscoveryGroupVersion{ + "apis/group.example.com/v1": { + ServerRelativeURL: "/openapi/v3/apis/group.example.com/v1?hash=" + h.etag, + }, + }, + } + + j, _ := json.Marshal(group) + w.Write(j) + return + } + + if r.URL.Path == "/openapi/v3/apis/group.example.com/v1" { + if len(h.etag) > 0 { + w.Header().Add("Etag", h.etag) + } + ifNoneMatches := r.Header["If-None-Match"] + for _, match := range ifNoneMatches { + if match == h.etag { + w.WriteHeader(http.StatusNotModified) + return + } + } + w.Write(h.data) + } +} + +type testV2APIService struct{} + +var _ http.Handler = testV2APIService{} + +func (h testV2APIService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Create an APIService with a handler for one group/version + if r.URL.Path == "/openapi/v2" { + w.Write([]byte(`{"swagger":"2.0","info":{"title":"Kubernetes","version":"unversioned"}}`)) + return + } + w.WriteHeader(404) +} + +func TestV2APIService(t *testing.T) { + downloader := Downloader{} + pathHandler := mux.NewPathRecorderMux("aggregator_test") + var serveHandler http.Handler = pathHandler + specProxier, err := BuildAndRegisterAggregator(downloader, genericapiserver.NewEmptyDelegate(), pathHandler) + if err != nil { + t.Error(err) + } + handler := testV2APIService{} + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Group: "group.example.com", + Version: "v1", + }, + } + apiService.Name = "v1.group.example.com" + specProxier.AddUpdateAPIService(handler, apiService) + specProxier.UpdateAPIServiceSpec("v1.group.example.com") + + data := sendReq(t, serveHandler, "/openapi/v3") + groupVersionList := handler3.OpenAPIV3Discovery{} + if err := json.Unmarshal(data, &groupVersionList); err != nil { + t.Fatal(err) + } + + // A legacy APIService will not publish OpenAPI V3 + // Ensure that we can still aggregate its V2 spec and convert it to V3. + path, ok := groupVersionList.Paths["apis/group.example.com/v1"] + if !ok { + t.Error("Expected group.example.com/v1 to be in group version list") + } + gotSpecJSON := sendReq(t, serveHandler, path.ServerRelativeURL) + + expectedV3Bytes := []byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"},"components":{}}`) + + if bytes.Compare(gotSpecJSON, expectedV3Bytes) != 0 { + t.Errorf("Spec mismatch, expected %s, got %s", expectedV3Bytes, gotSpecJSON) + } +} + +func TestV3APIService(t *testing.T) { + downloader := Downloader{} + + pathHandler := mux.NewPathRecorderMux("aggregator_test") + var serveHandler http.Handler = pathHandler + specProxier, err := BuildAndRegisterAggregator(downloader, genericapiserver.NewEmptyDelegate(), pathHandler) + if err != nil { + t.Error(err) + } + specJSON := []byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`) + handler := testV3APIService{ + etag: "6E8F849B434D4B98A569B9D7718876E9-356ECAB19D7FBE1336BABB1E70F8F3025050DE218BE78256BE81620681CFC9A268508E542B8B55974E17B2184BBFC8FFFAA577E51BE195D32B3CA2547818ABE4", + data: specJSON, + } + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Group: "group.example.com", + Version: "v1", + }, + } + apiService.Name = "v1.group.example.com" + specProxier.AddUpdateAPIService(handler, apiService) + specProxier.UpdateAPIServiceSpec("v1.group.example.com") + + data := sendReq(t, serveHandler, "/openapi/v3") + groupVersionList := handler3.OpenAPIV3Discovery{} + if err := json.Unmarshal(data, &groupVersionList); err != nil { + t.Fatal(err) + } + path, ok := groupVersionList.Paths["apis/group.example.com/v1"] + if !ok { + t.Error("Expected group.example.com/v1 to be in group version list") + } + gotSpecJSON := sendReq(t, serveHandler, path.ServerRelativeURL) + if bytes.Compare(gotSpecJSON, specJSON) != 0 { + t.Errorf("Spec mismatch, expected %s, got %s", specJSON, gotSpecJSON) + } +} + +func sendReq(t *testing.T, handler http.Handler, path string) []byte { + req, err := http.NewRequest("GET", path, nil) + if err != nil { + t.Fatal(err) + } + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + return writer.data +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go index c0ab02f2771..45b2467980a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go @@ -23,11 +23,16 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/klog/v2" "k8s.io/kube-openapi/pkg/handler3" - "k8s.io/kube-openapi/pkg/spec3" ) +type NotFoundError struct { +} + +func (e *NotFoundError) Error() string { + return "" +} + // Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v3 and /openap/v3// endpoints. type Downloader struct { } @@ -44,112 +49,29 @@ func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http. }) } -// SpecETag is a OpenAPI v3 spec and etag pair for the endpoint of each OpenAPI group/version -type SpecETag struct { - spec *spec3.OpenAPI - etag string -} - // OpenAPIV3Root downloads the OpenAPI V3 root document from an APIService -func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Discovery, error) { +func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Discovery, int, error) { handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") req, err := http.NewRequest("GET", "/openapi/v3", nil) if err != nil { - return nil, err + return nil, 0, err } - req.Header.Add("Accept", "application/json") - writer := newInMemoryResponseWriter() handler.ServeHTTP(writer, req) switch writer.respCode { case http.StatusNotFound: - // TODO: For APIServices, download the V2 spec and convert to V3 - return nil, nil + return nil, writer.respCode, nil case http.StatusOK: groups := handler3.OpenAPIV3Discovery{} if err := json.Unmarshal(writer.data, &groups); err != nil { - return nil, err + return nil, writer.respCode, err } - return &groups, nil - } - return nil, fmt.Errorf("Error, could not get list of group versions for APIService") -} - -// Download downloads OpenAPI v3 for all groups of a given handler -func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { - // TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 - // Move to proxy request in the aggregator and let the APIServices serve the OpenAPI directly - handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) - handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") - - req, err := http.NewRequest("GET", "/openapi/v3", nil) - if err != nil { - return nil, err - } - req.Header.Add("Accept", "application/json") - - writer := newInMemoryResponseWriter() - handler.ServeHTTP(writer, req) - - switch writer.respCode { - case http.StatusNotFound: - // Gracefully skip 404, assuming the server won't provide any spec - return nil, nil - case http.StatusOK: - groups := handler3.OpenAPIV3Discovery{} - aggregated := make(map[string]*SpecETag) - - if err := json.Unmarshal(writer.data, &groups); err != nil { - return nil, err - } - for path, item := range groups.Paths { - reqPath := item.ServerRelativeURL - req, err := http.NewRequest("GET", reqPath, nil) - if err != nil { - return nil, err - } - req.Header.Add("Accept", "application/json") - oldEtag, ok := etagList[path] - if ok { - req.Header.Add("If-None-Match", oldEtag) - } - openAPIWriter := newInMemoryResponseWriter() - handler.ServeHTTP(openAPIWriter, req) - - switch openAPIWriter.respCode { - case http.StatusNotFound: - continue - case http.StatusNotModified: - aggregated[path] = &SpecETag{ - etag: oldEtag, - } - case http.StatusOK: - var spec spec3.OpenAPI - // TODO|jefftree: For OpenAPI v3 Beta, if the v3 spec is empty then - // we should request the v2 endpoint and convert it to v3 - if len(openAPIWriter.data) > 0 { - err = json.Unmarshal(openAPIWriter.data, &spec) - if err != nil { - return nil, err - } - etag := openAPIWriter.Header().Get("Etag") - aggregated[path] = &SpecETag{ - spec: &spec, - etag: etag, - } - } - default: - klog.Errorf("Error: unknown status %v", openAPIWriter.respCode) - } - } - - return aggregated, nil - default: - return nil, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) + return &groups, writer.respCode, nil } + return nil, writer.respCode, fmt.Errorf("Error, could not get list of group versions for APIService") } // inMemoryResponseWriter is a http.Writer that keep the response in memory. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go index 4375d9fec2f..0877cb301f2 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader_test.go @@ -18,7 +18,6 @@ package aggregator import ( "encoding/json" - "fmt" "net/http" "testing" @@ -65,35 +64,10 @@ func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID string, expectedEtag string) error { - if err != nil { - return fmt.Errorf("downloadOpenAPISpec failed : %s", err) - } - specInfo, ok := gvSpec["apis/group/version"] - if !ok { - if expectedSpecID == "" { - return nil - } - return fmt.Errorf("expected to download spec, no spec downloaded") - } - - if specInfo.spec != nil && expectedSpecID == "" { - return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version) - } - - if specInfo.spec != nil && specInfo.spec.Version != expectedSpecID { - return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version) - } - if specInfo.etag != expectedEtag { - return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, specInfo.etag) - } - return nil -} - func TestDownloadOpenAPISpec(t *testing.T) { s := Downloader{} - groups, err := s.OpenAPIV3Root( + groups, _, err := s.OpenAPIV3Root( handlerTest{data: []byte(""), etag: ""}) assert.NoError(t, err) if assert.NotNil(t, groups) { @@ -103,18 +77,4 @@ func TestDownloadOpenAPISpec(t *testing.T) { } } - // Test with eTag - gvSpec, err := s.Download( - handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) - assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test")) - - // Test not modified - gvSpec, err = s.Download( - handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{"apis/group/version": "etag_test"}) - assert.NoError(t, assertDownloadedSpec(gvSpec, err, "", "etag_test")) - - // Test different eTags - gvSpec, err = s.Download( - handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test1"}, map[string]string{"apis/group/version": "etag_test2"}) - assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test1")) }