mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #119870 from Jefftree/agg-discovery-406
Fallback to legacy discovery on a wider range of conditions
This commit is contained in:
commit
42275daaaa
@ -68,6 +68,9 @@ const (
|
||||
acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1
|
||||
)
|
||||
|
||||
// Aggregated discovery content-type GVK.
|
||||
var v2Beta1GVK = schema.GroupVersionKind{Group: "apidiscovery.k8s.io", Version: "v2beta1", Kind: "APIGroupDiscoveryList"}
|
||||
|
||||
// DiscoveryInterface holds the methods that discover server-supported API groups,
|
||||
// versions and resources.
|
||||
type DiscoveryInterface interface {
|
||||
@ -261,16 +264,15 @@ func (d *DiscoveryClient) downloadLegacy() (
|
||||
}
|
||||
|
||||
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
|
||||
// Switch on content-type server responded with: aggregated or unaggregated.
|
||||
switch {
|
||||
case isV2Beta1ContentType(responseContentType):
|
||||
// Based on the content-type server responded with: aggregated or unaggregated.
|
||||
if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK {
|
||||
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
|
||||
err = json.Unmarshal(body, &aggregatedDiscovery)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
|
||||
default:
|
||||
} else {
|
||||
// Default is unaggregated discovery v1.
|
||||
var v metav1.APIVersions
|
||||
err = json.Unmarshal(body, &v)
|
||||
@ -314,16 +316,15 @@ func (d *DiscoveryClient) downloadAPIs() (
|
||||
apiGroupList := &metav1.APIGroupList{}
|
||||
failedGVs := map[schema.GroupVersion]error{}
|
||||
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
|
||||
// Switch on content-type server responded with: aggregated or unaggregated.
|
||||
switch {
|
||||
case isV2Beta1ContentType(responseContentType):
|
||||
// Based on the content-type server responded with: aggregated or unaggregated.
|
||||
if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK {
|
||||
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
|
||||
err = json.Unmarshal(body, &aggregatedDiscovery)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
|
||||
default:
|
||||
} else {
|
||||
// Default is unaggregated discovery v1.
|
||||
err = json.Unmarshal(body, apiGroupList)
|
||||
if err != nil {
|
||||
@ -334,26 +335,29 @@ func (d *DiscoveryClient) downloadAPIs() (
|
||||
return apiGroupList, resourcesByGV, failedGVs, nil
|
||||
}
|
||||
|
||||
// isV2Beta1ContentType checks of the content-type string is both
|
||||
// "application/json" and contains the v2beta1 content-type params.
|
||||
// ContentTypeIsGVK checks of the content-type string is both
|
||||
// "application/json" and matches the provided GVK. An error
|
||||
// is returned if the content type string is malformed.
|
||||
// NOTE: This function is resilient to the ordering of the
|
||||
// content-type parameters, as well as parameters added by
|
||||
// intermediaries such as proxies or gateways. Examples:
|
||||
//
|
||||
// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true
|
||||
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true
|
||||
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true
|
||||
// "application/json" = false
|
||||
// "application/json; charset=UTF-8" = false
|
||||
func isV2Beta1ContentType(contentType string) bool {
|
||||
// ("application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json", any GVK) = (false, nil)
|
||||
// ("application/json; charset=UTF-8", any GVK) = (false, nil)
|
||||
// ("malformed content type string", any GVK) = (false, error)
|
||||
func ContentTypeIsGVK(contentType string, gvk schema.GroupVersionKind) (bool, error) {
|
||||
base, params, err := mime.ParseMediaType(contentType)
|
||||
if err != nil {
|
||||
return false
|
||||
return false, err
|
||||
}
|
||||
return runtime.ContentTypeJSON == base &&
|
||||
params["g"] == "apidiscovery.k8s.io" &&
|
||||
params["v"] == "v2beta1" &&
|
||||
params["as"] == "APIGroupDiscoveryList"
|
||||
gvkMatch := runtime.ContentTypeJSON == base &&
|
||||
params["g"] == gvk.Group &&
|
||||
params["v"] == gvk.Version &&
|
||||
params["as"] == gvk.Kind
|
||||
return gvkMatch, nil
|
||||
}
|
||||
|
||||
// ServerGroups returns the supported groups, with information like supported versions and the
|
||||
|
@ -2762,54 +2762,76 @@ func TestAggregatedServerPreferredResources(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDiscoveryContentTypeVersion(t *testing.T) {
|
||||
v2beta1 := schema.GroupVersionKind{Group: "apidiscovery.k8s.io", Version: "v2beta1", Kind: "APIGroupDiscoveryList"}
|
||||
tests := []struct {
|
||||
contentType string
|
||||
isV2Beta1 bool
|
||||
gvk schema.GroupVersionKind
|
||||
match bool
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList",
|
||||
isV2Beta1: true,
|
||||
gvk: v2beta1,
|
||||
match: true,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
// content-type parameters are not in correct order, but comparison ignores order.
|
||||
contentType: "application/json; v=v2beta1;as=APIGroupDiscoveryList;g=apidiscovery.k8s.io",
|
||||
isV2Beta1: true,
|
||||
gvk: v2beta1,
|
||||
match: true,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
// content-type parameters are not in correct order, but comparison ignores order.
|
||||
contentType: "application/json; as=APIGroupDiscoveryList;g=apidiscovery.k8s.io;v=v2beta1",
|
||||
isV2Beta1: true,
|
||||
gvk: v2beta1,
|
||||
match: true,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
// Ignores extra parameter "charset=utf-8"
|
||||
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList;charset=utf-8",
|
||||
isV2Beta1: true,
|
||||
gvk: v2beta1,
|
||||
match: true,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
contentType: "application/json",
|
||||
isV2Beta1: false,
|
||||
gvk: v2beta1,
|
||||
match: false,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
contentType: "application/json; charset=UTF-8",
|
||||
isV2Beta1: false,
|
||||
gvk: v2beta1,
|
||||
match: false,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
contentType: "text/json",
|
||||
isV2Beta1: false,
|
||||
gvk: v2beta1,
|
||||
match: false,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
contentType: "text/html",
|
||||
isV2Beta1: false,
|
||||
gvk: v2beta1,
|
||||
match: false,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
contentType: "",
|
||||
isV2Beta1: false,
|
||||
gvk: v2beta1,
|
||||
match: false,
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
isV2Beta1 := isV2Beta1ContentType(test.contentType)
|
||||
assert.Equal(t, test.isV2Beta1, isV2Beta1)
|
||||
match, err := ContentTypeIsGVK(test.contentType, test.gvk)
|
||||
assert.Equal(t, test.expectErr, err != nil)
|
||||
assert.Equal(t, test.match, match)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -26,12 +26,14 @@ import (
|
||||
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints"
|
||||
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
scheme "k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
@ -44,6 +46,13 @@ var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group:
|
||||
// first (mirrors v1 discovery behavior)
|
||||
var APIRegistrationGroupPriority int = 20001
|
||||
|
||||
// Aggregated discovery content-type GVK.
|
||||
var v2Beta1GVK = schema.GroupVersionKind{
|
||||
Group: "apidiscovery.k8s.io",
|
||||
Version: "v2beta1",
|
||||
Kind: "APIGroupDiscoveryList",
|
||||
}
|
||||
|
||||
// Given a list of APIServices and proxyHandlers for contacting them,
|
||||
// DiscoveryManager caches a list of discovery documents for each server
|
||||
|
||||
@ -204,7 +213,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
Path: req.URL.Path,
|
||||
IsResourceRequest: false,
|
||||
}))
|
||||
req.Header.Add("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
|
||||
req.Header.Add("Accept", discovery.AcceptV2Beta1)
|
||||
|
||||
if exists && len(cached.etag) > 0 {
|
||||
req.Header.Add("If-None-Match", cached.etag)
|
||||
@ -217,8 +226,9 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
writer := newInMemoryResponseWriter()
|
||||
handler.ServeHTTP(writer, req)
|
||||
|
||||
switch writer.respCode {
|
||||
case http.StatusNotModified:
|
||||
isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK)
|
||||
switch {
|
||||
case writer.respCode == http.StatusNotModified:
|
||||
// Keep old entry, update timestamp
|
||||
cached = cachedResult{
|
||||
discovery: cached.discovery,
|
||||
@ -228,8 +238,47 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
|
||||
dm.setCacheEntryForService(info.service, cached)
|
||||
return &cached, nil
|
||||
case http.StatusNotAcceptable:
|
||||
// Discovery Document is not being served at all.
|
||||
case writer.respCode == http.StatusServiceUnavailable:
|
||||
return nil, fmt.Errorf("service %s returned non-success response code: %v",
|
||||
info.service.String(), writer.respCode)
|
||||
case writer.respCode == http.StatusOK && isV2Beta1GVK:
|
||||
parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{}
|
||||
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())
|
||||
|
||||
// Convert discovery info into a map for convenient lookup later
|
||||
discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{}
|
||||
for _, g := range parsed.Items {
|
||||
for _, v := range g.Versions {
|
||||
discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
|
||||
for i := range v.Resources {
|
||||
// avoid nil panics in v0.26.0-v0.26.3 client-go clients
|
||||
// see https://github.com/kubernetes/kubernetes/issues/118361
|
||||
if v.Resources[i].ResponseKind == nil {
|
||||
v.Resources[i].ResponseKind = &metav1.GroupVersionKind{}
|
||||
}
|
||||
for j := range v.Resources[i].Subresources {
|
||||
if v.Resources[i].Subresources[j].ResponseKind == nil {
|
||||
v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save cached result
|
||||
cached = cachedResult{
|
||||
discovery: discoMap,
|
||||
etag: writer.Header().Get("Etag"),
|
||||
lastUpdated: now,
|
||||
}
|
||||
dm.setCacheEntryForService(info.service, cached)
|
||||
return &cached, nil
|
||||
default:
|
||||
// Could not get acceptable response for Aggregated Discovery.
|
||||
// Fall back to legacy discovery information
|
||||
if len(gv.Version) == 0 {
|
||||
return nil, errors.New("not found")
|
||||
@ -265,7 +314,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
handler.ServeHTTP(writer, req)
|
||||
|
||||
if writer.respCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("failed to download discovery for %s: %v", path, writer.String())
|
||||
return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String())
|
||||
}
|
||||
|
||||
parsed := &metav1.APIResourceList{}
|
||||
@ -278,6 +327,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.V(3).Infof("DiscoveryManager: Successfully downloaded legacy discovery for %s", info.service.String())
|
||||
|
||||
discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{
|
||||
// Convert old-style APIGroupList to new information
|
||||
@ -296,48 +346,6 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
|
||||
// one group version and an API Service may serve multiple
|
||||
// group versions.
|
||||
return &cached, nil
|
||||
|
||||
case http.StatusOK:
|
||||
parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{}
|
||||
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())
|
||||
|
||||
// Convert discovery info into a map for convenient lookup later
|
||||
discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{}
|
||||
for _, g := range parsed.Items {
|
||||
for _, v := range g.Versions {
|
||||
discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
|
||||
for i := range v.Resources {
|
||||
// avoid nil panics in v0.26.0-v0.26.3 client-go clients
|
||||
// see https://github.com/kubernetes/kubernetes/issues/118361
|
||||
if v.Resources[i].ResponseKind == nil {
|
||||
v.Resources[i].ResponseKind = &metav1.GroupVersionKind{}
|
||||
}
|
||||
for j := range v.Resources[i].Subresources {
|
||||
if v.Resources[i].Subresources[j].ResponseKind == nil {
|
||||
v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save cached result
|
||||
cached = cachedResult{
|
||||
discovery: discoMap,
|
||||
etag: writer.Header().Get("Etag"),
|
||||
lastUpdated: now,
|
||||
}
|
||||
dm.setCacheEntryForService(info.service, cached)
|
||||
return &cached, nil
|
||||
|
||||
default:
|
||||
klog.Infof("DiscoveryManager: Failed to download discovery for %v: %v %s",
|
||||
info.service.String(), writer.respCode, writer.data)
|
||||
return nil, fmt.Errorf("service %s returned non-success response code: %v",
|
||||
info.service.String(), writer.respCode)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -596,9 +596,8 @@ func TestLegacyFallbackNoCache(t *testing.T) {
|
||||
require.Equal(t, doc.Items, expectAggregatedDiscovery)
|
||||
}
|
||||
|
||||
func TestLegacyFallback(t *testing.T) {
|
||||
func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(http.ResponseWriter, *http.Request)) {
|
||||
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
|
||||
rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
|
||||
|
||||
legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
|
||||
Name: "stable.example.com",
|
||||
@ -657,7 +656,7 @@ func TestLegacyFallback(t *testing.T) {
|
||||
// defer to legacy discovery
|
||||
legacyResourceHandler.ServeHTTP(w, r)
|
||||
} else if r.URL.Path == "/apis" {
|
||||
rootAPIsHandler.ServeHTTP(w, r)
|
||||
rootHandlerFn(w, r)
|
||||
} else {
|
||||
// Unknown url
|
||||
t.Fatalf("unexpected request sent to %v", r.URL.Path)
|
||||
@ -690,6 +689,82 @@ func TestLegacyFallback(t *testing.T) {
|
||||
},
|
||||
}, doc.Items)
|
||||
}
|
||||
func TestLegacyFallback(t *testing.T) {
|
||||
rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
|
||||
testCases := []struct {
|
||||
name string
|
||||
rootHandler func(http.ResponseWriter, *http.Request)
|
||||
}{
|
||||
{
|
||||
name: "Default root handler (406)",
|
||||
rootHandler: rootAPIsHandler.ServeHTTP,
|
||||
},
|
||||
{
|
||||
name: "Root handler with non 200 status code",
|
||||
rootHandler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Root handler with 200 response code no content type",
|
||||
rootHandler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(200)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Root handler with 200 response code incorrect content type",
|
||||
rootHandler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json;g=apidiscovery.k8s.io;v=v1alpha1;as=APIGroupDiscoveryList")
|
||||
w.WriteHeader(200)
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
testLegacyFallbackWithCustomRootHandler(t, tc.rootHandler)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPIServiceStale(t *testing.T) {
|
||||
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
|
||||
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
|
||||
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "v1.stable.example.com",
|
||||
},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
Group: "stable.example.com",
|
||||
Version: "v1",
|
||||
Service: &apiregistrationv1.ServiceReference{
|
||||
Name: "test-service",
|
||||
},
|
||||
},
|
||||
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(503)
|
||||
}))
|
||||
testCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go aggregatedManager.Run(testCtx.Done())
|
||||
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
|
||||
|
||||
// At this point external services have synced. Check if discovery document
|
||||
// lists the APIService group version as Stale.
|
||||
_, _, doc := fetchPath(aggregatedResourceManager, "")
|
||||
require.Equal(t, []apidiscoveryv2beta1.APIGroupDiscovery{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "stable.example.com",
|
||||
},
|
||||
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
|
||||
{
|
||||
Version: "v1",
|
||||
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessStale,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, doc.Items)
|
||||
}
|
||||
|
||||
// Exercises the 304 Not Modified Path of the aggregator
|
||||
// This path in 1.26.0 would result in a deadlock if an aggregated APIService
|
||||
|
Loading…
Reference in New Issue
Block a user