Add conversion if APIService does not publish v3

This commit is contained in:
Jefftree 2022-03-24 22:23:50 -07:00
parent 67d3dbfaae
commit f0837c18d3
4 changed files with 248 additions and 137 deletions

View File

@ -26,10 +26,14 @@ import (
"time"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/klog/v2"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/openapiconv"
v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
)
// SpecProxier proxies OpenAPI V3 requests to their respective APIService
@ -45,6 +49,7 @@ const (
specDownloadTimeout = 60 * time.Second
localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_"
localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
openAPIV2Converter = "openapiv2converter"
)
// IsLocalAPIService returns true for local specs from delegates.
@ -87,6 +92,17 @@ func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.D
s.UpdateAPIServiceSpec(apiServiceName)
i++
}
handler, err := handler3.NewOpenAPIService(nil)
if err != nil {
return s, err
}
s.openAPIV2ConverterHandler = handler
openAPIV2ConverterMux := mux.NewPathRecorderMux(openAPIV2Converter)
s.openAPIV2ConverterHandler.RegisterOpenAPIV3VersionedService("/openapi/v3", openAPIV2ConverterMux)
openAPIV2ConverterAPIService := v1.APIService{}
openAPIV2ConverterAPIService.Name = openAPIV2Converter
s.AddUpdateAPIService(openAPIV2ConverterMux, &openAPIV2ConverterAPIService)
s.register(pathHandler)
return s, nil
@ -107,22 +123,51 @@ func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.A
}
}
func getGroupVersionStringFromAPIService(apiService v1.APIService) string {
if apiService.Spec.Group == "" && apiService.Spec.Version == "" {
return ""
}
return "apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
}
// UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves.
// It is thread safe.
func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
return s.updateAPIServiceSpecLocked(apiServiceName)
}
func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error {
apiService, exists := s.apiServiceInfo[apiServiceName]
if !exists {
return fmt.Errorf("APIService %s does not exist for update", apiServiceName)
}
gv, err := s.downloader.OpenAPIV3Root(apiService.handler)
if !apiService.isLegacyAPIService {
gv, httpStatus, err := s.downloader.OpenAPIV3Root(apiService.handler)
if err != nil {
return err
}
if httpStatus == http.StatusNotFound {
apiService.isLegacyAPIService = true
} else {
s.apiServiceInfo[apiServiceName].discovery = gv
return nil
}
}
newDownloader := v2aggregator.Downloader{}
v2Spec, etag, httpStatus, err := newDownloader.Download(apiService.handler, apiService.etag)
if err != nil {
return err
}
s.apiServiceInfo[apiServiceName].discovery = gv
apiService.etag = etag
if httpStatus == http.StatusOK {
v3Spec := openapiconv.ConvertV2ToV3(v2Spec)
s.openAPIV2ConverterHandler.UpdateGroupVersion(getGroupVersionStringFromAPIService(apiService.apiService), v3Spec)
s.updateAPIServiceSpecLocked(openAPIV2Converter)
}
return nil
}
@ -135,6 +180,8 @@ type specProxier struct {
// For downloading the OpenAPI v3 specs from apiservices
downloader Downloader
openAPIV2ConverterHandler *handler3.OpenAPIService
}
var _ SpecProxier = &specProxier{}
@ -143,6 +190,12 @@ type openAPIV3APIServiceInfo struct {
apiService v1.APIService
handler http.Handler
discovery *handler3.OpenAPIV3Discovery
// These fields are only used if the /openapi/v3 endpoint is not served by an APIService
// Legacy APIService indicates that an APIService does not support OpenAPI V3, and the OpenAPI V2
// will be downloaded, converted to V3 (lossy), and served by the aggregator
etag string
isLegacyAPIService bool
}
// RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned.
@ -150,11 +203,13 @@ type openAPIV3APIServiceInfo struct {
func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
delete(s.apiServiceInfo, apiServiceName)
if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok {
s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService))
delete(s.apiServiceInfo, apiServiceName)
}
}
// handleDiscovery is the handler for OpenAPI V3 Discovery
func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
@ -171,7 +226,12 @@ func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
merged.Paths[key] = item
}
}
return merged
}
// handleDiscovery is the handler for OpenAPI V3 Discovery
func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
merged := s.getOpenAPIV3Root()
j, err := json.Marshal(&merged)
if err != nil {
w.WriteHeader(500)

View File

@ -0,0 +1,169 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"bytes"
"encoding/json"
"net/http"
"testing"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/mux"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/handler3"
)
type testV3APIService struct {
etag string
data []byte
}
var _ http.Handler = testV3APIService{}
func (h testV3APIService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Create an APIService with a handler for one group/version
if r.URL.Path == "/openapi/v3" {
group := &handler3.OpenAPIV3Discovery{
Paths: map[string]handler3.OpenAPIV3DiscoveryGroupVersion{
"apis/group.example.com/v1": {
ServerRelativeURL: "/openapi/v3/apis/group.example.com/v1?hash=" + h.etag,
},
},
}
j, _ := json.Marshal(group)
w.Write(j)
return
}
if r.URL.Path == "/openapi/v3/apis/group.example.com/v1" {
if len(h.etag) > 0 {
w.Header().Add("Etag", h.etag)
}
ifNoneMatches := r.Header["If-None-Match"]
for _, match := range ifNoneMatches {
if match == h.etag {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Write(h.data)
}
}
type testV2APIService struct{}
var _ http.Handler = testV2APIService{}
func (h testV2APIService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Create an APIService with a handler for one group/version
if r.URL.Path == "/openapi/v2" {
w.Write([]byte(`{"swagger":"2.0","info":{"title":"Kubernetes","version":"unversioned"}}`))
return
}
w.WriteHeader(404)
}
func TestV2APIService(t *testing.T) {
downloader := Downloader{}
pathHandler := mux.NewPathRecorderMux("aggregator_test")
var serveHandler http.Handler = pathHandler
specProxier, err := BuildAndRegisterAggregator(downloader, genericapiserver.NewEmptyDelegate(), pathHandler)
if err != nil {
t.Error(err)
}
handler := testV2APIService{}
apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Group: "group.example.com",
Version: "v1",
},
}
apiService.Name = "v1.group.example.com"
specProxier.AddUpdateAPIService(handler, apiService)
specProxier.UpdateAPIServiceSpec("v1.group.example.com")
data := sendReq(t, serveHandler, "/openapi/v3")
groupVersionList := handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(data, &groupVersionList); err != nil {
t.Fatal(err)
}
// A legacy APIService will not publish OpenAPI V3
// Ensure that we can still aggregate its V2 spec and convert it to V3.
path, ok := groupVersionList.Paths["apis/group.example.com/v1"]
if !ok {
t.Error("Expected group.example.com/v1 to be in group version list")
}
gotSpecJSON := sendReq(t, serveHandler, path.ServerRelativeURL)
expectedV3Bytes := []byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"},"components":{}}`)
if bytes.Compare(gotSpecJSON, expectedV3Bytes) != 0 {
t.Errorf("Spec mismatch, expected %s, got %s", expectedV3Bytes, gotSpecJSON)
}
}
func TestV3APIService(t *testing.T) {
downloader := Downloader{}
pathHandler := mux.NewPathRecorderMux("aggregator_test")
var serveHandler http.Handler = pathHandler
specProxier, err := BuildAndRegisterAggregator(downloader, genericapiserver.NewEmptyDelegate(), pathHandler)
if err != nil {
t.Error(err)
}
specJSON := []byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`)
handler := testV3APIService{
etag: "6E8F849B434D4B98A569B9D7718876E9-356ECAB19D7FBE1336BABB1E70F8F3025050DE218BE78256BE81620681CFC9A268508E542B8B55974E17B2184BBFC8FFFAA577E51BE195D32B3CA2547818ABE4",
data: specJSON,
}
apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Group: "group.example.com",
Version: "v1",
},
}
apiService.Name = "v1.group.example.com"
specProxier.AddUpdateAPIService(handler, apiService)
specProxier.UpdateAPIServiceSpec("v1.group.example.com")
data := sendReq(t, serveHandler, "/openapi/v3")
groupVersionList := handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(data, &groupVersionList); err != nil {
t.Fatal(err)
}
path, ok := groupVersionList.Paths["apis/group.example.com/v1"]
if !ok {
t.Error("Expected group.example.com/v1 to be in group version list")
}
gotSpecJSON := sendReq(t, serveHandler, path.ServerRelativeURL)
if bytes.Compare(gotSpecJSON, specJSON) != 0 {
t.Errorf("Spec mismatch, expected %s, got %s", specJSON, gotSpecJSON)
}
}
func sendReq(t *testing.T, handler http.Handler, path string) []byte {
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatal(err)
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
return writer.data
}

View File

@ -23,11 +23,16 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
)
type NotFoundError struct {
}
func (e *NotFoundError) Error() string {
return ""
}
// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v3 and /openap/v3/<group>/<version> endpoints.
type Downloader struct {
}
@ -44,112 +49,29 @@ func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.
})
}
// SpecETag is a OpenAPI v3 spec and etag pair for the endpoint of each OpenAPI group/version
type SpecETag struct {
spec *spec3.OpenAPI
etag string
}
// OpenAPIV3Root downloads the OpenAPI V3 root document from an APIService
func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Discovery, error) {
func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Discovery, int, error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/openapi/v3", nil)
if err != nil {
return nil, err
return nil, 0, err
}
req.Header.Add("Accept", "application/json")
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotFound:
// TODO: For APIServices, download the V2 spec and convert to V3
return nil, nil
return nil, writer.respCode, nil
case http.StatusOK:
groups := handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(writer.data, &groups); err != nil {
return nil, err
return nil, writer.respCode, err
}
return &groups, nil
}
return nil, fmt.Errorf("Error, could not get list of group versions for APIService")
}
// Download downloads OpenAPI v3 for all groups of a given handler
func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) {
// TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034
// Move to proxy request in the aggregator and let the APIServices serve the OpenAPI directly
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/openapi/v3", nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "application/json")
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotFound:
// Gracefully skip 404, assuming the server won't provide any spec
return nil, nil
case http.StatusOK:
groups := handler3.OpenAPIV3Discovery{}
aggregated := make(map[string]*SpecETag)
if err := json.Unmarshal(writer.data, &groups); err != nil {
return nil, err
}
for path, item := range groups.Paths {
reqPath := item.ServerRelativeURL
req, err := http.NewRequest("GET", reqPath, nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "application/json")
oldEtag, ok := etagList[path]
if ok {
req.Header.Add("If-None-Match", oldEtag)
}
openAPIWriter := newInMemoryResponseWriter()
handler.ServeHTTP(openAPIWriter, req)
switch openAPIWriter.respCode {
case http.StatusNotFound:
continue
case http.StatusNotModified:
aggregated[path] = &SpecETag{
etag: oldEtag,
}
case http.StatusOK:
var spec spec3.OpenAPI
// TODO|jefftree: For OpenAPI v3 Beta, if the v3 spec is empty then
// we should request the v2 endpoint and convert it to v3
if len(openAPIWriter.data) > 0 {
err = json.Unmarshal(openAPIWriter.data, &spec)
if err != nil {
return nil, err
}
etag := openAPIWriter.Header().Get("Etag")
aggregated[path] = &SpecETag{
spec: &spec,
etag: etag,
}
}
default:
klog.Errorf("Error: unknown status %v", openAPIWriter.respCode)
}
}
return aggregated, nil
default:
return nil, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
return &groups, writer.respCode, nil
}
return nil, writer.respCode, fmt.Errorf("Error, could not get list of group versions for APIService")
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.

View File

@ -18,7 +18,6 @@ package aggregator
import (
"encoding/json"
"fmt"
"net/http"
"testing"
@ -65,35 +64,10 @@ func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID string, expectedEtag string) error {
if err != nil {
return fmt.Errorf("downloadOpenAPISpec failed : %s", err)
}
specInfo, ok := gvSpec["apis/group/version"]
if !ok {
if expectedSpecID == "" {
return nil
}
return fmt.Errorf("expected to download spec, no spec downloaded")
}
if specInfo.spec != nil && expectedSpecID == "" {
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version)
}
if specInfo.spec != nil && specInfo.spec.Version != expectedSpecID {
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version)
}
if specInfo.etag != expectedEtag {
return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, specInfo.etag)
}
return nil
}
func TestDownloadOpenAPISpec(t *testing.T) {
s := Downloader{}
groups, err := s.OpenAPIV3Root(
groups, _, err := s.OpenAPIV3Root(
handlerTest{data: []byte(""), etag: ""})
assert.NoError(t, err)
if assert.NotNil(t, groups) {
@ -103,18 +77,4 @@ func TestDownloadOpenAPISpec(t *testing.T) {
}
}
// Test with eTag
gvSpec, err := s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test"))
// Test not modified
gvSpec, err = s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{"apis/group/version": "etag_test"})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "", "etag_test"))
// Test different eTags
gvSpec, err = s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test1"}, map[string]string{"apis/group/version": "etag_test2"})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test1"))
}