From 57b27fd3cd11cb5f2515c7ac5f67f612998fb368 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Wed, 9 Aug 2023 18:22:30 +0000 Subject: [PATCH] Fallback to legacy discovery on a wider range of conditions in aggregator --- .../client-go/discovery/discovery_client.go | 46 ++++---- .../discovery/discovery_client_test.go | 46 ++++++-- .../pkg/apiserver/handler_discovery.go | 106 ++++++++++-------- .../pkg/apiserver/handler_discovery_test.go | 81 ++++++++++++- 4 files changed, 194 insertions(+), 85 deletions(-) diff --git a/staging/src/k8s.io/client-go/discovery/discovery_client.go b/staging/src/k8s.io/client-go/discovery/discovery_client.go index 77bf64b5b8d..18e84cfee0f 100644 --- a/staging/src/k8s.io/client-go/discovery/discovery_client.go +++ b/staging/src/k8s.io/client-go/discovery/discovery_client.go @@ -68,6 +68,9 @@ const ( acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1 ) +// Aggregated discovery content-type GVK. +var v2Beta1GVK = schema.GroupVersionKind{Group: "apidiscovery.k8s.io", Version: "v2beta1", Kind: "APIGroupDiscoveryList"} + // DiscoveryInterface holds the methods that discover server-supported API groups, // versions and resources. type DiscoveryInterface interface { @@ -261,16 +264,15 @@ func (d *DiscoveryClient) downloadLegacy() ( } var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList - // Switch on content-type server responded with: aggregated or unaggregated. - switch { - case isV2Beta1ContentType(responseContentType): + // Based on the content-type server responded with: aggregated or unaggregated. + if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK { var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList err = json.Unmarshal(body, &aggregatedDiscovery) if err != nil { return nil, nil, nil, err } apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery) - default: + } else { // Default is unaggregated discovery v1. var v metav1.APIVersions err = json.Unmarshal(body, &v) @@ -314,16 +316,15 @@ func (d *DiscoveryClient) downloadAPIs() ( apiGroupList := &metav1.APIGroupList{} failedGVs := map[schema.GroupVersion]error{} var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList - // Switch on content-type server responded with: aggregated or unaggregated. - switch { - case isV2Beta1ContentType(responseContentType): + // Based on the content-type server responded with: aggregated or unaggregated. + if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK { var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList err = json.Unmarshal(body, &aggregatedDiscovery) if err != nil { return nil, nil, nil, err } apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery) - default: + } else { // Default is unaggregated discovery v1. err = json.Unmarshal(body, apiGroupList) if err != nil { @@ -334,26 +335,29 @@ func (d *DiscoveryClient) downloadAPIs() ( return apiGroupList, resourcesByGV, failedGVs, nil } -// isV2Beta1ContentType checks of the content-type string is both -// "application/json" and contains the v2beta1 content-type params. +// ContentTypeIsGVK checks of the content-type string is both +// "application/json" and matches the provided GVK. An error +// is returned if the content type string is malformed. // NOTE: This function is resilient to the ordering of the // content-type parameters, as well as parameters added by // intermediaries such as proxies or gateways. Examples: // -// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true -// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true -// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true -// "application/json" = false -// "application/json; charset=UTF-8" = false -func isV2Beta1ContentType(contentType string) bool { +// ("application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil) +// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil) +// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil) +// ("application/json", any GVK) = (false, nil) +// ("application/json; charset=UTF-8", any GVK) = (false, nil) +// ("malformed content type string", any GVK) = (false, error) +func ContentTypeIsGVK(contentType string, gvk schema.GroupVersionKind) (bool, error) { base, params, err := mime.ParseMediaType(contentType) if err != nil { - return false + return false, err } - return runtime.ContentTypeJSON == base && - params["g"] == "apidiscovery.k8s.io" && - params["v"] == "v2beta1" && - params["as"] == "APIGroupDiscoveryList" + gvkMatch := runtime.ContentTypeJSON == base && + params["g"] == gvk.Group && + params["v"] == gvk.Version && + params["as"] == gvk.Kind + return gvkMatch, nil } // ServerGroups returns the supported groups, with information like supported versions and the diff --git a/staging/src/k8s.io/client-go/discovery/discovery_client_test.go b/staging/src/k8s.io/client-go/discovery/discovery_client_test.go index 7bb62fe98b9..2b009e38a3c 100644 --- a/staging/src/k8s.io/client-go/discovery/discovery_client_test.go +++ b/staging/src/k8s.io/client-go/discovery/discovery_client_test.go @@ -2762,54 +2762,76 @@ func TestAggregatedServerPreferredResources(t *testing.T) { } func TestDiscoveryContentTypeVersion(t *testing.T) { + v2beta1 := schema.GroupVersionKind{Group: "apidiscovery.k8s.io", Version: "v2beta1", Kind: "APIGroupDiscoveryList"} tests := []struct { contentType string - isV2Beta1 bool + gvk schema.GroupVersionKind + match bool + expectErr bool }{ { contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", - isV2Beta1: true, + gvk: v2beta1, + match: true, + expectErr: false, }, { // content-type parameters are not in correct order, but comparison ignores order. contentType: "application/json; v=v2beta1;as=APIGroupDiscoveryList;g=apidiscovery.k8s.io", - isV2Beta1: true, + gvk: v2beta1, + match: true, + expectErr: false, }, { // content-type parameters are not in correct order, but comparison ignores order. contentType: "application/json; as=APIGroupDiscoveryList;g=apidiscovery.k8s.io;v=v2beta1", - isV2Beta1: true, + gvk: v2beta1, + match: true, + expectErr: false, }, { // Ignores extra parameter "charset=utf-8" contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList;charset=utf-8", - isV2Beta1: true, + gvk: v2beta1, + match: true, + expectErr: false, }, { contentType: "application/json", - isV2Beta1: false, + gvk: v2beta1, + match: false, + expectErr: false, }, { contentType: "application/json; charset=UTF-8", - isV2Beta1: false, + gvk: v2beta1, + match: false, + expectErr: false, }, { contentType: "text/json", - isV2Beta1: false, + gvk: v2beta1, + match: false, + expectErr: false, }, { contentType: "text/html", - isV2Beta1: false, + gvk: v2beta1, + match: false, + expectErr: false, }, { contentType: "", - isV2Beta1: false, + gvk: v2beta1, + match: false, + expectErr: true, }, } for _, test := range tests { - isV2Beta1 := isV2Beta1ContentType(test.contentType) - assert.Equal(t, test.isV2Beta1, isV2Beta1) + match, err := ContentTypeIsGVK(test.contentType, test.gvk) + assert.Equal(t, test.expectErr, err != nil) + assert.Equal(t, test.match, match) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index 974388f9731..c307c1f0ed5 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -26,12 +26,14 @@ import ( apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints" discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" "k8s.io/apiserver/pkg/endpoints/request" - scheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -44,6 +46,13 @@ var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group: // first (mirrors v1 discovery behavior) var APIRegistrationGroupPriority int = 20001 +// Aggregated discovery content-type GVK. +var v2Beta1GVK = schema.GroupVersionKind{ + Group: "apidiscovery.k8s.io", + Version: "v2beta1", + Kind: "APIGroupDiscoveryList", +} + // Given a list of APIServices and proxyHandlers for contacting them, // DiscoveryManager caches a list of discovery documents for each server @@ -204,7 +213,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion Path: req.URL.Path, IsResourceRequest: false, })) - req.Header.Add("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + req.Header.Add("Accept", discovery.AcceptV2Beta1) if exists && len(cached.etag) > 0 { req.Header.Add("If-None-Match", cached.etag) @@ -217,8 +226,9 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion writer := newInMemoryResponseWriter() handler.ServeHTTP(writer, req) - switch writer.respCode { - case http.StatusNotModified: + isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK) + switch { + case writer.respCode == http.StatusNotModified: // Keep old entry, update timestamp cached = cachedResult{ discovery: cached.discovery, @@ -228,8 +238,47 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion dm.setCacheEntryForService(info.service, cached) return &cached, nil - case http.StatusNotAcceptable: - // Discovery Document is not being served at all. + case writer.respCode == http.StatusServiceUnavailable: + return nil, fmt.Errorf("service %s returned non-success response code: %v", + info.service.String(), writer.respCode) + case writer.respCode == http.StatusOK && isV2Beta1GVK: + parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{} + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + return nil, err + } + + klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String()) + + // Convert discovery info into a map for convenient lookup later + discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{} + for _, g := range parsed.Items { + for _, v := range g.Versions { + discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v + for i := range v.Resources { + // avoid nil panics in v0.26.0-v0.26.3 client-go clients + // see https://github.com/kubernetes/kubernetes/issues/118361 + if v.Resources[i].ResponseKind == nil { + v.Resources[i].ResponseKind = &metav1.GroupVersionKind{} + } + for j := range v.Resources[i].Subresources { + if v.Resources[i].Subresources[j].ResponseKind == nil { + v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{} + } + } + } + } + } + + // Save cached result + cached = cachedResult{ + discovery: discoMap, + etag: writer.Header().Get("Etag"), + lastUpdated: now, + } + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + default: + // Could not get acceptable response for Aggregated Discovery. // Fall back to legacy discovery information if len(gv.Version) == 0 { return nil, errors.New("not found") @@ -265,7 +314,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion handler.ServeHTTP(writer, req) if writer.respCode != http.StatusOK { - return nil, fmt.Errorf("failed to download discovery for %s: %v", path, writer.String()) + return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String()) } parsed := &metav1.APIResourceList{} @@ -278,6 +327,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion if err != nil { return nil, err } + klog.V(3).Infof("DiscoveryManager: Successfully downloaded legacy discovery for %s", info.service.String()) discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{ // Convert old-style APIGroupList to new information @@ -296,48 +346,6 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion // one group version and an API Service may serve multiple // group versions. return &cached, nil - - case http.StatusOK: - parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{} - if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { - return nil, err - } - klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String()) - - // Convert discovery info into a map for convenient lookup later - discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{} - for _, g := range parsed.Items { - for _, v := range g.Versions { - discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v - for i := range v.Resources { - // avoid nil panics in v0.26.0-v0.26.3 client-go clients - // see https://github.com/kubernetes/kubernetes/issues/118361 - if v.Resources[i].ResponseKind == nil { - v.Resources[i].ResponseKind = &metav1.GroupVersionKind{} - } - for j := range v.Resources[i].Subresources { - if v.Resources[i].Subresources[j].ResponseKind == nil { - v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{} - } - } - } - } - } - - // Save cached result - cached = cachedResult{ - discovery: discoMap, - etag: writer.Header().Get("Etag"), - lastUpdated: now, - } - dm.setCacheEntryForService(info.service, cached) - return &cached, nil - - default: - klog.Infof("DiscoveryManager: Failed to download discovery for %v: %v %s", - info.service.String(), writer.respCode, writer.data) - return nil, fmt.Errorf("service %s returned non-success response code: %v", - info.service.String(), writer.respCode) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go index 44bdfa5c9b9..da61ab6ce85 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery_test.go @@ -596,9 +596,8 @@ func TestLegacyFallbackNoCache(t *testing.T) { require.Equal(t, doc.Items, expectAggregatedDiscovery) } -func TestLegacyFallback(t *testing.T) { +func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(http.ResponseWriter, *http.Request)) { aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis") - rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs) legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{ Name: "stable.example.com", @@ -657,7 +656,7 @@ func TestLegacyFallback(t *testing.T) { // defer to legacy discovery legacyResourceHandler.ServeHTTP(w, r) } else if r.URL.Path == "/apis" { - rootAPIsHandler.ServeHTTP(w, r) + rootHandlerFn(w, r) } else { // Unknown url t.Fatalf("unexpected request sent to %v", r.URL.Path) @@ -690,6 +689,82 @@ func TestLegacyFallback(t *testing.T) { }, }, doc.Items) } +func TestLegacyFallback(t *testing.T) { + rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs) + testCases := []struct { + name string + rootHandler func(http.ResponseWriter, *http.Request) + }{ + { + name: "Default root handler (406)", + rootHandler: rootAPIsHandler.ServeHTTP, + }, + { + name: "Root handler with non 200 status code", + rootHandler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + }, + }, + { + name: "Root handler with 200 response code no content type", + rootHandler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + }, + }, + { + name: "Root handler with 200 response code incorrect content type", + rootHandler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json;g=apidiscovery.k8s.io;v=v1alpha1;as=APIGroupDiscoveryList") + w.WriteHeader(200) + }, + }, + } + for _, tc := range testCases { + testLegacyFallbackWithCustomRootHandler(t, tc.rootHandler) + } +} + +func TestAPIServiceStale(t *testing.T) { + aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis") + aggregatedManager := newDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(503) + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done()) + require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager)) + + // At this point external services have synced. Check if discovery document + // lists the APIService group version as Stale. + _, _, doc := fetchPath(aggregatedResourceManager, "") + require.Equal(t, []apidiscoveryv2beta1.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "stable.example.com", + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: "v1", + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessStale, + }, + }, + }, + }, doc.Items) +} // Exercises the 304 Not Modified Path of the aggregator // This path in 1.26.0 would result in a deadlock if an aggregated APIService