From 6e83f6750598d394fb257f66c5d0721cf88f45db Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Tue, 8 Nov 2022 12:37:50 -0800 Subject: [PATCH] add new aggregated resourcemanager to genericapiserver Co-authored-by: Jeffrey Ying --- cmd/kube-apiserver/app/server.go | 4 + .../apiserver/pkg/endpoints/apiserver_test.go | 12 +- .../endpoints/discovery/aggregated/etag.go | 84 +++ .../endpoints/discovery/aggregated/fake.go | 170 ++++++ .../endpoints/discovery/aggregated/handler.go | 302 +++++++++++ .../discovery/aggregated/handler_test.go | 501 ++++++++++++++++++ .../discovery/aggregated/negotiation.go | 45 ++ .../endpoints/discovery/aggregated/wrapper.go | 78 +++ .../discovery/aggregated/wrapper_test.go | 156 ++++++ .../pkg/endpoints/discovery/legacy.go | 12 +- .../apiserver/pkg/endpoints/discovery/root.go | 2 +- .../apiserver/pkg/endpoints/groupversion.go | 9 +- .../apiserver/pkg/endpoints/installer.go | 89 ++++ .../apiserver/pkg/endpoints/installer_test.go | 244 +++++++++ .../src/k8s.io/apiserver/pkg/server/config.go | 13 + .../apiserver/pkg/server/genericapiserver.go | 44 +- vendor/modules.txt | 1 + 17 files changed, 1750 insertions(+), 16 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/negotiation.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper_test.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index fc36d044dbe..c3ea65aa597 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" @@ -481,6 +482,9 @@ func buildGenericConfig( if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness { genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers) } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager() + } return } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 9f82cae90c6..9de67ad7a60 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion group.Serializer = codecs - if _, err := (&group).InstallREST(container); err != nil { + if _, _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion group.Serializer = codecs - if _, err := (&group).InstallREST(container); err != nil { + if _, _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion group.Serializer = codecs - if _, err := (&group).InstallREST(container); err != nil { + if _, _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -3311,7 +3311,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container := restful.NewContainer() - if _, err := group.InstallREST(container); err == nil { + if _, _, err := group.InstallREST(container); err == nil { t.Fatal("expected error") } @@ -3343,7 +3343,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container = restful.NewContainer() - if _, err := group.InstallREST(container); err != nil { + if _, _, err := group.InstallREST(container); err != nil { t.Fatal(err) } @@ -4328,7 +4328,7 @@ func TestXGSubresource(t *testing.T) { Serializer: codecs, } - if _, err := (&group).InstallREST(container); err != nil { + if _, _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go new file mode 100644 index 00000000000..d74e376c7dd --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go @@ -0,0 +1,84 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "crypto/sha512" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" +) + +// This file exposes helper functions used for calculating the E-Tag header +// used in discovery endpoint responses + +// Attaches Cache-Busting functionality to an endpoint +// - Sets ETag header to provided hash +// - Replies with 304 Not Modified, if If-None-Match header matches hash +// +// hash should be the value of calculateETag on object. If hash is empty, then +// +// the object is simply serialized without E-Tag functionality +func ServeHTTPWithETag( + object runtime.Object, + hash string, + serializer runtime.NegotiatedSerializer, + w http.ResponseWriter, + req *http.Request, +) { + // ETag must be enclosed in double quotes: + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag + quotedHash := strconv.Quote(hash) + w.Header().Set("ETag", quotedHash) + w.Header().Set("Vary", "Accept") + w.Header().Set("Cache-Control", "public") + + // If Request includes If-None-Match and matches hash, reply with 304 + // Otherwise, we delegate to the handler for actual content + // + // According to documentation, An Etag within an If-None-Match + // header will be enclosed within doule quotes: + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives + if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash { + w.WriteHeader(http.StatusNotModified) + return + } + + responsewriters.WriteObjectNegotiated( + serializer, + DiscoveryEndpointRestrictions, + AggregatedDiscoveryGV, + w, + req, + http.StatusOK, + object, + true, + ) +} + +func calculateETag(resources interface{}) (string, error) { + serialized, err := json.Marshal(resources) + if err != nil { + return "", err + } + + return fmt.Sprintf("%X", sha512.Sum512(serialized)), nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go new file mode 100644 index 00000000000..b160bf38233 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go @@ -0,0 +1,170 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "context" + "errors" + "net/http" + "reflect" + "sync" + "time" + + "github.com/emicklei/go-restful/v3" + "github.com/google/go-cmp/cmp" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" +) + +type FakeResourceManager interface { + ResourceManager + Expect() ResourceManager + + HasExpectedNumberActions() bool + Validate() error + WaitForActions(ctx context.Context, timeout time.Duration) error +} + +func NewFakeResourceManager() FakeResourceManager { + return &fakeResourceManager{} +} + +// a resource manager with helper functions for checking the actions +// match expected. For Use in tests +type fakeResourceManager struct { + recorderResourceManager + expect recorderResourceManager +} + +// a resource manager which instead of managing a discovery document, +// simply records the calls to its interface functoins for testing +type recorderResourceManager struct { + lock sync.RWMutex + Actions []recorderResourceManagerAction +} + +var _ ResourceManager = &fakeResourceManager{} +var _ ResourceManager = &recorderResourceManager{} + +// Storage type for a call to the resource manager +type recorderResourceManagerAction struct { + Type string + Group string + Version string + Value interface{} +} + +func (f *fakeResourceManager) Expect() ResourceManager { + return &f.expect +} + +func (f *fakeResourceManager) HasExpectedNumberActions() bool { + f.lock.RLock() + defer f.lock.RUnlock() + + f.expect.lock.RLock() + defer f.expect.lock.RUnlock() + + return len(f.Actions) >= len(f.expect.Actions) +} + +func (f *fakeResourceManager) Validate() error { + f.lock.RLock() + defer f.lock.RUnlock() + + f.expect.lock.RLock() + defer f.expect.lock.RUnlock() + + if !reflect.DeepEqual(f.expect.Actions, f.Actions) { + return errors.New(cmp.Diff(f.expect.Actions, f.Actions)) + } + return nil +} + +func (f *fakeResourceManager) WaitForActions(ctx context.Context, timeout time.Duration) error { + err := wait.PollImmediateWithContext( + ctx, + 100*time.Millisecond, // try every 100ms + timeout, // timeout after timeout + func(ctx context.Context) (done bool, err error) { + if f.HasExpectedNumberActions() { + return true, f.Validate() + } + return false, nil + }) + return err +} + +func (f *recorderResourceManager) SetGroupPriority(groupName string, priority int) { + f.lock.Lock() + defer f.lock.Unlock() + + f.Actions = append(f.Actions, recorderResourceManagerAction{ + Type: "SetGroupPriority", + Group: groupName, + Value: priority, + }) +} + +func (f *recorderResourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { + f.lock.Lock() + defer f.lock.Unlock() + + f.Actions = append(f.Actions, recorderResourceManagerAction{ + Type: "AddGroupVersion", + Group: groupName, + Value: value, + }) +} +func (f *recorderResourceManager) RemoveGroup(groupName string) { + f.lock.Lock() + defer f.lock.Unlock() + + f.Actions = append(f.Actions, recorderResourceManagerAction{ + Type: "RemoveGroup", + Group: groupName, + }) + +} +func (f *recorderResourceManager) RemoveGroupVersion(gv metav1.GroupVersion) { + f.lock.Lock() + defer f.lock.Unlock() + + f.Actions = append(f.Actions, recorderResourceManagerAction{ + Type: "RemoveGroupVersion", + Group: gv.Group, + Version: gv.Version, + }) + +} +func (f *recorderResourceManager) SetGroups(values []apidiscoveryv2beta1.APIGroupDiscovery) { + f.lock.Lock() + defer f.lock.Unlock() + + f.Actions = append(f.Actions, recorderResourceManagerAction{ + Type: "SetGroups", + Value: values, + }) +} +func (f *recorderResourceManager) WebService() *restful.WebService { + panic("unimplemented") +} + +func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) { + panic("unimplemented") +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go new file mode 100644 index 00000000000..2db8dfc48ce --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go @@ -0,0 +1,302 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "net/http" + "reflect" + "sort" + "sync" + + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + + "sync/atomic" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" +) + +// This handler serves the /apis endpoint for an aggregated list of +// api resources indexed by their group version. +type ResourceManager interface { + // Adds knowledge of the given groupversion to the discovery document + // If it was already being tracked, updates the stored APIVersionDiscovery + // Thread-safe + AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) + + // Sets priority for a group for sorting discovery. + // If a priority is set before the group is known, the priority will be ignored + // Once a group is removed, the priority is forgotten. + SetGroupPriority(groupName string, priority int) + + // Removes all group versions for a given group + // Thread-safe + RemoveGroup(groupName string) + + // Removes a specific groupversion. If all versions of a group have been + // removed, then the entire group is unlisted. + // Thread-safe + RemoveGroupVersion(gv metav1.GroupVersion) + + // Resets the manager's known list of group-versions and replaces them + // with the given groups + // Thread-Safe + SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery) + + http.Handler +} + +type resourceDiscoveryManager struct { + serializer runtime.NegotiatedSerializer + // cache is an atomic pointer to avoid the use of locks + cache atomic.Pointer[cachedGroupList] + + // Writes protected by the lock. + // List of all apigroups & resources indexed by the resource manager + lock sync.RWMutex + apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery + apiGroupNames map[string]int +} + +func NewResourceManager() ResourceManager { + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) + return &resourceDiscoveryManager{serializer: codecs, apiGroupNames: make(map[string]int)} +} + +func (rdm *resourceDiscoveryManager) SetGroupPriority(group string, priority int) { + rdm.lock.Lock() + defer rdm.lock.Unlock() + + if _, exists := rdm.apiGroupNames[group]; exists { + rdm.apiGroupNames[group] = priority + rdm.cache.Store(nil) + } else { + klog.Warningf("DiscoveryManager: Attempted to set priority for group %s but does not exist", group) + } +} + +func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { + rdm.lock.Lock() + defer rdm.lock.Unlock() + + rdm.apiGroups = nil + rdm.cache.Store(nil) + + for _, group := range groups { + for _, version := range group.Versions { + rdm.addGroupVersionLocked(group.Name, version) + } + } + + // Filter unused out apiGroupNames + for name := range rdm.apiGroupNames { + if _, exists := rdm.apiGroups[name]; !exists { + delete(rdm.apiGroupNames, name) + } + } +} + +func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { + rdm.lock.Lock() + defer rdm.lock.Unlock() + + rdm.addGroupVersionLocked(groupName, value) +} + +func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { + klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version) + + if rdm.apiGroups == nil { + rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery) + } + + if existing, groupExists := rdm.apiGroups[groupName]; groupExists { + // If this version already exists, replace it + versionExists := false + + // Not very efficient, but in practice there are generally not many versions + for i := range existing.Versions { + if existing.Versions[i].Version == value.Version { + // The new gv is the exact same as what is already in + // the map. This is a noop and cache should not be + // invalidated. + if reflect.DeepEqual(existing.Versions[i], value) { + return + } + existing.Versions[i] = value + versionExists = true + break + } + } + + if !versionExists { + existing.Versions = append(existing.Versions, value) + } + + } else { + group := &apidiscoveryv2beta1.APIGroupDiscovery{ + ObjectMeta: metav1.ObjectMeta{ + Name: groupName, + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value}, + } + rdm.apiGroups[groupName] = group + rdm.apiGroupNames[groupName] = 0 + } + + // Reset response document so it is recreated lazily + rdm.cache.Store(nil) +} + +func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) { + rdm.lock.Lock() + defer rdm.lock.Unlock() + group, exists := rdm.apiGroups[apiGroup.Group] + if !exists { + return + } + + modified := false + for i := range group.Versions { + if group.Versions[i].Version == apiGroup.Version { + group.Versions = append(group.Versions[:i], group.Versions[i+1:]...) + modified = true + break + } + } + // If no modification was done, cache does not need to be cleared + if !modified { + return + } + + if len(group.Versions) == 0 { + delete(rdm.apiGroups, group.Name) + delete(rdm.apiGroupNames, group.Name) + } + + // Reset response document so it is recreated lazily + rdm.cache.Store(nil) +} + +func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) { + rdm.lock.Lock() + defer rdm.lock.Unlock() + + delete(rdm.apiGroups, groupName) + delete(rdm.apiGroupNames, groupName) + + // Reset response document so it is recreated lazily + rdm.cache.Store(nil) +} + +// Prepares the api group list for serving by converting them from map into +// list and sorting them according to insertion order +func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery { + // Re-order the apiGroups by their priority. + groups := []apidiscoveryv2beta1.APIGroupDiscovery{} + for _, group := range rdm.apiGroups { + groups = append(groups, *group.DeepCopy()) + + } + + sort.SliceStable(groups, func(i, j int) bool { + iName := groups[i].Name + jName := groups[j].Name + + // Default to 0 priority by default + iPriority := rdm.apiGroupNames[iName] + jPriority := rdm.apiGroupNames[jName] + + // Sort discovery based on apiservice priority. + // Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go + if iPriority == jPriority { + // Equal priority uses name to break ties + return iName < jName + } + + // i sorts before j if it has a lower priority + return iPriority > jPriority + }) + + return groups +} + +// Fetches from cache if it exists. If cache is empty, create it. +func (rdm *resourceDiscoveryManager) fetchFromCache() *cachedGroupList { + rdm.lock.RLock() + defer rdm.lock.RUnlock() + + cacheLoad := rdm.cache.Load() + if cacheLoad != nil { + return cacheLoad + } + response := apidiscoveryv2beta1.APIGroupDiscoveryList{ + Items: rdm.calculateAPIGroupsLocked(), + } + etag, err := calculateETag(response) + if err != nil { + klog.Errorf("failed to calculate etag for discovery document: %s", etag) + etag = "" + } + cached := &cachedGroupList{ + cachedResponse: response, + cachedResponseETag: etag, + } + rdm.cache.Store(cached) + return cached +} + +type cachedGroupList struct { + cachedResponse apidiscoveryv2beta1.APIGroupDiscoveryList + cachedResponseETag string +} + +func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + cache := rdm.fetchFromCache() + response := cache.cachedResponse + etag := cache.cachedResponseETag + + if len(etag) > 0 { + // Use proper e-tag headers if one is available + ServeHTTPWithETag( + &response, + etag, + rdm.serializer, + resp, + req, + ) + } else { + // Default to normal response in rare case etag is + // not cached with the object for some reason. + responsewriters.WriteObjectNegotiated( + rdm.serializer, + DiscoveryEndpointRestrictions, + AggregatedDiscoveryGV, + resp, + req, + http.StatusOK, + &response, + true, + ) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go new file mode 100644 index 00000000000..9fe88023bb4 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go @@ -0,0 +1,501 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated_test + +import ( + "encoding/json" + "math/rand" + "net/http" + "net/http/httptest" + + "sort" + "strconv" + "strings" + "sync" + "testing" + + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" +) + +var scheme = runtime.NewScheme() +var codecs = runtimeserializer.NewCodecFactory(scheme) + +const discoveryPath = "/apis" + +func init() { + // Add all builtin types to scheme + apidiscoveryv2beta1.AddToScheme(scheme) + codecs = runtimeserializer.NewCodecFactory(scheme) +} + +func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList { + fuzzer := fuzz.NewWithSeed(seed) + fuzzer.NumElements(atLeastNumGroups, maxNumGroups) + fuzzer.NilChance(0) + fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) { + c.FuzzNoCustom(o) + + // The ResourceManager will just not serve the group if its versions + // list is empty + atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{} + c.Fuzz(&atLeastOne) + o.Versions = append(o.Versions, atLeastOne) + + o.TypeMeta = metav1.TypeMeta{} + var name string + c.Fuzz(&name) + o.ObjectMeta = metav1.ObjectMeta{ + Name: name, + } + }) + + var apis []apidiscoveryv2beta1.APIGroupDiscovery + fuzzer.Fuzz(&apis) + sort.Slice(apis[:], func(i, j int) bool { + return apis[i].Name < apis[j].Name + }) + + return apidiscoveryv2beta1.APIGroupDiscoveryList{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIGroupDiscoveryList", + APIVersion: "apidiscovery.k8s.io/v2beta1", + }, + Items: apis, + } +} + +func fetchPath(handler http.Handler, acceptPrefix string, path string, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) { + // Expect json-formatted apis group list + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", discoveryPath, nil) + + // Ask for JSON response + req.Header.Set("Accept", acceptPrefix+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + + if etag != "" { + // Quote provided etag if unquoted + quoted := etag + if !strings.HasPrefix(etag, "\"") { + quoted = strconv.Quote(etag) + } + req.Header.Set("If-None-Match", quoted) + } + + handler.ServeHTTP(w, req) + + bytes := w.Body.Bytes() + var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList + if len(bytes) > 0 { + decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{} + runtime.DecodeInto(codecs.UniversalDecoder(), bytes, decoded) + } + + return w.Result(), bytes, decoded +} + +// Add all builtin APIServices to the manager and check the output +func TestBasicResponse(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + + apis := fuzzAPIGroups(1, 3, 10) + manager.SetGroups(apis.Items) + + response, body, decoded := fetchPath(manager, "application/json", discoveryPath, "") + + jsonFormatted, err := json.Marshal(&apis) + require.NoError(t, err, "json marshal should always succeed") + + assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK") + assert.Equal(t, "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported") + assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set") + + assert.NoError(t, err, "decode should always succeed") + assert.EqualValues(t, &apis, decoded, "decoded value should equal input") + assert.Equal(t, string(jsonFormatted)+"\n", string(body), "response should be the api group list") +} + +// Test that protobuf is outputted correctly +func TestBasicResponseProtobuf(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + + apis := fuzzAPIGroups(1, 3, 10) + manager.SetGroups(apis.Items) + + response, _, decoded := fetchPath(manager, "application/vnd.kubernetes.protobuf", discoveryPath, "") + assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK") + assert.Equal(t, "application/vnd.kubernetes.protobuf;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported") + assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set") + assert.EqualValues(t, &apis, decoded, "decoded value should equal input") +} + +// Test that an etag associated with the service only depends on the apiresources +// e.g.: Multiple services with the same contents should have the same etag. +func TestEtagConsistent(t *testing.T) { + // Create 2 managers, add a bunch of services to each + manager1 := discoveryendpoint.NewResourceManager() + manager2 := discoveryendpoint.NewResourceManager() + + apis := fuzzAPIGroups(1, 3, 11) + manager1.SetGroups(apis.Items) + manager2.SetGroups(apis.Items) + + // Make sure etag of each is the same + res1_initial, _, _ := fetchPath(manager1, "application/json", discoveryPath, "") + res2_initial, _, _ := fetchPath(manager2, "application/json", discoveryPath, "") + + assert.NotEmpty(t, res1_initial.Header.Get("ETag"), "Etag should be populated") + assert.NotEmpty(t, res2_initial.Header.Get("ETag"), "Etag should be populated") + assert.Equal(t, res1_initial.Header.Get("ETag"), res2_initial.Header.Get("ETag"), "etag should be deterministic") + + // Then add one service to only one. + // Make sure etag is changed, but other is the same + apis = fuzzAPIGroups(1, 1, 11) + for _, group := range apis.Items { + for _, version := range group.Versions { + manager1.AddGroupVersion(group.Name, version) + } + } + + res1_addedToOne, _, _ := fetchPath(manager1, "application/json", discoveryPath, "") + res2_addedToOne, _, _ := fetchPath(manager2, "application/json", discoveryPath, "") + + assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.NotEqual(t, res1_initial.Header.Get("ETag"), res1_addedToOne.Header.Get("ETag"), "ETag should be changed since version was added") + assert.Equal(t, res2_initial.Header.Get("ETag"), res2_addedToOne.Header.Get("ETag"), "ETag should be unchanged since data was unchanged") + + // Then add service to other one + // Make sure etag is the same + for _, group := range apis.Items { + for _, version := range group.Versions { + manager2.AddGroupVersion(group.Name, version) + } + } + + res1_addedToBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "") + res2_addedToBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "") + + assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.Equal(t, res1_addedToBoth.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETags should be equal since content is equal") + assert.NotEqual(t, res2_initial.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETag should be changed since data was changed") + + // Remove the group version from both. Initial E-Tag should be restored + for _, group := range apis.Items { + for _, version := range group.Versions { + manager1.RemoveGroupVersion(metav1.GroupVersion{ + Group: group.Name, + Version: version.Version, + }) + manager2.RemoveGroupVersion(metav1.GroupVersion{ + Group: group.Name, + Version: version.Version, + }) + } + } + + res1_removeFromBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "") + res2_removeFromBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "") + + assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated") + assert.Equal(t, res1_removeFromBoth.Header.Get("ETag"), res2_removeFromBoth.Header.Get("ETag"), "ETags should be equal since content is equal") + assert.Equal(t, res1_initial.Header.Get("ETag"), res1_removeFromBoth.Header.Get("ETag"), "ETag should be equal to initial value since added content was removed") +} + +// Test that if a request comes in with an If-None-Match header with an incorrect +// E-Tag, that fresh content is returned. +func TestEtagNonMatching(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 12) + manager.SetGroups(apis.Items) + + // fetch the document once + initial, _, _ := fetchPath(manager, "application/json", discoveryPath, "") + assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated") + + // Send another request with a wrong e-tag. The same response should + // get sent again + second, _, _ := fetchPath(manager, "application/json", discoveryPath, "wrongetag") + + assert.Equal(t, http.StatusOK, initial.StatusCode, "response should be 200 OK") + assert.Equal(t, http.StatusOK, second.StatusCode, "response should be 200 OK") + assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal") +} + +// Test that if a request comes in with an If-None-Match header with a correct +// E-Tag, that 304 Not Modified is returned +func TestEtagMatching(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 12) + manager.SetGroups(apis.Items) + + // fetch the document once + initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "") + assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated") + assert.NotEmpty(t, initialBody, "body should not be empty") + + // Send another request with a wrong e-tag. The same response should + // get sent again + second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag")) + + assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK") + assert.Equal(t, http.StatusNotModified, second.StatusCode, "second response should be 304 Not Modified") + assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal") + assert.Empty(t, secondBody, "body should be empty when returning 304 Not Modified") +} + +// Test that if a request comes in with an If-None-Match header with an old +// E-Tag, that fresh content is returned +func TestEtagOutdated(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 15) + manager.SetGroups(apis.Items) + + // fetch the document once + initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "") + assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated") + assert.NotEmpty(t, initialBody, "body should not be empty") + + // Then add some services so the etag changes + apis = fuzzAPIGroups(1, 3, 14) + for _, group := range apis.Items { + for _, version := range group.Versions { + manager.AddGroupVersion(group.Name, version) + } + } + + // Send another request with the old e-tag. Response should not be 304 Not Modified + second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag")) + + assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK") + assert.Equal(t, http.StatusOK, second.StatusCode, "second response should be 304 Not Modified") + assert.NotEqual(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be unequal since contents differ") + assert.NotEmpty(t, secondBody, "body should be not empty when returning 304 Not Modified") +} + +// Test that an api service can be added or removed +func TestAddRemove(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 15) + for _, group := range apis.Items { + for _, version := range group.Versions { + manager.AddGroupVersion(group.Name, version) + } + } + + _, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "") + + for _, group := range apis.Items { + for _, version := range group.Versions { + manager.RemoveGroupVersion(metav1.GroupVersion{ + Group: group.Name, + Version: version.Version, + }) + } + } + + _, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "") + + require.NotNil(t, initialDocument, "initial document should parse") + require.NotNil(t, secondDocument, "second document should parse") + assert.Len(t, initialDocument.Items, len(apis.Items), "initial document should have set number of groups") + assert.Len(t, secondDocument.Items, 0, "second document should have no groups") +} + +// Show that updating an existing service replaces and does not add the entry +// and instead replaces it +func TestUpdateService(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 15) + for _, group := range apis.Items { + for _, version := range group.Versions { + manager.AddGroupVersion(group.Name, version) + } + } + + _, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "") + + assert.Equal(t, initialDocument, &apis, "should have returned expected document") + + b, err := json.Marshal(apis) + if err != nil { + t.Error(err) + } + var newapis apidiscoveryv2beta1.APIGroupDiscoveryList + err = json.Unmarshal(b, &newapis) + if err != nil { + t.Error(err) + } + + newapis.Items[0].Versions[0].Resources[0].Resource = "changed a resource name!" + for _, group := range newapis.Items { + for _, version := range group.Versions { + manager.AddGroupVersion(group.Name, version) + } + } + + _, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "") + assert.Equal(t, secondDocument, &newapis, "should have returned expected document") + assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document") +} + +// Show the discovery manager is capable of serving requests to multiple users +// with unchanging data +func TestConcurrentRequests(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + apis := fuzzAPIGroups(1, 3, 15) + manager.SetGroups(apis.Items) + + waitGroup := sync.WaitGroup{} + + numReaders := 100 + numRequestsPerReader := 100 + + // Spawn a bunch of readers that will keep sending requests to the server + for i := 0; i < numReaders; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + etag := "" + for j := 0; j < numRequestsPerReader; j++ { + usedEtag := etag + if j%2 == 0 { + // Disable use of etag for every second request + usedEtag = "" + } + response, body, document := fetchPath(manager, "application/json", discoveryPath, usedEtag) + + if usedEtag != "" { + assert.Equal(t, http.StatusNotModified, response.StatusCode, "response should be Not Modified if etag was used") + assert.Empty(t, body, "body should be empty if etag used") + } else { + assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused") + assert.Equal(t, &apis, document, "document should be equal") + } + + etag = response.Header.Get("ETag") + } + }() + } + waitGroup.Wait() +} + +// Show the handler is capable of serving many concurrent readers and many +// concurrent writers without tripping up. Good to run with go '-race' detector +// since there are not many "correctness" checks +func TestAbuse(t *testing.T) { + manager := discoveryendpoint.NewResourceManager() + + numReaders := 100 + numRequestsPerReader := 1000 + + numWriters := 10 + numWritesPerWriter := 1000 + + waitGroup := sync.WaitGroup{} + + // Spawn a bunch of writers that randomly add groups, remove groups, and + // reset the list of groups + for i := 0; i < numWriters; i++ { + source := rand.NewSource(int64(i)) + + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + // track list of groups we've added so that we can remove them + // randomly + var addedGroups []metav1.GroupVersion + + for j := 0; j < numWritesPerWriter; j++ { + switch source.Int63() % 3 { + case 0: + // Add a fuzzed group + apis := fuzzAPIGroups(1, 2, 15) + for _, group := range apis.Items { + for _, version := range group.Versions { + manager.AddGroupVersion(group.Name, version) + addedGroups = append(addedGroups, metav1.GroupVersion{ + Group: group.Name, + Version: version.Version, + }) + } + } + case 1: + // Remove a group that we have added + if len(addedGroups) > 0 { + manager.RemoveGroupVersion(addedGroups[0]) + addedGroups = addedGroups[1:] + } else { + // Send a request and try to remove a group someone else + // might have added + _, _, document := fetchPath(manager, "application/json", discoveryPath, "") + assert.NotNil(t, document, "manager should always succeed in returning a document") + + if len(document.Items) > 0 { + manager.RemoveGroupVersion(metav1.GroupVersion{ + Group: document.Items[0].Name, + Version: document.Items[0].Versions[0].Version, + }) + } + + } + case 2: + manager.SetGroups(nil) + addedGroups = nil + default: + panic("unreachable") + } + } + }() + } + + // Spawn a bunch of readers that will keep sending requests to the server + // and making sure the response makes sense + for i := 0; i < numReaders; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + etag := "" + for j := 0; j < numRequestsPerReader; j++ { + response, body, document := fetchPath(manager, "application/json", discoveryPath, etag) + + if response.StatusCode == http.StatusNotModified { + assert.Equal(t, etag, response.Header.Get("ETag")) + assert.Empty(t, body, "body should be empty if etag used") + assert.Nil(t, document) + } else { + assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused") + assert.NotNil(t, document) + } + + etag = response.Header.Get("ETag") + } + }() + } + + waitGroup.Wait() +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/negotiation.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/negotiation.go new file mode 100644 index 00000000000..9e58dad8542 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/negotiation.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var AggregatedDiscoveryGV = schema.GroupVersion{Group: "apidiscovery.k8s.io", Version: "v2beta1"} + +// Interface is from "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + +// DiscoveryEndpointRestrictions allows requests to /apis to provide a Content Negotiation GVK for aggregated discovery. +var DiscoveryEndpointRestrictions = discoveryEndpointRestrictions{} + +type discoveryEndpointRestrictions struct{} + +func (discoveryEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool { + return IsAggregatedDiscoveryGVK(gvk) +} + +func (discoveryEndpointRestrictions) AllowsServerVersion(string) bool { return false } +func (discoveryEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" } + +// IsAggregatedDiscoveryGVK checks if a provided GVK is the GVK for serving aggregated discovery. +func IsAggregatedDiscoveryGVK(gvk *schema.GroupVersionKind) bool { + if gvk != nil { + return gvk.Group == "apidiscovery.k8s.io" && gvk.Version == "v2beta1" && gvk.Kind == "APIGroupDiscoveryList" + } + return false +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper.go new file mode 100644 index 00000000000..8516c154c88 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "net/http" + + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + "k8s.io/apimachinery/pkg/runtime/serializer" + + "github.com/emicklei/go-restful/v3" + "k8s.io/apimachinery/pkg/runtime" + + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" +) + +type WrappedHandler struct { + s runtime.NegotiatedSerializer + handler http.Handler + aggHandler http.Handler +} + +func (wrapped *WrappedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + mediaType, _ := negotiation.NegotiateMediaTypeOptions(req.Header.Get("Accept"), wrapped.s.SupportedMediaTypes(), DiscoveryEndpointRestrictions) + // mediaType.Convert looks at the request accept headers and is used to control whether the discovery document will be aggregated. + if IsAggregatedDiscoveryGVK(mediaType.Convert) { + wrapped.aggHandler.ServeHTTP(resp, req) + return + } + } + wrapped.handler.ServeHTTP(resp, req) +} + +func (wrapped *WrappedHandler) restfulHandle(req *restful.Request, resp *restful.Response) { + wrapped.ServeHTTP(resp.ResponseWriter, req.Request) +} + +func (wrapped *WrappedHandler) GenerateWebService(prefix string, returnType interface{}) *restful.WebService { + mediaTypes, _ := negotiation.MediaTypesForSerializer(wrapped.s) + ws := new(restful.WebService) + ws.Path(prefix) + ws.Doc("get available API versions") + ws.Route(ws.GET("/").To(wrapped.restfulHandle). + Doc("get available API versions"). + Operation("getAPIVersions"). + Produces(mediaTypes...). + Consumes(mediaTypes...). + Writes(returnType)) + return ws +} + +// WrapAggregatedDiscoveryToHandler wraps a handler with an option to +// emit the aggregated discovery by passing in the aggregated +// discovery type in content negotiation headers: eg: (Accept: +// application/json;v=v2beta1;g=apidiscovery.k8s.io;as=APIGroupDiscoveryList) +func WrapAggregatedDiscoveryToHandler(handler http.Handler, aggHandler http.Handler) *WrappedHandler { + scheme := runtime.NewScheme() + apidiscoveryv2beta1.AddToScheme(scheme) + codecs := serializer.NewCodecFactory(scheme) + return &WrappedHandler{codecs, handler, aggHandler} +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper_test.go new file mode 100644 index 00000000000..78f11bc6394 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/wrapper_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "net/http" + "net/http/httptest" + + "io" + "testing" + + "github.com/stretchr/testify/assert" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" +) + +const discoveryPath = "/apis" +const jsonAccept = "application/json" +const protobufAccept = "application/vnd.kubernetes.protobuf" +const aggregatedAcceptSuffix = ";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" + +const aggregatedJSONAccept = jsonAccept + aggregatedAcceptSuffix +const aggregatedProtoAccept = protobufAccept + aggregatedAcceptSuffix + +func fetchPath(handler http.Handler, path, accept string) string { + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", discoveryPath, nil) + + // Ask for JSON response + req.Header.Set("Accept", accept) + + handler.ServeHTTP(w, req) + return string(w.Body.Bytes()) +} + +type fakeHTTPHandler struct { + data string +} + +func (f fakeHTTPHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + io.WriteString(resp, f.data) +} + +func TestAggregationEnabled(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)() + + unaggregated := fakeHTTPHandler{data: "unaggregated"} + aggregated := fakeHTTPHandler{data: "aggregated"} + wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated) + + testCases := []struct { + accept string + expected string + }{ + { + // Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error + accept: "application/json;foo=bar", + expected: "unaggregated", + }, { + // Empty accept headers are valid and should be handled by the unaggregated handler + accept: "", + expected: "unaggregated", + }, { + accept: aggregatedJSONAccept, + expected: "aggregated", + }, { + accept: aggregatedProtoAccept, + expected: "aggregated", + }, { + accept: jsonAccept, + expected: "unaggregated", + }, { + accept: protobufAccept, + expected: "unaggregated", + }, { + // Server should return the first accepted type + accept: aggregatedJSONAccept + "," + jsonAccept, + expected: "aggregated", + }, { + // Server should return the first accepted type + accept: aggregatedProtoAccept + "," + protobufAccept, + expected: "aggregated", + }, + } + + for _, tc := range testCases { + body := fetchPath(wrapped, discoveryPath, tc.accept) + assert.Equal(t, tc.expected, body) + } +} + +func TestAggregationDisabled(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, false)() + + unaggregated := fakeHTTPHandler{data: "unaggregated"} + aggregated := fakeHTTPHandler{data: "aggregated"} + wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated) + + testCases := []struct { + accept string + expected string + }{ + { + // Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error + accept: "application/json;foo=bar", + expected: "unaggregated", + }, { + // Empty accept headers are valid and should be handled by the unaggregated handler + accept: "", + expected: "unaggregated", + }, { + + accept: aggregatedJSONAccept, + expected: "unaggregated", + }, { + accept: aggregatedProtoAccept, + expected: "unaggregated", + }, { + accept: jsonAccept, + expected: "unaggregated", + }, { + accept: protobufAccept, + expected: "unaggregated", + }, { + // Server should return the first accepted type. + // If aggregation is disabled, the unaggregated type should be returned. + accept: aggregatedJSONAccept + "," + jsonAccept, + expected: "unaggregated", + }, { + // Server should return the first accepted type. + // If aggregation is disabled, the unaggregated type should be returned. + accept: aggregatedProtoAccept + "," + protobufAccept, + expected: "unaggregated", + }, + } + + for _, tc := range testCases { + body := fetchPath(wrapped, discoveryPath, tc.accept) + assert.Equal(t, tc.expected, body) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/legacy.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/legacy.go index 94d27925551..dae0d714b9b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/legacy.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/legacy.go @@ -56,7 +56,7 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService { ws := new(restful.WebService) ws.Path(s.apiPrefix) ws.Doc("get available API versions") - ws.Route(ws.GET("/").To(s.handle). + ws.Route(ws.GET("/").To(s.restfulHandle). Doc("get available API versions"). Operation("getAPIVersions"). Produces(mediaTypes...). @@ -65,12 +65,16 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService { return ws } -func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Response) { - clientIP := utilnet.GetClientIP(req.Request) +func (s *legacyRootAPIHandler) restfulHandle(req *restful.Request, resp *restful.Response) { + s.ServeHTTP(resp.ResponseWriter, req.Request) +} + +func (s *legacyRootAPIHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + clientIP := utilnet.GetClientIP(req) apiVersions := &metav1.APIVersions{ ServerAddressByClientCIDRs: s.addresses.ServerAddressByClientCIDRs(clientIP), Versions: []string{"v1"}, } - responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions, false) + responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, apiVersions, false) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/root.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/root.go index 1a5a22fbc25..24f0a34526d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/root.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/root.go @@ -35,7 +35,7 @@ import ( type GroupManager interface { AddGroup(apiGroup metav1.APIGroup) RemoveGroup(groupName string) - + ServeHTTP(resp http.ResponseWriter, req *http.Request) WebService() *restful.WebService } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go index d7f85106ee4..34b80b44997 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go @@ -22,6 +22,7 @@ import ( restful "github.com/emicklei/go-restful/v3" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -105,7 +106,7 @@ type APIGroupVersion struct { // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. -func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) { +func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, @@ -117,7 +118,11 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storagev versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) - return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors) + aggregatedDiscoveryResources, err := ConvertGroupVersionIntoToDiscovery(apiResources) + if err != nil { + registrationErrors = append(registrationErrors, err) + } + return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors) } func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index b4e6d2b27f8..e91c513df85 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -26,6 +26,7 @@ import ( "unicode" restful "github.com/emicklei/go-restful/v3" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" @@ -68,6 +69,94 @@ type action struct { AllNamespaces bool // true iff the action is namespaced but works on aggregate result for all namespaces } +func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscoveryv2beta1.APIResourceDiscovery, error) { + var apiResourceList []apidiscoveryv2beta1.APIResourceDiscovery + parentResources := map[string]*apidiscoveryv2beta1.APIResourceDiscovery{} + + // Loop through all top-level resources + for _, r := range list { + if strings.Contains(r.Name, "/") { + // Skip subresources for now so we can get the list of resources + continue + } + + var scope apidiscoveryv2beta1.ResourceScope + if r.Namespaced { + scope = apidiscoveryv2beta1.ScopeNamespace + } else { + scope = apidiscoveryv2beta1.ScopeCluster + } + + apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{ + Resource: r.Name, + Scope: scope, + ResponseKind: &metav1.GroupVersionKind{ + Group: r.Group, + Version: r.Version, + Kind: r.Kind, + }, + Verbs: r.Verbs, + ShortNames: r.ShortNames, + Categories: r.Categories, + SingularResource: r.SingularName, + }) + parentResources[r.Name] = &apiResourceList[len(apiResourceList)-1] + } + + // Loop through all subresources + for _, r := range list { + // Split resource name and subresource name + split := strings.SplitN(r.Name, "/", 2) + + if len(split) != 2 { + // Skip parent resources + continue + } + + var scope apidiscoveryv2beta1.ResourceScope + if r.Namespaced { + scope = apidiscoveryv2beta1.ScopeNamespace + } else { + scope = apidiscoveryv2beta1.ScopeCluster + } + + var parent *apidiscoveryv2beta1.APIResourceDiscovery + var exists bool + + parent, exists = parentResources[split[0]] + if !exists { + // If a subresource exists without a parent, create a parent + apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{ + Resource: split[0], + Scope: scope, + }) + parentResources[split[0]] = &apiResourceList[len(apiResourceList)-1] + parent = &apiResourceList[len(apiResourceList)-1] + parentResources[split[0]] = parent + } + + if parent.Scope != scope { + return nil, fmt.Errorf("Error: Parent %s (scope: %s) and subresource %s (scope: %s) scope do not match", split[0], parent.Scope, split[1], scope) + // + } + + subresource := apidiscoveryv2beta1.APISubresourceDiscovery{ + Subresource: split[1], + Verbs: r.Verbs, + } + if r.Kind != "" { + subresource.ResponseKind = &metav1.GroupVersionKind{ + Group: r.Group, + Version: r.Version, + Kind: r.Kind, + } + } + parent.Subresources = append(parent.Subresources, subresource) + + } + return apiResourceList, nil +} + // An interface to see if one storage supports override its default verb for monitoring type StorageMetricsOverride interface { // OverrideMetricsVerb gives a storage object an opportunity to override the verb reported to the metrics endpoint diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer_test.go index d4f74817226..bb6cd32aa69 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer_test.go @@ -18,6 +18,10 @@ package endpoints import ( "testing" + + "github.com/stretchr/testify/require" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestIsVowel(t *testing.T) { @@ -97,3 +101,243 @@ func TestGetArticleForNoun(t *testing.T) { } } } + +func TestConvertAPIResourceToDiscovery(t *testing.T) { + tests := []struct { + name string + resources []metav1.APIResource + wantAPIResourceDiscovery []apidiscoveryv2beta1.APIResourceDiscovery + wantErr bool + }{ + { + name: "Basic Test", + resources: []metav1.APIResource{ + { + + Name: "pods", + Namespaced: true, + Kind: "Pod", + ShortNames: []string{"po"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "pods", + Scope: apidiscoveryv2beta1.ScopeNamespace, + ResponseKind: &metav1.GroupVersionKind{ + Kind: "Pod", + }, + ShortNames: []string{"po"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + }, + { + name: "Basic Group Version Test", + resources: []metav1.APIResource{ + { + Name: "cronjobs", + Namespaced: true, + Group: "batch", + Version: "v1", + Kind: "CronJob", + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "cronjobs", + Scope: apidiscoveryv2beta1.ScopeNamespace, + ResponseKind: &metav1.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "CronJob", + }, + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + }, + { + name: "Test with subresource", + resources: []metav1.APIResource{ + { + Name: "cronjobs", + Namespaced: true, + Kind: "CronJob", + Group: "batch", + Version: "v1", + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + { + Name: "cronjobs/status", + Namespaced: true, + Kind: "CronJob", + Group: "batch", + Version: "v1", + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "cronjobs", + Scope: apidiscoveryv2beta1.ScopeNamespace, + ResponseKind: &metav1.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "CronJob", + }, + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{ + Subresource: "status", + ResponseKind: &metav1.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "CronJob", + }, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }}, + }, + }, + }, + { + name: "Test with subresource with no parent", + resources: []metav1.APIResource{ + { + Name: "cronjobs/status", + Namespaced: true, + Kind: "CronJob", + Group: "batch", + Version: "v1", + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "cronjobs", + Scope: apidiscoveryv2beta1.ScopeNamespace, + Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{ + Subresource: "status", + ResponseKind: &metav1.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "CronJob", + }, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }}, + }, + }, + }, + { + name: "Test with mismatch parent and subresource scope", + resources: []metav1.APIResource{ + { + Name: "cronjobs", + Namespaced: true, + Kind: "CronJob", + Group: "batch", + Version: "v1", + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + { + Name: "cronjobs/status", + Namespaced: false, + Kind: "CronJob", + Group: "batch", + Version: "v1", + ShortNames: []string{"cj"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{}, + wantErr: true, + }, + { + name: "Cluster Scope Test", + resources: []metav1.APIResource{ + { + Name: "nodes", + Namespaced: false, + Kind: "Node", + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "nodes", + Scope: apidiscoveryv2beta1.ScopeCluster, + ResponseKind: &metav1.GroupVersionKind{ + Kind: "Node", + }, + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + }, + { + name: "Namespace Scope Test", + resources: []metav1.APIResource{ + { + Name: "nodes", + Namespaced: true, + Kind: "Node", + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "nodes", + Scope: apidiscoveryv2beta1.ScopeNamespace, + ResponseKind: &metav1.GroupVersionKind{ + Kind: "Node", + }, + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + }, + { + name: "Singular Resource Name", + resources: []metav1.APIResource{ + { + Name: "nodes", + SingularName: "node", + Kind: "Node", + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{ + { + Resource: "nodes", + SingularResource: "node", + Scope: apidiscoveryv2beta1.ScopeCluster, + ResponseKind: &metav1.GroupVersionKind{ + Kind: "Node", + }, + ShortNames: []string{"no"}, + Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}, + }, + }, + }, + } + + for _, tt := range tests { + discoveryAPIResources, err := ConvertGroupVersionIntoToDiscovery(tt.resources) + if err != nil { + if tt.wantErr == false { + t.Error(err) + } + } else { + require.Equal(t, tt.wantAPIResourceDiscovery, discoveryAPIResources) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 27aeeeef292..7f3be77b01e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -47,6 +47,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" "k8s.io/apiserver/pkg/endpoints/filterlatency" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" @@ -122,6 +123,7 @@ type Config struct { EnableIndex bool EnableProfiling bool EnableDiscovery bool + // Requires generic profiling enabled EnableContentionProfiling bool EnableMetrics bool @@ -259,6 +261,9 @@ type Config struct { // StorageVersionManager holds the storage versions of the API resources installed by this server. StorageVersionManager storageversion.Manager + + // AggregatedDiscoveryGroupManager serves /apis in an aggregated form. + AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager } type RecommendedConfig struct { @@ -668,6 +673,14 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{}, } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + manager := c.AggregatedDiscoveryGroupManager + if manager == nil { + manager = discoveryendpoint.NewResourceManager() + } + s.AggregatedDiscoveryGroupManager = manager + s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager() + } for { if c.JSONPatchMaxCopyBytes <= 0 { break diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 925f557fb0b..27966b51ea5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -26,6 +26,7 @@ import ( systemd "github.com/coreos/go-systemd/v22/daemon" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -39,6 +40,7 @@ import ( "k8s.io/apiserver/pkg/authorization/authorizer" genericapi "k8s.io/apiserver/pkg/endpoints" "k8s.io/apiserver/pkg/endpoints/discovery" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" @@ -137,9 +139,15 @@ type GenericAPIServer struct { // listedPathProvider is a lister which provides the set of paths to show at / listedPathProvider routes.ListedPathProvider - // DiscoveryGroupManager serves /apis + // DiscoveryGroupManager serves /apis in an unaggregated form. DiscoveryGroupManager discovery.GroupManager + // AggregatedDiscoveryGroupManager serves /apis in an aggregated form. + AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager + + // AggregatedLegacyDiscoveryGroupManager serves /api in an aggregated form. + AggregatedLegacyDiscoveryGroupManager discoveryendpoint.ResourceManager + // Enable swagger and/or OpenAPI if these configs are non-nil. openAPIConfig *openapicommon.Config @@ -676,11 +684,35 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes - r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer) + discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer) + if err != nil { return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err) } resourceInfos = append(resourceInfos, r...) + + if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) { + // Aggregated discovery only aggregates resources under /apis + if apiPrefix == APIGroupPrefix { + s.AggregatedDiscoveryGroupManager.AddGroupVersion( + groupVersion.Group, + apidiscoveryv2beta1.APIVersionDiscovery{ + Version: groupVersion.Version, + Resources: discoveryAPIResources, + }, + ) + } else { + // There is only one group version for legacy resources, priority can be defaulted to 0. + s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion( + groupVersion.Group, + apidiscoveryv2beta1.APIVersionDiscovery{ + Version: groupVersion.Version, + Resources: discoveryAPIResources, + }, + ) + } + } + } s.RegisterDestroyFunc(apiGroupInfo.destroyStorage) @@ -715,7 +747,13 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo // Install the version handler. // Add a handler at / to enumerate the supported api versions. - s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService()) + legacyRootAPIHandler := discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix) + if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) { + wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(legacyRootAPIHandler, s.AggregatedLegacyDiscoveryGroupManager) + s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/api", metav1.APIVersions{})) + } else { + s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService()) + } return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 6c481dce3e4..39a2b845391 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1497,6 +1497,7 @@ k8s.io/apiserver/pkg/cel/metrics k8s.io/apiserver/pkg/endpoints k8s.io/apiserver/pkg/endpoints/deprecation k8s.io/apiserver/pkg/endpoints/discovery +k8s.io/apiserver/pkg/endpoints/discovery/aggregated k8s.io/apiserver/pkg/endpoints/filterlatency k8s.io/apiserver/pkg/endpoints/filters k8s.io/apiserver/pkg/endpoints/handlers