From 0740b11073d08262d4d6687ceedd8f0c71819ebd Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Mon, 20 Mar 2023 10:05:15 -0700 Subject: [PATCH 1/2] allow multiple sources to add/remove from discovery without clobbering each other --- .../pkg/apiserver/apiserver.go | 7 +- .../customresource_discovery_controller.go | 1 + ...ustomresource_discovery_controller_test.go | 6 +- .../endpoints/discovery/aggregated/fake.go | 4 + .../endpoints/discovery/aggregated/handler.go | 208 ++++++++++++++--- .../discovery/aggregated/handler_test.go | 86 +++++++ .../apiserver/pkg/server/genericapiserver.go | 2 + .../pkg/apiserver/apiserver.go | 4 +- .../pkg/apiserver/handler_discovery.go | 2 +- .../apiserver/discovery/discovery_test.go | 171 +++++++++++++- .../apiserver/discovery/framework.go | 214 +++++++++++++++++- 11 files changed, 655 insertions(+), 50 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 8e999b9bf4c..9bbc3da5010 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/discovery" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" "k8s.io/apiserver/pkg/features" genericregistry "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" @@ -210,7 +211,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy) - discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager) + aggregatedDiscoveryManager := genericServer.AggregatedDiscoveryGroupManager + if aggregatedDiscoveryManager != nil { + aggregatedDiscoveryManager = aggregatedDiscoveryManager.WithSource(aggregated.CRDSource) + } + discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager) namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go index fec1e066e9d..9e663ac6cc9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go @@ -265,6 +265,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { }) if c.resourceManager != nil { c.resourceManager.AddGroupVersion(version.Group, apidiscoveryv2beta1.APIVersionDiscovery{ + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Version: version.Version, Resources: aggregatedApiResourcesForDiscovery, }) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller_test.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller_test.go index 7991f41e457..d51b2694190 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller_test.go @@ -126,7 +126,8 @@ var coolBarCRD = &v1.CustomResourceDefinition{ } var coolFooDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{ - Version: "v1", + Version: "v1", + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ { Resource: "coolfoos", @@ -157,7 +158,8 @@ var coolFooDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2bet } var mergedDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{ - Version: "v1", + Version: "v1", + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ { Resource: "coolbars", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go index ea5039c7c36..a819fe28fc3 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/fake.go @@ -169,3 +169,7 @@ func (f *recorderResourceManager) WebService() *restful.WebService { func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) { panic("unimplemented") } + +func (f *recorderResourceManager) WithSource(source Source) ResourceManager { + panic("unimplemented") +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go index 0df875a82f6..61a7fd70de1 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler.go @@ -36,6 +36,15 @@ import ( "k8s.io/klog/v2" ) +type Source uint + +// The GroupVersion from the lowest Source takes precedence +const ( + AggregatorSource Source = 0 + BuiltinSource Source = 100 + CRDSource Source = 200 +) + // This handler serves the /apis endpoint for an aggregated list of // api resources indexed by their group version. type ResourceManager interface { @@ -65,9 +74,55 @@ type ResourceManager interface { // Thread-Safe SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery) + // Returns the same resource manager using a different source + // The source is used to decide how to de-duplicate groups. + // The group from the least-numbered source is used + WithSource(source Source) ResourceManager + http.Handler } +type resourceManager struct { + source Source + *resourceDiscoveryManager +} + +func (rm resourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { + rm.resourceDiscoveryManager.AddGroupVersion(rm.source, groupName, value) +} +func (rm resourceManager) SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int) { + rm.resourceDiscoveryManager.SetGroupVersionPriority(rm.source, gv, grouppriority, versionpriority) +} +func (rm resourceManager) RemoveGroup(groupName string) { + rm.resourceDiscoveryManager.RemoveGroup(rm.source, groupName) +} +func (rm resourceManager) RemoveGroupVersion(gv metav1.GroupVersion) { + rm.resourceDiscoveryManager.RemoveGroupVersion(rm.source, gv) +} +func (rm resourceManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { + rm.resourceDiscoveryManager.SetGroups(rm.source, groups) +} + +func (rm resourceManager) WithSource(source Source) ResourceManager { + return resourceManager{ + source: source, + resourceDiscoveryManager: rm.resourceDiscoveryManager, + } +} + +type groupKey struct { + name string + + // Source identifies where this group came from and dictates which group + // among duplicates is chosen to be used for discovery. + source Source +} + +type groupVersionKey struct { + metav1.GroupVersion + source Source +} + type resourceDiscoveryManager struct { serializer runtime.NegotiatedSerializer // cache is an atomic pointer to avoid the use of locks @@ -78,8 +133,8 @@ type resourceDiscoveryManager struct { // Writes protected by the lock. // List of all apigroups & resources indexed by the resource manager lock sync.RWMutex - apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery - versionPriorities map[metav1.GroupVersion]priorityInfo + apiGroups map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery + versionPriorities map[groupVersionKey]priorityInfo } type priorityInfo struct { @@ -93,7 +148,7 @@ func NewResourceManager(path string) ResourceManager { utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) rdm := &resourceDiscoveryManager{ serializer: codecs, - versionPriorities: make(map[metav1.GroupVersion]priorityInfo), + versionPriorities: make(map[groupVersionKey]priorityInfo), } rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET", /* group = */ "", @@ -105,20 +160,28 @@ func NewResourceManager(path string) ResourceManager { /* deprecated */ false, /* removedRelease */ "", rdm.serveHTTP) - return rdm + return resourceManager{ + source: BuiltinSource, + resourceDiscoveryManager: rdm, + } } -func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { + +func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(source Source, gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { rdm.lock.Lock() defer rdm.lock.Unlock() - rdm.versionPriorities[gv] = priorityInfo{ + key := groupVersionKey{ + GroupVersion: gv, + source: source, + } + rdm.versionPriorities[key] = priorityInfo{ GroupPriorityMinimum: groupPriorityMinimum, VersionPriority: versionPriority, } rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { +func (rdm *resourceDiscoveryManager) SetGroups(source Source, groups []apidiscoveryv2beta1.APIGroupDiscovery) { rdm.lock.Lock() defer rdm.lock.Unlock() @@ -127,13 +190,17 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG for _, group := range groups { for _, version := range group.Versions { - rdm.addGroupVersionLocked(group.Name, version) + rdm.addGroupVersionLocked(source, group.Name, version) } } // Filter unused out priority entries for gv := range rdm.versionPriorities { - entry, exists := rdm.apiGroups[gv.Group] + key := groupKey{ + source: source, + name: gv.Group, + } + entry, exists := rdm.apiGroups[key] if !exists { delete(rdm.versionPriorities, gv) continue @@ -154,21 +221,26 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG } } -func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { +func (rdm *resourceDiscoveryManager) AddGroupVersion(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { rdm.lock.Lock() defer rdm.lock.Unlock() - rdm.addGroupVersionLocked(groupName, value) + rdm.addGroupVersionLocked(source, groupName, value) } -func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { +func (rdm *resourceDiscoveryManager) addGroupVersionLocked(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version) if rdm.apiGroups == nil { - rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery) + rdm.apiGroups = make(map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery) } - if existing, groupExists := rdm.apiGroups[groupName]; groupExists { + key := groupKey{ + source: source, + name: groupName, + } + + if existing, groupExists := rdm.apiGroups[key]; groupExists { // If this version already exists, replace it versionExists := false @@ -181,6 +253,7 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val if reflect.DeepEqual(existing.Versions[i], value) { return } + existing.Versions[i] = value versionExists = true break @@ -198,12 +271,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val }, Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value}, } - rdm.apiGroups[groupName] = group + rdm.apiGroups[key] = group } gv := metav1.GroupVersion{Group: groupName, Version: value.Version} - if _, ok := rdm.versionPriorities[gv]; !ok { - rdm.versionPriorities[gv] = priorityInfo{ + gvKey := groupVersionKey{ + GroupVersion: gv, + source: source, + } + if _, ok := rdm.versionPriorities[gvKey]; !ok { + rdm.versionPriorities[gvKey] = priorityInfo{ GroupPriorityMinimum: 1000, VersionPriority: 15, } @@ -213,10 +290,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) { +func (rdm *resourceDiscoveryManager) RemoveGroupVersion(source Source, apiGroup metav1.GroupVersion) { rdm.lock.Lock() defer rdm.lock.Unlock() - group, exists := rdm.apiGroups[apiGroup.Group] + + key := groupKey{ + source: source, + name: apiGroup.Group, + } + + group, exists := rdm.apiGroups[key] if !exists { return } @@ -234,23 +317,33 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVer return } - delete(rdm.versionPriorities, apiGroup) + gvKey := groupVersionKey{ + GroupVersion: apiGroup, + source: source, + } + + delete(rdm.versionPriorities, gvKey) if len(group.Versions) == 0 { - delete(rdm.apiGroups, group.Name) + delete(rdm.apiGroups, key) } // Reset response document so it is recreated lazily rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) { +func (rdm *resourceDiscoveryManager) RemoveGroup(source Source, groupName string) { rdm.lock.Lock() defer rdm.lock.Unlock() - delete(rdm.apiGroups, groupName) + key := groupKey{ + source: source, + name: groupName, + } + + delete(rdm.apiGroups, key) for k := range rdm.versionPriorities { - if k.Group == groupName { + if k.Group == groupName && k.source == source { delete(rdm.versionPriorities, k) } } @@ -265,17 +358,63 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 regenerationCounter.Inc() // Re-order the apiGroups by their priority. groups := []apidiscoveryv2beta1.APIGroupDiscovery{} - for _, group := range rdm.apiGroups { - copied := *group.DeepCopy() + + groupsToUse := map[string]apidiscoveryv2beta1.APIGroupDiscovery{} + sourcesUsed := map[metav1.GroupVersion]Source{} + + for key, group := range rdm.apiGroups { + if existing, ok := groupsToUse[key.name]; ok { + for _, v := range group.Versions { + gv := metav1.GroupVersion{Group: key.name, Version: v.Version} + + // Skip groupversions we've already seen before. Only DefaultSource + // takes precedence + if usedSource, seen := sourcesUsed[gv]; seen && key.source >= usedSource { + continue + } else if seen { + // Find the index of the duplicate version and replace + for i := 0; i < len(existing.Versions); i++ { + if existing.Versions[i].Version == v.Version { + existing.Versions[i] = v + break + } + } + + } else { + // New group-version, just append + existing.Versions = append(existing.Versions, v) + } + + sourcesUsed[gv] = key.source + groupsToUse[key.name] = existing + } + // Check to see if we have overlapping versions. If we do, take the one + // with highest source precedence + } else { + groupsToUse[key.name] = *group.DeepCopy() + for _, v := range group.Versions { + gv := metav1.GroupVersion{Group: key.name, Version: v.Version} + sourcesUsed[gv] = key.source + } + } + } + + for _, group := range groupsToUse { // Re-order versions based on their priority. Use kube-aware string // comparison as a tie breaker - sort.SliceStable(copied.Versions, func(i, j int) bool { - iVersion := copied.Versions[i].Version - jVersion := copied.Versions[j].Version + sort.SliceStable(group.Versions, func(i, j int) bool { + iVersion := group.Versions[i].Version + jVersion := group.Versions[j].Version - iPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: iVersion}].VersionPriority - jPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: jVersion}].VersionPriority + iGV := metav1.GroupVersion{Group: group.Name, Version: iVersion} + jGV := metav1.GroupVersion{Group: group.Name, Version: jVersion} + + iSource := sourcesUsed[iGV] + jSource := sourcesUsed[jGV] + + iPriority := rdm.versionPriorities[groupVersionKey{iGV, iSource}].VersionPriority + jPriority := rdm.versionPriorities[groupVersionKey{jGV, jSource}].VersionPriority // Sort by version string comparator if priority is equal if iPriority == jPriority { @@ -286,13 +425,16 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 return iPriority > jPriority }) - groups = append(groups, *copied.DeepCopy()) - + groups = append(groups, group) } // For each group, determine the highest minimum group priority and use that priorities := map[string]int{} for gv, info := range rdm.versionPriorities { + if source := sourcesUsed[gv.GroupVersion]; source != gv.source { + continue + } + if existing, exists := priorities[gv.Group]; exists { if existing < info.GroupPriorityMinimum { priorities[gv.Group] = info.GroupPriorityMinimum diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go index fb5aaf2d23a..7fd3cd46567 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/handler_test.go @@ -18,6 +18,7 @@ package aggregated_test import ( "encoding/json" + "fmt" "math/rand" "net/http" "net/http/httptest" @@ -365,6 +366,91 @@ func TestUpdateService(t *testing.T) { assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document") } +func TestMultipleSources(t *testing.T) { + type pair struct { + manager discoveryendpoint.ResourceManager + apis apidiscoveryv2beta1.APIGroupDiscoveryList + } + + pairs := []pair{} + + defaultManager := discoveryendpoint.NewResourceManager("apis") + for i := 0; i < 10; i++ { + name := discoveryendpoint.Source(100 * i) + manager := defaultManager.WithSource(name) + apis := fuzzAPIGroups(1, 3, int64(15+i)) + + // Give the groups deterministic names + for i := range apis.Items { + apis.Items[i].Name = fmt.Sprintf("%v.%v.com", i, name) + } + + pairs = append(pairs, pair{manager, apis}) + } + + expectedResult := []apidiscoveryv2beta1.APIGroupDiscovery{} + + groupCounter := 0 + for _, p := range pairs { + for gi, g := range p.apis.Items { + for vi, v := range g.Versions { + p.manager.AddGroupVersion(g.Name, v) + + // Use index for priority so we dont have to do any sorting + // Use negative index since it is sorted descending + p.manager.SetGroupVersionPriority(metav1.GroupVersion{Group: g.Name, Version: v.Version}, -gi-groupCounter, -vi) + } + + expectedResult = append(expectedResult, g) + } + + groupCounter += len(p.apis.Items) + } + + // Show discovery document is what we expect + _, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + + require.Len(t, initialDocument.Items, len(expectedResult)) + require.Equal(t, initialDocument.Items, expectedResult) +} + +// Shows that if you have multiple sources including Default source using +// with the same group name the groups added by the "Default" source are used +func TestSourcePrecedence(t *testing.T) { + defaultManager := discoveryendpoint.NewResourceManager("apis") + otherManager := defaultManager.WithSource(500) + apis := fuzzAPIGroups(1, 3, int64(15)) + for _, g := range apis.Items { + for i, v := range g.Versions { + v.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent + g.Versions[i] = v + otherManager.AddGroupVersion(g.Name, v) + } + } + + _, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + require.Equal(t, apis.Items, initialDocument.Items) + + // Add the first groupversion under default. + // No versions should appear in discovery document except this one + overrideVersion := initialDocument.Items[0].Versions[0] + overrideVersion.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + defaultManager.AddGroupVersion(initialDocument.Items[0].Name, overrideVersion) + + _, _, maskedDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + masked := initialDocument.DeepCopy() + masked.Items[0].Versions[0].Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + + require.Equal(t, masked.Items, maskedDocument.Items) + + // Wipe out default group. The other versions from the other group should now + // appear since the group is not being overridden by defaults ource + defaultManager.RemoveGroup(apis.Items[0].Name) + + _, _, resetDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + require.Equal(t, resetDocument.Items, initialDocument.Items) +} + // Show the discovery manager is capable of serving requests to multiple users // with unchanging data func TestConcurrentRequests(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4a2756c188f..52c865f8a98 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -776,6 +776,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A s.AggregatedDiscoveryGroupManager.AddGroupVersion( groupVersion.Group, apidiscoveryv2beta1.APIVersionDiscovery{ + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Version: groupVersion.Version, Resources: discoveryAPIResources, }, @@ -785,6 +786,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion( groupVersion.Group, apidiscoveryv2beta1.APIVersionDiscovery{ + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Version: groupVersion.Version, Resources: discoveryAPIResources, }, diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 4fd206cc89a..e673530081a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -397,7 +397,9 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { s.discoveryAggregationController = NewDiscoveryManager( - s.GenericAPIServer.AggregatedDiscoveryGroupManager, + // Use aggregator as the source name to avoid overwriting native/CRD + // groups + s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource), ) // Setup discovery endpoint diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index a56b19740bc..3690006fd57 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -421,7 +421,7 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) { } // Ensure that apiregistration.k8s.io is the first group in the discovery group. - dm.mergedDiscoveryHandler.SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) + dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) wait.PollUntil(1*time.Minute, func() (done bool, err error) { dm.servicesLock.Lock() diff --git a/test/integration/apiserver/discovery/discovery_test.go b/test/integration/apiserver/discovery/discovery_test.go index 66c42431c88..07aa4c3c68d 100644 --- a/test/integration/apiserver/discovery/discovery_test.go +++ b/test/integration/apiserver/discovery/discovery_test.go @@ -255,13 +255,16 @@ func runTestCases(t *testing.T, cases []testCase) { for _, c := range cases { t.Run(c.Name, func(t *testing.T) { func() { - for _, a := range c.Actions { + testContext, testDone := context.WithCancel(ctx) + defer testDone() + + for i, a := range c.Actions { if cleaning, ok := a.(cleaningAction); ok { defer func() { - require.NoError(t, cleaning.Cleanup(ctx, client)) + require.NoError(t, cleaning.Cleanup(testContext, client), "cleanup after \"%T\" step %v", a, i) }() } - require.NoError(t, a.Do(ctx, client)) + require.NoError(t, a.Do(testContext, client), "running \"%T\" step %v", a, i) } }() @@ -339,9 +342,10 @@ func TestCRD(t *testing.T) { applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})), - // only CRD has stable v2, this will show that CRD has been synced - waitForGroupVersionsV1([]metav1.GroupVersion{stableV2}), - waitForGroupVersionsV2([]metav1.GroupVersion{stableV2}), + // Show that we have v1 and v2 but v1 is stale + waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), + waitForStaleGroupVersionsV2([]metav1.GroupVersion{stableV1}), + waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV2}), // Delete APIService shared by the aggregated apiservice and // CRD @@ -355,7 +359,160 @@ func TestCRD(t *testing.T) { // Show that the groupversion is re-added back waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), - waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), + waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), + }, + }, + { + // Show that if CRD and Aggregated APIservice share a groupversiom, + // The aggregated apiservice's discovery information is shown in both + // v1 and v2 discovery + Name: "CRDAPIServiceSameGroupDifferentVersions", + Actions: []testAction{ + // Wait for CRD to apply + applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v2", "v1alpha1"})), + // Wait for GV to appear in both discovery documents + waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), + + applyAPIService( + apiregistrationv1.APIServiceSpec{ + Group: stableGroup, + Version: "v1", + InsecureSkipTLSVerify: true, + GroupPriorityMinimum: int32(1000), + VersionPriority: int32(100), + Service: &apiregistrationv1.ServiceReference{ + Name: "unused", + Namespace: "default", + }, + }, + ), + + // We should now have stable v1 available + waitForGroupVersionsV1([]metav1.GroupVersion{stableV1}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV1}), + + // The CRD group-versions not served by the aggregated + // apiservice should still be availablee + waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), + + // Remove API service. Show we have switched to CRD + deleteObject{ + GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")), + Name: "v1.stable.example.com", + }, + + // Show that we still have stable v1 since it is in the CRD + waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), + + waitForAbsentGroupVersionsV1([]metav1.GroupVersion{stableV1}), + waitForAbsentGroupVersionsV2([]metav1.GroupVersion{stableV1}), + }, + }, + { + // Show that if CRD and a builtin share a group version, + // the builtin takes precedence in both versions of discovery + Name: "CRDBuiltinOverlapPrecence", + Actions: []testAction{ + // Create CRD that overrides a builtin + applyCRD(makeCRDSpec("apiextensions.k8s.io", "Bar", true, []string{"v1", "v2", "vfake"})), + + waitForGroupVersionsV1([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}), + waitForGroupVersionsV2([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}), + + // Show that the builtin group-version is still used for V1 + // By showing presence of v1.CustomResourceDefinition + // and absence of v1.Bar + waitForResourcesV1([]metav1.GroupVersionResource{ + { + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, + { + Group: "apiextensions.k8s.io", + Version: "vfake", + Resource: "bars", + }, + }), + waitForResourcesV2([]metav1.GroupVersionResource{ + { + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, + { + Group: "apiextensions.k8s.io", + Version: "vfake", + Resource: "bars", + }, + }), + + waitForResourcesAbsentV1([]metav1.GroupVersionResource{ + { + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "bars", + }, + }), + waitForResourcesAbsentV2([]metav1.GroupVersionResource{ + { + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "bars", + }, + }), + }, + }, + { + // Tests that a race discovered during alpha phase of the feature is fixed. + // Rare race would occur if a CRD was synced before the removal of an aggregated + // APIService could be synced. + // To test this we: + // 1. Add CRD to apiserver + // 2. Wait for it to sync + // 3. Add aggregated APIService with same groupversion + // 4. Remove aggregated apiservice + // 5. Check that we have CRD GVs in discovery document + // Show that if CRD and APIService share a groupversion, and the + // APIService is deleted, and CRD updated, the groupversion from + // the CRD remains in discovery. + Name: "Race", + Actions: []testAction{ + // Create CRD with the same GV as the aggregated APIService + applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})), + + // only CRD has stable v2, this will show that CRD has been synced + waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}), + + // Add Aggregated APIService that overlaps the CRD. + applyAPIService( + apiregistrationv1.APIServiceSpec{ + Group: stableGroup, + Version: "v1", + InsecureSkipTLSVerify: true, + GroupPriorityMinimum: int32(1000), + VersionPriority: int32(100), + Service: &apiregistrationv1.ServiceReference{ + Name: "fake", + Namespace: "default", + }, + }, + ), + + // Delete APIService shared by the aggregated apiservice and + // CRD + deleteObject{ + GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")), + Name: "v1.stable.example.com", + }, + + // Show the CRD (with stablev2) is the one which is now advertised + waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), + waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}), }, }, }) diff --git a/test/integration/apiserver/discovery/framework.go b/test/integration/apiserver/discovery/framework.go index 8d624284eed..5662680c97d 100644 --- a/test/integration/apiserver/discovery/framework.go +++ b/test/integration/apiserver/discovery/framework.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "time" apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -91,6 +92,15 @@ type waitForGroupVersionsV2 []metav1.GroupVersion // Wait for groupversions to disappear from v2 discovery type waitForAbsentGroupVersionsV2 []metav1.GroupVersion +type waitForStaleGroupVersionsV2 []metav1.GroupVersion +type waitForFreshGroupVersionsV2 []metav1.GroupVersion + +type waitForResourcesV1 []metav1.GroupVersionResource +type waitForResourcesAbsentV1 []metav1.GroupVersionResource + +type waitForResourcesV2 []metav1.GroupVersionResource +type waitForResourcesAbsentV2 []metav1.GroupVersionResource + // Assert something about the current state of v2 discovery type inlineAction func(ctx context.Context, client testClient) error @@ -165,6 +175,13 @@ func (a applyCRD) Do(ctx context.Context, client testClient) error { Spec: apiextensionsv1.CustomResourceDefinitionSpec(a), } + if strings.HasSuffix(obj.Name, ".k8s.io") { + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations["api-approved.kubernetes.io"] = "https://github.com/kubernetes/kubernetes/fake" + } + unstructuredContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return err @@ -226,6 +243,40 @@ func (d deleteObject) Do(ctx context.Context, client testClient) error { } } +func (w waitForStaleGroupVersionsV2) Do(ctx context.Context, client testClient) error { + err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, gv := range w { + if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessStale { + return false + } + } + + return true + }) + + if err != nil { + return fmt.Errorf("waiting for stale groupversions v2 (%v): %w", w, err) + } + return nil +} + +func (w waitForFreshGroupVersionsV2) Do(ctx context.Context, client testClient) error { + err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, gv := range w { + if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessCurrent { + return false + } + } + + return true + }) + + if err != nil { + return fmt.Errorf("waiting for fresh groupversions v2 (%v): %w", w, err) + } + return nil +} + func (w waitForGroupVersionsV2) Do(ctx context.Context, client testClient) error { err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { for _, gv := range w { @@ -294,6 +345,137 @@ func (w waitForAbsentGroupVersionsV1) Do(ctx context.Context, client testClient) return nil } +func (w waitForResourcesV1) Do(ctx context.Context, client testClient) error { + requiredResources := map[metav1.GroupVersion][]string{} + + for _, gvr := range w { + gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version} + if existing, ok := requiredResources[gv]; ok { + requiredResources[gv] = append(existing, gvr.Resource) + } else { + requiredResources[gv] = []string{gvr.Resource} + } + } + + for gv, resourceNames := range requiredResources { + err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool { + for _, name := range resourceNames { + found := false + + for _, resultResource := range result.APIResources { + if resultResource.Name == name { + found = true + break + } + } + + if !found { + return false + } + } + + return true + }) + + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("waiting for resources v1 (%v): %w", w, err) + } + } + + return nil +} + +func (w waitForResourcesAbsentV1) Do(ctx context.Context, client testClient) error { + requiredResources := map[metav1.GroupVersion][]string{} + + for _, gvr := range w { + gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version} + if existing, ok := requiredResources[gv]; ok { + requiredResources[gv] = append(existing, gvr.Resource) + } else { + requiredResources[gv] = []string{gvr.Resource} + } + } + + for gv, resourceNames := range requiredResources { + err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool { + for _, name := range resourceNames { + for _, resultResource := range result.APIResources { + if resultResource.Name == name { + return false + } + } + } + + return true + }) + + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("waiting for absent resources v1 (%v): %w", w, err) + } + } + + return nil +} + +func (w waitForResourcesV2) Do(ctx context.Context, client testClient) error { + err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, gvr := range w { + if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil { + return false + } else { + found := false + for _, resultResoure := range info.Resources { + if resultResoure.Resource == gvr.Resource { + found = true + break + } + } + + if !found { + return false + } + } + } + + return true + }) + + if err != nil { + return fmt.Errorf("waiting for resources v2 (%v): %w", w, err) + } + return nil +} + +func (w waitForResourcesAbsentV2) Do(ctx context.Context, client testClient) error { + err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { + for _, gvr := range w { + if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil { + return false + } else { + for _, resultResoure := range info.Resources { + if resultResoure.Resource == gvr.Resource { + return false + } + } + } + } + + return true + }) + + if err != nil { + return fmt.Errorf("waiting for absent resources v2 (%v): %w", w, err) + } + return nil +} + func (i inlineAction) Do(ctx context.Context, client testClient) error { return i(ctx, client) } @@ -334,7 +516,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s Discovery(). RESTClient(). Get(). - AbsPath("/apis"). + AbsPath(path). SetHeader("Accept", acceptV1JSON). Do(ctx). Raw() @@ -352,7 +534,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s return groupList, nil } -func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIGroupList, error) { +func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIResourceList, error) { result, err := client. Discovery(). RESTClient(). @@ -363,13 +545,13 @@ func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1. Raw() if err != nil { - return metav1.APIGroupList{}, err + return metav1.APIResourceList{}, err } - groupList := metav1.APIGroupList{} + groupList := metav1.APIResourceList{} err = json.Unmarshal(result, &groupList) if err != nil { - return metav1.APIGroupList{}, err + return metav1.APIResourceList{}, err } return groupList, nil @@ -445,6 +627,28 @@ func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condit }) } +func WaitForV1ResourcesWithCondition(ctx context.Context, client testClient, gv metav1.GroupVersion, condition func(result metav1.APIResourceList) bool) error { + // Keep repeatedly fetching document from aggregator. + // Check to see if it contains our service within a reasonable amount of time + return wait.PollWithContext( + ctx, + 250*time.Millisecond, + 1*time.Second, + func(ctx context.Context) (done bool, err error) { + resourceList, err := FetchV1DiscoveryResource(ctx, client, gv) + + if err != nil { + return false, err + } + + if condition(resourceList) { + return true, nil + } + + return false, nil + }) +} + func FindGroupVersionV1(discovery metav1.APIGroupList, gv metav1.GroupVersion) bool { for _, documentGroup := range discovery.Groups { if documentGroup.Name != gv.Group { From 933dfe3a51e0e6fee53d14614549f47559e14b6e Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Tue, 21 Mar 2023 14:18:34 -0700 Subject: [PATCH 2/2] use longer timeouts to avoid flakes on heavily loaded systems --- test/integration/apiserver/discovery/framework.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/integration/apiserver/discovery/framework.go b/test/integration/apiserver/discovery/framework.go index 5662680c97d..4593b27bb9e 100644 --- a/test/integration/apiserver/discovery/framework.go +++ b/test/integration/apiserver/discovery/framework.go @@ -43,6 +43,8 @@ import ( const acceptV1JSON = "application/json" const acceptV2JSON = "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" +const maxTimeout = 10 * time.Second + type testClient interface { kubernetes.Interface aggregator.Interface @@ -143,7 +145,7 @@ func (a applyAPIService) Cleanup(ctx context.Context, client testClient) error { err = wait.PollWithContext( ctx, 250*time.Millisecond, - 1*time.Second, + maxTimeout, func(ctx context.Context) (done bool, err error) { _, err = client.ApiregistrationV1().APIServices().Get(ctx, name, metav1.GetOptions{}) if err == nil { @@ -211,7 +213,7 @@ func (a applyCRD) Cleanup(ctx context.Context, client testClient) error { err = wait.PollWithContext( ctx, 250*time.Millisecond, - 1*time.Second, + maxTimeout, func(ctx context.Context) (done bool, err error) { _, err = client.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{}) if err == nil { @@ -590,7 +592,7 @@ func WaitForResultWithCondition(ctx context.Context, client testClient, conditio return wait.PollWithContext( ctx, 250*time.Millisecond, - 1*time.Second, + maxTimeout, func(ctx context.Context) (done bool, err error) { groupList, err := FetchV2Discovery(ctx, client) if err != nil { @@ -611,7 +613,7 @@ func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condit return wait.PollWithContext( ctx, 250*time.Millisecond, - 1*time.Second, + maxTimeout, func(ctx context.Context) (done bool, err error) { groupList, err := FetchV1DiscoveryGroups(ctx, client) @@ -633,7 +635,7 @@ func WaitForV1ResourcesWithCondition(ctx context.Context, client testClient, gv return wait.PollWithContext( ctx, 250*time.Millisecond, - 1*time.Second, + maxTimeout, func(ctx context.Context) (done bool, err error) { resourceList, err := FetchV1DiscoveryResource(ctx, client, gv)