Merge pull request #108637 from Jefftree/openapi-proxy

Change aggregator to proxier for OpenAPI v3
This commit is contained in:
Kubernetes Prow Robot 2022-03-23 13:22:11 -07:00 committed by GitHub
commit 5d26b111ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 85 deletions

View File

@ -17,8 +17,11 @@ limitations under the License.
package aggregator package aggregator
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -26,15 +29,10 @@ import (
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
) )
// SpecAggregator calls out to http handlers of APIServices and caches specs. It keeps state of the last // SpecProxier proxies OpenAPI V3 requests to their respective APIService
// known specs including the http etag. type SpecProxier interface {
// TODO(jefftree): remove the downloading and caching and proxy directly to the APIServices. This is possible because we
// don't have to merge here, which is cpu intensive in v2
type SpecAggregator interface {
AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)
UpdateAPIServiceSpec(apiServiceName string) error UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIServiceSpec(apiServiceName string) RemoveAPIServiceSpec(apiServiceName string)
@ -53,37 +51,27 @@ func IsLocalAPIService(apiServiceName string) bool {
return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
} }
// GetAPIServicesName returns the names of APIServices recorded in openAPIV3Specs. // GetAPIServicesName returns the names of APIServices recorded in apiServiceInfo.
// We use this function to pass the names of local APIServices to the controller in this package, // We use this function to pass the names of local APIServices to the controller in this package,
// so that the controller can periodically sync the OpenAPI spec from delegation API servers. // so that the controller can periodically sync the OpenAPI spec from delegation API servers.
func (s *specAggregator) GetAPIServiceNames() []string { func (s *specProxier) GetAPIServiceNames() []string {
s.rwMutex.Lock() s.rwMutex.RLock()
defer s.rwMutex.Unlock() defer s.rwMutex.RUnlock()
names := make([]string, len(s.openAPIV3Specs)) names := make([]string, len(s.apiServiceInfo))
for key := range s.openAPIV3Specs { for key := range s.apiServiceInfo {
names = append(names, key) names = append(names, key)
} }
return names return names
} }
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecAggregator, error) { func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecProxier, error) {
var err error s := &specProxier{
s := &specAggregator{ apiServiceInfo: map[string]*openAPIV3APIServiceInfo{},
openAPIV3Specs: map[string]*openAPIV3APIServiceInfo{},
downloader: downloader, downloader: downloader,
} }
s.openAPIV3VersionedService, err = handler3.NewOpenAPIService(nil)
if err != nil {
return nil, err
}
err = s.openAPIV3VersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", pathHandler)
if err != nil {
return nil, err
}
i := 1 i := 1
for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
handler := delegate.UnprotectedHandler() handler := delegate.UnprotectedHandler()
@ -98,109 +86,126 @@ func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.D
s.UpdateAPIServiceSpec(apiServiceName) s.UpdateAPIServiceSpec(apiServiceName)
i++ i++
} }
s.register(pathHandler)
return s, nil return s, nil
} }
// AddUpdateAPIService adds or updates the api service. It is thread safe. // AddUpdateAPIService adds or updates the api service. It is thread safe.
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
// If the APIService is being updated, use the existing struct. // If the APIService is being updated, use the existing struct.
if apiServiceInfo, ok := s.openAPIV3Specs[apiservice.Name]; ok { if apiServiceInfo, ok := s.apiServiceInfo[apiservice.Name]; ok {
apiServiceInfo.apiService = *apiservice apiServiceInfo.apiService = *apiservice
apiServiceInfo.handler = handler apiServiceInfo.handler = handler
} }
s.openAPIV3Specs[apiservice.Name] = &openAPIV3APIServiceInfo{ s.apiServiceInfo[apiservice.Name] = &openAPIV3APIServiceInfo{
apiService: *apiservice, apiService: *apiservice,
handler: handler, handler: handler,
specs: make(map[string]*openAPIV3SpecInfo),
} }
} }
// UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves.
// It is thread safe. // It is thread safe.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error { func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
apiService, exists := s.openAPIV3Specs[apiServiceName] apiService, exists := s.apiServiceInfo[apiServiceName]
if !exists { if !exists {
return fmt.Errorf("APIService %s does not exist for update", apiServiceName) return fmt.Errorf("APIService %s does not exist for update", apiServiceName)
} }
// Pass a list of old etags to the Downloader to prevent transfers if etags match gv, err := s.downloader.OpenAPIV3Root(apiService.handler)
etagList := make(map[string]string)
for gv, specInfo := range apiService.specs {
etagList[gv] = specInfo.etag
}
groups, err := s.downloader.Download(apiService.handler, etagList)
if err != nil { if err != nil {
return err return err
} }
s.apiServiceInfo[apiServiceName].gvList = gv
// Remove any groups that do not exist anymore
for group := range s.openAPIV3Specs[apiServiceName].specs {
if _, exists := groups[group]; !exists {
s.openAPIV3VersionedService.DeleteGroupVersion(group)
delete(s.openAPIV3Specs[apiServiceName].specs, group)
}
}
for group, info := range groups {
if info.spec == nil {
continue
}
// If ETag has not changed, no update is necessary
oldInfo, exists := s.openAPIV3Specs[apiServiceName].specs[group]
if exists && oldInfo.etag == info.etag {
continue
}
s.openAPIV3Specs[apiServiceName].specs[group] = &openAPIV3SpecInfo{
spec: info.spec,
etag: info.etag,
}
s.openAPIV3VersionedService.UpdateGroupVersion(group, info.spec)
}
return nil return nil
} }
type specAggregator struct { type specProxier struct {
// mutex protects all members of this struct. // mutex protects all members of this struct.
rwMutex sync.RWMutex rwMutex sync.RWMutex
// OpenAPI V3 specs by APIService name // OpenAPI V3 specs by APIService name
openAPIV3Specs map[string]*openAPIV3APIServiceInfo apiServiceInfo map[string]*openAPIV3APIServiceInfo
// provided for dynamic OpenAPI spec
openAPIV3VersionedService *handler3.OpenAPIService
// For downloading the OpenAPI v3 specs from apiservices // For downloading the OpenAPI v3 specs from apiservices
downloader Downloader downloader Downloader
} }
var _ SpecAggregator = &specAggregator{} var _ SpecProxier = &specProxier{}
type openAPIV3APIServiceInfo struct { type openAPIV3APIServiceInfo struct {
apiService v1.APIService apiService v1.APIService
handler http.Handler handler http.Handler
specs map[string]*openAPIV3SpecInfo gvList []string
}
type openAPIV3SpecInfo struct {
spec *spec3.OpenAPI
etag string
} }
// RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned.
// It is thread safe. // It is thread safe.
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) { func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
if apiServiceInfo, ok := s.openAPIV3Specs[apiServiceName]; ok { if _, ok := s.apiServiceInfo[apiServiceName]; ok {
for gv := range apiServiceInfo.specs { delete(s.apiServiceInfo, apiServiceName)
s.openAPIV3VersionedService.DeleteGroupVersion(gv)
}
delete(s.openAPIV3Specs, apiServiceName)
} }
} }
// handleDiscovery is the handler for OpenAPI V3 Discovery
func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
gvList := make(map[string]bool)
for _, apiServiceInfo := range s.apiServiceInfo {
for _, gv := range apiServiceInfo.gvList {
gvList[gv] = true
}
}
keys := make([]string, 0, len(gvList))
for k := range gvList {
keys = append(keys, k)
}
sort.Strings(keys)
output := map[string][]string{"Paths": keys}
j, err := json.Marshal(output)
if err != nil {
return
}
http.ServeContent(w, r, "/openapi/v3", time.Now(), bytes.NewReader(j))
}
// handleGroupVersion is the OpenAPI V3 handler for a specified group/version
func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request) {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
// TODO: Import this logic from kube-openapi instead of duplicating
// URLs for OpenAPI V3 have the format /openapi/v3/<groupversionpath>
// SplitAfterN with 4 yields ["", "openapi", "v3", <groupversionpath>]
url := strings.SplitAfterN(r.URL.Path, "/", 4)
targetGV := url[3]
for _, apiServiceInfo := range s.apiServiceInfo {
for _, gv := range apiServiceInfo.gvList {
if targetGV == gv {
apiServiceInfo.handler.ServeHTTP(w, r)
return
}
}
}
// No group-versions match the desired request
w.WriteHeader(404)
}
// Register registers the OpenAPI V3 Discovery and GroupVersion handlers
func (s *specProxier) register(handler common.PathHandlerByGroupVersion) {
handler.Handle("/openapi/v3", http.HandlerFunc(s.handleDiscovery))
handler.HandlePrefix("/openapi/v3/", http.HandlerFunc(s.handleGroupVersion))
}

View File

@ -54,6 +54,34 @@ type SpecETag struct {
etag string etag string
} }
// OpenAPIV3Root downloads the OpenAPI V3 root document from an APIService
func (s *Downloader) OpenAPIV3Root(handler http.Handler) ([]string, error) {
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/openapi/v3", nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "application/json")
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotFound:
// TODO: For APIServices, download the V2 spec and convert to V3
return nil, nil
case http.StatusOK:
groups := gvList{}
if err := json.Unmarshal(writer.data, &groups); err != nil {
return nil, err
}
return groups.Paths, nil
}
return nil, fmt.Errorf("Error, could not get list of group versions for APIService")
}
// Download downloads OpenAPI v3 for all groups of a given handler // Download downloads OpenAPI v3 for all groups of a given handler
func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) {
// TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 // TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034

View File

@ -32,10 +32,12 @@ type handlerTest struct {
var _ http.Handler = handlerTest{} var _ http.Handler = handlerTest{}
var groupList = []string{"apis/group/version"}
func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Create an APIService with a handler for one group/version // Create an APIService with a handler for one group/version
group := make(map[string][]string) group := make(map[string][]string)
group["Paths"] = []string{"apis/group/version"} group["Paths"] = groupList
j, _ := json.Marshal(group) j, _ := json.Marshal(group)
if r.URL.Path == "/openapi/v3" { if r.URL.Path == "/openapi/v3" {
w.Write(j) w.Write(j)
@ -85,6 +87,11 @@ func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID
func TestDownloadOpenAPISpec(t *testing.T) { func TestDownloadOpenAPISpec(t *testing.T) {
s := Downloader{} s := Downloader{}
groups, err := s.OpenAPIV3Root(
handlerTest{data: []byte(""), etag: ""})
assert.NoError(t, err)
assert.Equal(t, groups, groupList)
// Test with eTag // Test with eTag
gvSpec, err := s.Download( gvSpec, err := s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{})

View File

@ -43,10 +43,9 @@ const (
syncNothing syncNothing
) )
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove // AggregationController periodically checks the list of group-versions handled by each APIService and updates the discovery page periodically
// them if necessary.
type AggregationController struct { type AggregationController struct {
openAPIAggregationManager aggregator.SpecAggregator openAPIAggregationManager aggregator.SpecProxier
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
// To allow injection for testing. // To allow injection for testing.
@ -54,7 +53,7 @@ type AggregationController struct {
} }
// NewAggregationController creates new OpenAPI aggregation controller. // NewAggregationController creates new OpenAPI aggregation controller.
func NewAggregationController(openAPIAggregationManager aggregator.SpecAggregator) *AggregationController { func NewAggregationController(openAPIAggregationManager aggregator.SpecProxier) *AggregationController {
c := &AggregationController{ c := &AggregationController{
openAPIAggregationManager: openAPIAggregationManager, openAPIAggregationManager: openAPIAggregationManager,
queue: workqueue.NewNamedRateLimitingQueue( queue: workqueue.NewNamedRateLimitingQueue(