Merge pull request #113764 from Jefftree/foobar

Fix bugs with aggregated discovery
This commit is contained in:
Kubernetes Prow Robot 2022-11-11 13:11:56 -08:00 committed by GitHub
commit 263ba476ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1351 additions and 320 deletions

View File

@ -137,7 +137,7 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
// Imbue all builtin group-priorities onto the aggregated discovery // Imbue all builtin group-priorities onto the aggregated discovery
if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
for gv, entry := range apiVersionPriorities { for gv, entry := range apiVersionPriorities {
aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupPriority(gv.Group, int(entry.group)) aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))
} }
} }

View File

@ -269,7 +269,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
Resources: aggregatedApiResourcesForDiscovery, Resources: aggregatedApiResourcesForDiscovery,
}) })
// Default priority for CRDs // Default priority for CRDs
c.resourceManager.SetGroupPriority(version.Group, 1000) c.resourceManager.SetGroupVersionPriority(metav1.GroupVersion(version), 1000, 100)
} }
return nil return nil
} }

View File

@ -273,13 +273,17 @@ func TestResourceManagerExistingCRD(t *testing.T) {
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000) SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000) SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}
env.Start(ctx) env.Start(ctx)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second) err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
@ -295,7 +299,10 @@ func TestResourceManagerAddedCRD(t *testing.T) {
env := setup() env := setup()
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000) for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().
SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}
env.Start(ctx) env.Start(ctx)
@ -340,7 +347,9 @@ func TestMultipleCRDSameVersion(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000) for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second) err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err) require.NoError(t, err)
@ -358,7 +367,9 @@ func TestMultipleCRDSameVersion(t *testing.T) {
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, mergedDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, mergedDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000) for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second) err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err) require.NoError(t, err)
} }
@ -388,7 +399,9 @@ func TestDiscoveryControllerResourceManagerRemovedCRD(t *testing.T) {
// Resource Manager // Resource Manager
env.FakeResourceManager.Expect(). env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery) AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000) for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second) err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err) require.NoError(t, err)

View File

@ -110,14 +110,15 @@ func (f *fakeResourceManager) WaitForActions(ctx context.Context, timeout time.D
return err return err
} }
func (f *recorderResourceManager) SetGroupPriority(groupName string, priority int) { func (f *recorderResourceManager) SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int) {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{ f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroupPriority", Type: "SetGroupVersionPriority",
Group: groupName, Group: gv.Group,
Value: priority, Version: gv.Version,
Value: versionpriority,
}) })
} }

View File

@ -24,6 +24,7 @@ import (
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"sync/atomic" "sync/atomic"
@ -42,10 +43,12 @@ type ResourceManager interface {
// Thread-safe // Thread-safe
AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery)
// Sets priority for a group for sorting discovery. // Sets a priority to be used while sorting a specific group and
// If a priority is set before the group is known, the priority will be ignored // group-version. If two versions report different priorities for
// Once a group is removed, the priority is forgotten. // the group, the higher one will be used. If the group is not
SetGroupPriority(groupName string, priority int) // known, the priority is ignored. The priority for this version
// is forgotten once the group-version is forgotten
SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int)
// Removes all group versions for a given group // Removes all group versions for a given group
// Thread-safe // Thread-safe
@ -73,26 +76,30 @@ type resourceDiscoveryManager struct {
// List of all apigroups & resources indexed by the resource manager // List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex lock sync.RWMutex
apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery
apiGroupNames map[string]int versionPriorities map[metav1.GroupVersion]priorityInfo
}
type priorityInfo struct {
GroupPriorityMinimum int
VersionPriority int
} }
func NewResourceManager() ResourceManager { func NewResourceManager() ResourceManager {
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme) codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, apiGroupNames: make(map[string]int)} return &resourceDiscoveryManager{serializer: codecs, versionPriorities: make(map[metav1.GroupVersion]priorityInfo)}
} }
func (rdm *resourceDiscoveryManager) SetGroupPriority(group string, priority int) { func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) {
rdm.lock.Lock() rdm.lock.Lock()
defer rdm.lock.Unlock() defer rdm.lock.Unlock()
if _, exists := rdm.apiGroupNames[group]; exists { rdm.versionPriorities[gv] = priorityInfo{
rdm.apiGroupNames[group] = priority GroupPriorityMinimum: groupPriorityMinimum,
rdm.cache.Store(nil) VersionPriority: versionPriority,
} else {
klog.Warningf("DiscoveryManager: Attempted to set priority for group %s but does not exist", group)
} }
rdm.cache.Store(nil)
} }
func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) {
@ -108,10 +115,25 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG
} }
} }
// Filter unused out apiGroupNames // Filter unused out priority entries
for name := range rdm.apiGroupNames { for gv := range rdm.versionPriorities {
if _, exists := rdm.apiGroups[name]; !exists { entry, exists := rdm.apiGroups[gv.Group]
delete(rdm.apiGroupNames, name) if !exists {
delete(rdm.versionPriorities, gv)
continue
}
containsVersion := false
for _, v := range entry.Versions {
if v.Version == gv.Version {
containsVersion = true
break
}
}
if !containsVersion {
delete(rdm.versionPriorities, gv)
} }
} }
} }
@ -161,7 +183,14 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value}, Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value},
} }
rdm.apiGroups[groupName] = group rdm.apiGroups[groupName] = group
rdm.apiGroupNames[groupName] = 0 }
gv := metav1.GroupVersion{Group: groupName, Version: value.Version}
if _, ok := rdm.versionPriorities[gv]; !ok {
rdm.versionPriorities[gv] = priorityInfo{
GroupPriorityMinimum: 1000,
VersionPriority: 15,
}
} }
// Reset response document so it is recreated lazily // Reset response document so it is recreated lazily
@ -189,9 +218,9 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVer
return return
} }
delete(rdm.versionPriorities, apiGroup)
if len(group.Versions) == 0 { if len(group.Versions) == 0 {
delete(rdm.apiGroups, group.Name) delete(rdm.apiGroups, group.Name)
delete(rdm.apiGroupNames, group.Name)
} }
// Reset response document so it is recreated lazily // Reset response document so it is recreated lazily
@ -203,7 +232,12 @@ func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
defer rdm.lock.Unlock() defer rdm.lock.Unlock()
delete(rdm.apiGroups, groupName) delete(rdm.apiGroups, groupName)
delete(rdm.apiGroupNames, groupName)
for k := range rdm.versionPriorities {
if k.Group == groupName {
delete(rdm.versionPriorities, k)
}
}
// Reset response document so it is recreated lazily // Reset response document so it is recreated lazily
rdm.cache.Store(nil) rdm.cache.Store(nil)
@ -215,8 +249,40 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2
// Re-order the apiGroups by their priority. // Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{} groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups { for _, group := range rdm.apiGroups {
groups = append(groups, *group.DeepCopy()) copied := *group.DeepCopy()
// Re-order versions based on their priority. Use kube-aware string
// comparison as a tie breaker
sort.SliceStable(copied.Versions, func(i, j int) bool {
iVersion := copied.Versions[i].Version
jVersion := copied.Versions[j].Version
iPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: iVersion}].VersionPriority
jPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: jVersion}].VersionPriority
// Sort by version string comparator if priority is equal
if iPriority == jPriority {
return version.CompareKubeAwareVersionStrings(iVersion, jVersion) > 0
}
// i sorts before j if it has a higher priority
return iPriority > jPriority
})
groups = append(groups, *copied.DeepCopy())
}
// For each group, determine the highest minimum group priority and use that
priorities := map[string]int{}
for gv, info := range rdm.versionPriorities {
if existing, exists := priorities[gv.Group]; exists {
if existing < info.GroupPriorityMinimum {
priorities[gv.Group] = info.GroupPriorityMinimum
}
} else {
priorities[gv.Group] = info.GroupPriorityMinimum
}
} }
sort.SliceStable(groups, func(i, j int) bool { sort.SliceStable(groups, func(i, j int) bool {
@ -224,8 +290,8 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2
jName := groups[j].Name jName := groups[j].Name
// Default to 0 priority by default // Default to 0 priority by default
iPriority := rdm.apiGroupNames[iName] iPriority := priorities[iName]
jPriority := rdm.apiGroupNames[jName] jPriority := priorities[jName]
// Sort discovery based on apiservice priority. // Sort discovery based on apiservice priority.
// Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go // Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go
@ -234,7 +300,7 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2
return iName < jName return iName < jName
} }
// i sorts before j if it has a lower priority // i sorts before j if it has a higher priority
return iPriority > jPriority return iPriority > jPriority
}) })

View File

@ -35,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
) )
@ -61,6 +62,9 @@ func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv
atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{} atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{}
c.Fuzz(&atLeastOne) c.Fuzz(&atLeastOne)
o.Versions = append(o.Versions, atLeastOne) o.Versions = append(o.Versions, atLeastOne)
sort.Slice(o.Versions[:], func(i, j int) bool {
return version.CompareKubeAwareVersionStrings(o.Versions[i].Version, o.Versions[j].Version) > 0
})
o.TypeMeta = metav1.TypeMeta{} o.TypeMeta = metav1.TypeMeta{}
var name string var name string
@ -499,3 +503,123 @@ func TestAbuse(t *testing.T) {
waitGroup.Wait() waitGroup.Wait()
} }
func TestVersionSortingNoPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
})
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v2beta1",
})
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
})
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1beta1",
})
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v2",
})
response, _, decoded := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
versions := decoded.Items[0].Versions
// Ensure that v1 is sorted before v1alpha1
assert.Equal(t, versions[0].Version, "v2")
assert.Equal(t, versions[1].Version, "v1")
assert.Equal(t, versions[2].Version, "v2beta1")
assert.Equal(t, versions[3].Version, "v1beta1")
assert.Equal(t, versions[4].Version, "v1alpha1")
}
func TestVersionSortingWithPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: "default", Version: "v1"}, 1000, 100)
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: "default", Version: "v1alpha1"}, 1000, 200)
response, _, decoded := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
versions := decoded.Items[0].Versions
// Ensure that reverse alpha sort order can be overridden by setting group version priorities.
assert.Equal(t, versions[0].Version, "v1alpha1")
assert.Equal(t, versions[1].Version, "v1")
}
// if two apiservices declare conflicting priorities for their group priority, take the higher one.
func TestGroupVersionSortingConflictingPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: "default", Version: "v1"}, 1000, 100)
manager.AddGroupVersion("test", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: "test", Version: "v1alpha1"}, 500, 100)
manager.AddGroupVersion("test", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha2",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: "test", Version: "v1alpha1"}, 2000, 100)
response, _, decoded := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
groups := decoded.Items
// Ensure that reverse alpha sort order can be overridden by setting group version priorities.
assert.Equal(t, groups[0].Name, "test")
assert.Equal(t, groups[1].Name, "default")
}
// Show that the GroupPriorityMinimum is not sticky if a higher group version is removed
// after a lower one is added
func TestStatelessGroupPriorityMinimum(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
stableGroup := "stable.example.com"
experimentalGroup := "experimental.example.com"
manager.AddGroupVersion(stableGroup, apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: stableGroup, Version: "v1"}, 1000, 100)
manager.AddGroupVersion(experimentalGroup, apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: experimentalGroup, Version: "v1"}, 100, 100)
manager.AddGroupVersion(experimentalGroup, apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
})
manager.SetGroupVersionPriority(metav1.GroupVersion{Group: experimentalGroup, Version: "v1alpha1"}, 10000, 100)
// Expect v1alpha1's group priority to be used and sort it first in the list
response, _, decoded := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, decoded.Items[0].Name, "experimental.example.com")
assert.Equal(t, decoded.Items[1].Name, "stable.example.com")
// Remove v1alpha1 and expect the new lower priority to take hold
manager.RemoveGroupVersion(metav1.GroupVersion{Group: experimentalGroup, Version: "v1alpha1"})
response, _, decoded = fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, decoded.Items[0].Name, "stable.example.com")
assert.Equal(t, decoded.Items[1].Name, "experimental.example.com")
}

View File

@ -71,7 +71,7 @@ type action struct {
func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscoveryv2beta1.APIResourceDiscovery, error) { func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscoveryv2beta1.APIResourceDiscovery, error) {
var apiResourceList []apidiscoveryv2beta1.APIResourceDiscovery var apiResourceList []apidiscoveryv2beta1.APIResourceDiscovery
parentResources := map[string]*apidiscoveryv2beta1.APIResourceDiscovery{} parentResources := make(map[string]int)
// Loop through all top-level resources // Loop through all top-level resources
for _, r := range list { for _, r := range list {
@ -87,7 +87,7 @@ func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscove
scope = apidiscoveryv2beta1.ScopeCluster scope = apidiscoveryv2beta1.ScopeCluster
} }
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{ resource := apidiscoveryv2beta1.APIResourceDiscovery{
Resource: r.Name, Resource: r.Name,
Scope: scope, Scope: scope,
ResponseKind: &metav1.GroupVersionKind{ ResponseKind: &metav1.GroupVersionKind{
@ -99,8 +99,9 @@ func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscove
ShortNames: r.ShortNames, ShortNames: r.ShortNames,
Categories: r.Categories, Categories: r.Categories,
SingularResource: r.SingularName, SingularResource: r.SingularName,
}) }
parentResources[r.Name] = &apiResourceList[len(apiResourceList)-1] apiResourceList = append(apiResourceList, resource)
parentResources[r.Name] = len(apiResourceList) - 1
} }
// Loop through all subresources // Loop through all subresources
@ -120,23 +121,19 @@ func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscove
scope = apidiscoveryv2beta1.ScopeCluster scope = apidiscoveryv2beta1.ScopeCluster
} }
var parent *apidiscoveryv2beta1.APIResourceDiscovery parentidx, exists := parentResources[split[0]]
var exists bool
parent, exists = parentResources[split[0]]
if !exists { if !exists {
// If a subresource exists without a parent, create a parent // If a subresource exists without a parent, create a parent
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{ apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{
Resource: split[0], Resource: split[0],
Scope: scope, Scope: scope,
}) })
parentResources[split[0]] = &apiResourceList[len(apiResourceList)-1] parentidx = len(apiResourceList) - 1
parent = &apiResourceList[len(apiResourceList)-1] parentResources[split[0]] = parentidx
parentResources[split[0]] = parent
} }
if parent.Scope != scope { if apiResourceList[parentidx].Scope != scope {
return nil, fmt.Errorf("Error: Parent %s (scope: %s) and subresource %s (scope: %s) scope do not match", split[0], parent.Scope, split[1], scope) return nil, fmt.Errorf("Error: Parent %s (scope: %s) and subresource %s (scope: %s) scope do not match", split[0], apiResourceList[parentidx].Scope, split[1], scope)
// //
} }
@ -151,9 +148,9 @@ func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscove
Kind: r.Kind, Kind: r.Kind,
} }
} }
parent.Subresources = append(parent.Subresources, subresource) apiResourceList[parentidx].Subresources = append(apiResourceList[parentidx].Subresources, subresource)
} }
return apiResourceList, nil return apiResourceList, nil
} }

View File

@ -206,6 +206,87 @@ func TestConvertAPIResourceToDiscovery(t *testing.T) {
}, },
}, },
{ {
name: "Test multiple resources and subresources",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "cronjobs/status",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "deployments",
Namespaced: true,
Kind: "Deployment",
Group: "apps",
Version: "v1",
ShortNames: []string{"deploy"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "deployments/status",
Namespaced: true,
Kind: "Deployment",
Group: "apps",
Version: "v1",
ShortNames: []string{"deploy"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
}, {
Resource: "deployments",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
},
ShortNames: []string{"deploy"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
},
},
}, {
name: "Test with subresource with no parent", name: "Test with subresource with no parent",
resources: []metav1.APIResource{ resources: []metav1.APIResource{
{ {

View File

@ -35,6 +35,7 @@ import (
jsonpatch "github.com/evanphx/json-patch" jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid" "github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
@ -931,8 +932,13 @@ func installAPI(s *GenericAPIServer, c *Config) {
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer) routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
if c.EnableDiscovery { if c.EnableDiscovery {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(s.DiscoveryGroupManager, s.AggregatedDiscoveryGroupManager)
s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/apis", metav1.APIGroupList{}))
} else {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService()) s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
} }
}
if c.FlowControl != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) { if c.FlowControl != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) {
c.FlowControl.Install(s.Handler.NonGoRestfulMux) c.FlowControl.Install(s.Handler.NonGoRestfulMux)
} }

View File

@ -38,8 +38,11 @@ import (
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
) )
var APIRegistrationGroup string = "apiregistration.k8s.io" var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1"}
var APIRegistrationGroupPriority int = 18000
// Maximum is 20000. Set to higher than that so apiregistration always is listed
// first (mirrors v1 discovery behavior)
var APIRegistrationGroupPriority int = 20001
// Given a list of APIServices and proxyHandlers for contacting them, // Given a list of APIServices and proxyHandlers for contacting them,
// DiscoveryManager caches a list of discovery documents for each server // DiscoveryManager caches a list of discovery documents for each server
@ -151,9 +154,12 @@ type groupVersionInfo struct {
// describes how to contact the server responsible for this GroupVersion. // describes how to contact the server responsible for this GroupVersion.
service serviceKey service serviceKey
// groupPriority describes the priority of the APIService for sorting // groupPriority describes the priority of the APIService's group for sorting
groupPriority int groupPriority int
// groupPriority describes the priority of the APIService version for sorting
versionPriority int
// Method for contacting the service // Method for contacting the service
handler http.Handler handler http.Handler
} }
@ -232,7 +238,7 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
dm.setCacheEntryForService(info.service, cached) dm.setCacheEntryForService(info.service, cached)
return &cached, nil return &cached, nil
case http.StatusNotFound: case http.StatusNotAcceptable:
// Discovery Document is not being served at all. // Discovery Document is not being served at all.
// Fall back to legacy discovery information // Fall back to legacy discovery information
if len(gv.Version) == 0 { if len(gv.Version) == 0 {
@ -390,6 +396,7 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
} }
dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry) dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry)
dm.mergedDiscoveryHandler.SetGroupVersionPriority(metav1.GroupVersion(gv), info.groupPriority, info.versionPriority)
return nil return nil
} }
@ -428,7 +435,7 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
} }
// Ensure that apiregistration.k8s.io is the first group in the discovery group. // Ensure that apiregistration.k8s.io is the first group in the discovery group.
dm.mergedDiscoveryHandler.SetGroupPriority(APIRegistrationGroup, APIRegistrationGroupPriority) dm.mergedDiscoveryHandler.SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
wait.PollUntil(1*time.Minute, func() (done bool, err error) { wait.PollUntil(1*time.Minute, func() (done bool, err error) {
dm.servicesLock.Lock() dm.servicesLock.Lock()
@ -458,6 +465,7 @@ func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIServi
// Add or update APIService record and mark it as dirty // Add or update APIService record and mark it as dirty
dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{ dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{
groupPriority: int(apiService.Spec.GroupPriorityMinimum), groupPriority: int(apiService.Spec.GroupPriorityMinimum),
versionPriority: int(apiService.Spec.VersionPriority),
handler: handler, handler: handler,
lastMarkedDirty: time.Now(), lastMarkedDirty: time.Now(),
service: newServiceKey(*apiService.Spec.Service), service: newServiceKey(*apiService.Spec.Service),

View File

@ -205,6 +205,7 @@ func TestRemoveAPIService(t *testing.T) {
func TestLegacyFallback(t *testing.T) { func TestLegacyFallback(t *testing.T) {
aggregatedResourceManager := discoveryendpoint.NewResourceManager() aggregatedResourceManager := discoveryendpoint.NewResourceManager()
rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{ legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
Name: "stable.example.com", Name: "stable.example.com",
@ -262,9 +263,11 @@ func TestLegacyFallback(t *testing.T) {
} else if r.URL.Path == "/apis/stable.example.com/v1" { } else if r.URL.Path == "/apis/stable.example.com/v1" {
// defer to legacy discovery // defer to legacy discovery
legacyResourceHandler.ServeHTTP(w, r) legacyResourceHandler.ServeHTTP(w, r)
} else if r.URL.Path == "/apis" {
rootAPIsHandler.ServeHTTP(w, r)
} else { } else {
// Unknown url // Unknown url
w.WriteHeader(http.StatusNotFound) t.Fatalf("unexpected request sent to %v", r.URL.Path)
} }
})) }))
testCtx, cancel := context.WithCancel(context.Background()) testCtx, cancel := context.WithCancel(context.Background())

View File

@ -17,28 +17,25 @@ limitations under the License.
package discovery package discovery
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt"
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericfeatures "k8s.io/apiserver/pkg/features" genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -66,6 +63,8 @@ type testClientSet struct {
dynamicClientset dynamicClientset
} }
var _ testClient = testClientSet{}
func (t testClientSet) Discovery() discovery.DiscoveryInterface { func (t testClientSet) Discovery() discovery.DiscoveryInterface {
return t.kubeClientSet.Discovery() return t.kubeClientSet.Discovery()
} }
@ -94,6 +93,13 @@ var (
}, },
}, },
} }
stableGroup = "stable.example.com"
stableV1 = metav1.GroupVersion{Group: stableGroup, Version: "v1"}
stableV1alpha1 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha1"}
stableV1alpha2 = metav1.GroupVersion{Group: stableGroup, Version: "v1alpha2"}
stableV1beta1 = metav1.GroupVersion{Group: stableGroup, Version: "v1beta1"}
stableV2 = metav1.GroupVersion{Group: stableGroup, Version: "v2"}
) )
func init() { func init() {
@ -178,68 +184,6 @@ func unregisterAPIService(ctx context.Context, client aggregator.Interface, gv m
return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{}) return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{})
} }
func WaitForGroupsAbsent(ctx context.Context, client testClientSet, groups ...string) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if docGroup.Name == searchGroup {
return false
}
}
}
return true
})
}
func WaitForGroups(ctx context.Context, client testClientSet, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if reflect.DeepEqual(searchGroup, docGroup) {
return true
}
}
}
return false
})
}
func WaitForResultWithCondition(ctx context.Context, client testClientSet, condition func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool) error {
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
return wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
result, err := client.
Discovery().
RESTClient().
Get().
AbsPath("/apis").
SetHeader("Accept", "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList").
Do(ctx).
Raw()
if err != nil {
return false, err
}
groupList := apidiscoveryv2beta1.APIGroupDiscoveryList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
panic(err)
}
if condition(groupList) {
return true, nil
}
return false, nil
})
}
func TestAggregatedAPIServiceDiscovery(t *testing.T) { func TestAggregatedAPIServiceDiscovery(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
@ -264,7 +208,10 @@ func TestAggregatedAPIServiceDiscovery(t *testing.T) {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} }
})) }))
service.Start(t, ctx) go func() {
require.NoError(t, service.Run(ctx))
}()
require.NoError(t, service.WaitForReady(ctx))
// For each groupversion served by our resourcemanager, create an APIService // For each groupversion served by our resourcemanager, create an APIService
// object connected to our fake APIServer // object connected to our fake APIServer
@ -285,188 +232,498 @@ func TestAggregatedAPIServiceDiscovery(t *testing.T) {
require.NoError(t, WaitForGroups(ctx, client, basicTestGroup)) require.NoError(t, WaitForGroups(ctx, client, basicTestGroup))
} }
// Shows that the following sequence is handled correctly: func runTestCases(t *testing.T, cases []testCase) {
// 1. Create an APIService
// - Check that API service is in discovery doc
// 2. Create CRD with the same GroupVersion as APIService
// 3. Delete APIService
// - Check that API service is removed from discovery
// 4. Update CRD
// - Check that CRD is in discovery document
func TestOverlappingCRDAndAPIService(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
// Keep any goroutines spawned from running past the execution of this test // Keep any goroutines spawned from running past the execution of this test
ctx, client, cleanup := setup(t) ctx, client, cleanup := setup(t)
defer cleanup() defer cleanup()
// Create a resource manager whichs serves our GroupVersion // Fetch the original discovery information so we can wait for it to
resourceManager := discoveryendpoint.NewResourceManager() // reset between tests
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup}) originalV1, err := FetchV1DiscoveryGroups(ctx, client)
require.NoError(t, err)
// Install our ResourceManager as an Aggregated APIService to the originalV2, err := FetchV2Discovery(ctx, client)
// test server require.NoError(t, err)
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/apis" { for _, c := range cases {
resourceManager.ServeHTTP(w, r) t.Run(c.Name, func(t *testing.T) {
} else if strings.HasPrefix(r.URL.Path, "/apis/") { func() {
// Return "valid" response so APIService can be marked as "available" for _, a := range c.Actions {
w.WriteHeader(http.StatusOK) if cleaning, ok := a.(cleaningAction); ok {
} else { defer func() {
// reject openapi/v2, openapi/v3, apis/<group>/<version> require.NoError(t, cleaning.Cleanup(ctx, client))
w.WriteHeader(http.StatusNotFound) }()
} }
})) require.NoError(t, a.Do(ctx, client))
service.Start(t, ctx) }
}()
// For each groupversion served by our resourcemanager, create an APIService var diff string
// object connected to our fake APIServer err := WaitForV1GroupsWithCondition(ctx, client, func(result metav1.APIGroupList) bool {
for _, versionInfo := range basicTestGroup.Versions { diff = cmp.Diff(originalV1, result)
groupVersion := metav1.GroupVersion{ return reflect.DeepEqual(result, originalV1)
Group: basicTestGroup.Name, })
Version: versionInfo.Version, require.NoError(t, err, "v1 discovery must reset between tests: "+diff)
err = WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
diff = cmp.Diff(originalV2, result)
return reflect.DeepEqual(result, originalV2)
})
require.NoError(t, err, "v2 discovery must reset between tests: "+diff)
})
}
} }
registerAPIService(ctx, client, groupVersion, service) // Declarative tests targeting CRD integration
} func TestCRD(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
// Keep repeatedly fetching document from aggregator. runTestCases(t, []testCase{
// Check to see if it contains our service within a reasonable amount of time {
require.NoError(t, WaitForGroups(ctx, client, basicTestGroup)) // Show that when a CRD is added it gets included on the discovery doc
// within a reasonable amount of time
// Create a CRD Name: "CRDInclusion",
crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{ Actions: []testAction{
ObjectMeta: metav1.ObjectMeta{ applyCRD(makeCRDSpec(stableGroup, "Foo", false, []string{"v1", "v1alpha1", "v1beta1", "v2"})),
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
},
},
{
// Show that a CRD added to the discovery doc can also be removed
Name: "CRDRemoval",
Actions: []testAction{
applyCRD(makeCRDSpec(stableGroup, "Foo", false, []string{"v1", "v1alpha1", "v1beta1", "v2"})),
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
deleteObject{
GroupVersionResource: metav1.GroupVersionResource(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")),
Name: "foos.stable.example.com", Name: "foos.stable.example.com",
}, },
Spec: apiextensionsv1.CustomResourceDefinitionSpec{ waitForAbsentGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
Group: "stable.example.com", waitForAbsentGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
Names: apiextensionsv1.CustomResourceDefinitionNames{ },
Singular: "foo",
Plural: "foos",
Kind: "Foo",
}, },
Scope: apiextensionsv1.ClusterScoped,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{ {
Name: "v1", // Show that if CRD and APIService share a groupversion, and the
// APIService is deleted, and CRD updated, the APIService remains in
// discovery.
// This test simulates a resync of CRD controler to show that eventually
// APIService is recreated
Name: "CRDAPIServiceOverlap",
Actions: []testAction{
applyAPIService(
apiregistrationv1.APIServiceSpec{
Group: stableGroup,
Version: "v1",
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: int32(1000),
VersionPriority: int32(15),
Service: &apiregistrationv1.ServiceReference{
Name: "unused",
Namespace: "default",
},
},
),
// Wait for GV to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1}),
applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})),
// only CRD has stable v2, this will show that CRD has been synced
waitForGroupVersionsV1([]metav1.GroupVersion{stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV2}),
// Delete APIService shared by the aggregated apiservice and
// CRD
deleteObject{
GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")),
Name: "v1.stable.example.com",
},
// Update CRD to trigger a resync by adding a category and new groupversion
applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2", "v1alpha1"}, "all")),
// Show that the groupversion is re-added back
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}),
},
},
})
}
func TestFreshness(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
requireStaleGVs := func(gvs ...metav1.GroupVersion) inlineAction {
return inlineAction(func(ctx context.Context, client testClient) error {
document, err := FetchV2Discovery(ctx, client)
if err != nil {
return nil
}
// Track the stale gvs in array for nice diff output upon test failure
staleGVs := []metav1.GroupVersion{}
// Iterate through input so order does not matter
for _, targetGv := range gvs {
entry := FindGroupVersionV2(document, targetGv)
if entry == nil {
continue
}
switch entry.Freshness {
case apidiscoveryv2beta1.DiscoveryFreshnessCurrent:
// Skip
case apidiscoveryv2beta1.DiscoveryFreshnessStale:
staleGVs = append(staleGVs, targetGv)
default:
return fmt.Errorf("unrecognized freshness '%v' on gv '%v'", entry.Freshness, targetGv)
}
}
if !(len(staleGVs) == 0 && len(gvs) == 0) && !reflect.DeepEqual(staleGVs, gvs) {
diff := cmp.Diff(staleGVs, gvs)
return fmt.Errorf("expected sets of stale gvs to be equal:\n%v", diff)
}
return nil
})
}
runTestCases(t, []testCase{
{
Name: "BuiltinsFresh",
Actions: []testAction{
// Wait for discovery ready
waitForGroupVersionsV2{metav1.GroupVersion(apiregistrationv1.SchemeGroupVersion)},
// Require there are no stale groupversions and no unrecognized
// GVs
requireStaleGVs(),
},
},
{
// CRD freshness is always current
Name: "CRDFresh",
Actions: []testAction{
// Add a CRD and wait for it to appear in discovery
applyCRD(makeCRDSpec(stableGroup, "Foo", false, []string{"v1", "v1alpha1", "v1beta1", "v2"})),
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1beta1, stableV2}),
// Test CRD is current by requiring there is nothing stale
requireStaleGVs(),
},
},
{
// Make an aggregated APIService that's unreachable and show
// that its groupversion is included in the discovery document as
// stale
Name: "AggregatedUnreachable",
Actions: []testAction{
applyAPIService{
Group: stableGroup,
Version: "v1",
GroupPriorityMinimum: 1000,
VersionPriority: 15,
Service: &apiregistrationv1.ServiceReference{
Name: "doesnt-exist",
Namespace: "default",
},
},
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1}),
// Require there is one and only one stale GV and it is stableV1
requireStaleGVs(stableV1),
},
},
})
}
// Shows a group for which multiple APIServices specify a GroupPriorityMinimum,
// it is sorted the same in both versions of discovery
func TestGroupPriorty(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
makeApiServiceSpec := func(gv metav1.GroupVersion, groupPriorityMin, versionPriority int) apiregistrationv1.APIServiceSpec {
return apiregistrationv1.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: int32(groupPriorityMin),
VersionPriority: int32(versionPriority),
Service: &apiregistrationv1.ServiceReference{
Name: "unused",
Namespace: "default",
},
}
}
checkGVOrder := inlineAction(func(ctx context.Context, client testClient) (err error) {
// Fetch v1 document and v2 document, and ensure they have
// equal orderings of groupversions. and nothing missing or
// extra.
v1GroupsAndVersions, err := FetchV1DiscoveryGroups(ctx, client)
if err != nil {
return err
}
v2GroupsAndVersions, err := FetchV2Discovery(ctx, client)
if err != nil {
return err
}
v1Gvs := []metav1.GroupVersion{}
v2Gvs := []metav1.GroupVersion{}
for _, group := range v1GroupsAndVersions.Groups {
for _, version := range group.Versions {
v1Gvs = append(v1Gvs, metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
for _, group := range v2GroupsAndVersions.Items {
for _, version := range group.Versions {
v2Gvs = append(v2Gvs, metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
if !reflect.DeepEqual(v1Gvs, v2Gvs) {
return fmt.Errorf("expected equal orderings and lists of groupversions in both v1 and v2 discovery:\n%v", cmp.Diff(v1Gvs, v2Gvs))
}
return nil
})
runTestCases(t, []testCase{
{
// Show that the legacy and aggregated discovery docs have the same
// set of builtin groupversions
Name: "BuiltinsAndOrdering",
Actions: []testAction{
waitForGroupVersionsV1{metav1.GroupVersion(apiregistrationv1.SchemeGroupVersion)},
waitForGroupVersionsV2{metav1.GroupVersion(apiregistrationv1.SchemeGroupVersion)},
checkGVOrder,
},
},
{
// Show that a very high priority group is sorted first (below apiregistration v1)
// Also show the ordering is same for both v1 and v2 discovery apis
// Does not vary version priority
Name: "HighGroupPriority",
Actions: []testAction{
// A VERY high priority which should take precedence
// 20000 is highest possible priority
applyAPIService(makeApiServiceSpec(stableV1, 20000, 15)),
// A VERY low priority which should be ignored
applyAPIService(makeApiServiceSpec(stableV1alpha1, 1, 15)),
// A medium-high priority (that conflicts with k8s) which should be ignored
applyAPIService(makeApiServiceSpec(stableV1alpha2, 17300, 15)),
// Wait for all the added group-versions to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2}),
// Check that both v1 and v2 endpoints have exactly the same
// sets of groupversions
checkGVOrder,
// Check that the first group-version is the one with the highest
// priority
inlineAction(func(ctx context.Context, client testClient) error {
v2GroupsAndVersions, err := FetchV2Discovery(ctx, client)
if err != nil {
return err
}
// First group should always be apiregistration.k8s.io
secondGV := metav1.GroupVersion{
Group: v2GroupsAndVersions.Items[1].Name,
Version: v2GroupsAndVersions.Items[1].Versions[0].Version,
}
if !reflect.DeepEqual(&stableV1, &secondGV) {
return fmt.Errorf("expected second group's first version to be %v, not %v", stableV1, secondGV)
}
return nil
}),
},
},
{
// Show that a very low group priority is ordered last
Name: "LowGroupPriority",
Actions: []testAction{
// A minimal priority
applyAPIService(makeApiServiceSpec(stableV1alpha1, 1, 15)),
// Wait for all the added group-versions to appear in v2 discovery
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1alpha1}),
// Check that the last group-version is the one with the lowest
// priority
inlineAction(func(ctx context.Context, client testClient) error {
v2GroupsAndVersions, err := FetchV2Discovery(ctx, client)
if err != nil {
return err
}
lastGroup := v2GroupsAndVersions.Items[len(v2GroupsAndVersions.Items)-1]
lastGV := metav1.GroupVersion{
Group: lastGroup.Name,
Version: lastGroup.Versions[0].Version,
}
if !reflect.DeepEqual(&stableV1alpha1, &lastGV) {
return fmt.Errorf("expected last group to be %v, not %v", stableV1alpha1, lastGV)
}
return nil
}),
// Wait for all the added group-versions to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1alpha1}),
// Check that both v1 and v2 endpoints have exactly the same
// sets of groupversions
checkGVOrder,
},
},
{
// Show that versions within a group are sorted by priority
Name: "VersionPriority",
Actions: []testAction{
applyAPIService(makeApiServiceSpec(stableV1, 1000, 2)),
applyAPIService(makeApiServiceSpec(stableV1alpha1, 1000, 1)),
applyAPIService(makeApiServiceSpec(stableV1alpha2, 1000, 3)),
// Wait for all the added group-versions to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2}),
// Check that both v1 and v2 endpoints have exactly the same
// sets of groupversions
checkGVOrder,
inlineAction(func(ctx context.Context, client testClient) error {
// Find the entry for stable.example.com
// and show the versions are ordered how we expect
v2GroupsAndVersions, err := FetchV2Discovery(ctx, client)
if err != nil {
return err
}
// Should be ordered last for this test
group := v2GroupsAndVersions.Items[len(v2GroupsAndVersions.Items)-1]
if group.Name != stableGroup {
return fmt.Errorf("group is not where we expect: found %v, expected %v", group.Name, stableGroup)
}
versionOrder := []string{}
for _, version := range group.Versions {
versionOrder = append(versionOrder, version.Version)
}
expectedOrder := []string{
stableV1alpha2.Version,
stableV1.Version,
stableV1alpha1.Version,
}
if !reflect.DeepEqual(expectedOrder, versionOrder) {
return fmt.Errorf("version in wrong order: %v", cmp.Diff(expectedOrder, versionOrder))
}
return nil
}),
},
},
{
// Show that versions within a group are sorted by priority
// and that equal versions will be sorted by a kube-aware version
// comparator
Name: "VersionPriorityTiebreaker",
Actions: []testAction{
applyAPIService(makeApiServiceSpec(stableV1, 1000, 15)),
applyAPIService(makeApiServiceSpec(stableV1alpha1, 1000, 15)),
applyAPIService(makeApiServiceSpec(stableV1alpha2, 1000, 15)),
applyAPIService(makeApiServiceSpec(stableV1beta1, 1000, 15)),
applyAPIService(makeApiServiceSpec(stableV2, 1000, 15)),
// Wait for all the added group-versions to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2, stableV1beta1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV1alpha1, stableV1alpha2, stableV1beta1, stableV2}),
// Check that both v1 and v2 endpoints have exactly the same
// sets of groupversions
checkGVOrder,
inlineAction(func(ctx context.Context, client testClient) error {
// Find the entry for stable.example.com
// and show the versions are ordered how we expect
v2GroupsAndVersions, err := FetchV2Discovery(ctx, client)
if err != nil {
return err
}
// Should be ordered last for this test
group := v2GroupsAndVersions.Items[len(v2GroupsAndVersions.Items)-1]
if group.Name != stableGroup {
return fmt.Errorf("group is not where we expect: found %v, expected %v", group.Name, stableGroup)
}
versionOrder := []string{}
for _, version := range group.Versions {
versionOrder = append(versionOrder, version.Version)
}
expectedOrder := []string{
stableV2.Version,
stableV1.Version,
stableV1beta1.Version,
stableV1alpha2.Version,
stableV1alpha1.Version,
}
if !reflect.DeepEqual(expectedOrder, versionOrder) {
return fmt.Errorf("version in wrong order: %v", cmp.Diff(expectedOrder, versionOrder))
}
return nil
}),
},
},
})
}
func makeCRDSpec(group string, kind string, namespaced bool, versions []string, categories ...string) apiextensionsv1.CustomResourceDefinitionSpec {
scope := apiextensionsv1.NamespaceScoped
if !namespaced {
scope = apiextensionsv1.ClusterScoped
}
plural, singular := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind})
res := apiextensionsv1.CustomResourceDefinitionSpec{
Group: group,
Scope: scope,
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: plural.Resource,
Singular: singular.Resource,
Kind: kind,
Categories: categories,
},
}
for i, version := range versions {
res.Versions = append(res.Versions, apiextensionsv1.CustomResourceDefinitionVersion{
Name: version,
Served: true, Served: true,
Storage: true, Storage: i == 0,
Schema: &apiextensionsv1.CustomResourceValidation{ Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object", Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{ Properties: map[string]apiextensionsv1.JSONSchemaProps{
"stringMap": { "data": {
Description: "a map[string]string",
Type: "object",
AdditionalProperties: &apiextensionsv1.JSONSchemaPropsOrBool{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "string", Type: "string",
}, },
}, },
}, },
}, },
},
},
},
},
},
}, metav1.CreateOptions{
FieldManager: "test-manager",
}) })
require.NoError(t, err)
// Create a CR for the CRD
// Keep trying until it succeeds (or should we try for discovery?)
require.NoError(t, wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) {
toCreate := &unstructured.Unstructured{}
toCreate.SetUnstructuredContent(map[string]any{
"apiVersion": "stable.example.com/v1",
"kind": "Foo",
"key": "value",
})
_, err = client.dynamicClientset.Resource(schema.GroupVersionResource{
Group: "stable.example.com",
Version: "v1",
Resource: "foos",
}).Create(ctx, toCreate, metav1.CreateOptions{
FieldManager: "test-manager",
})
return err != nil, nil
}))
// For each groupversion served by our resourcemanager, delete an APIService
// object connected to our fake APIServer
for _, versionInfo := range basicTestGroup.Versions {
groupVersion := metav1.GroupVersion{
Group: basicTestGroup.Name,
Version: versionInfo.Version,
} }
return res
unregisterAPIService(ctx, client, groupVersion)
}
// Wait for the apiservice to be deleted from discovery
require.NoError(t, WaitForGroupsAbsent(ctx, client, "stable.example.com"))
// Update the CRD with a minor change to show that reconciliation will
// eventually refresh the discovery group on resync
obj := &unstructured.Unstructured{}
obj.SetUnstructuredContent(map[string]interface{}{
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": map[string]any{
"name": crd.Name,
},
"spec": map[string]interface{}{
"names": map[string]any{
"categories": []string{"all"},
},
},
})
buf := bytes.NewBuffer(nil)
err = unstructured.UnstructuredJSONScheme.Encode(obj, buf)
require.NoError(t, err)
//Is there a better way to force crd resync?
_, err = client.ApiextensionsV1().CustomResourceDefinitions().Patch(
ctx,
crd.Name,
types.ApplyPatchType,
buf.Bytes(),
metav1.PatchOptions{
FieldManager: "test-manager",
},
)
require.NoError(t, err)
// Wait until the crd appears in discovery
expectedDiscovery := apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: basicTestGroup.Name,
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: "v1",
Resources: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "foos",
ResponseKind: &metav1.GroupVersionKind{
Group: basicTestGroup.Name,
Version: "v1",
Kind: "Foo",
},
Scope: apidiscoveryv2beta1.ScopeCluster,
SingularResource: crd.Spec.Names.Singular,
Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"},
Categories: []string{"all"},
},
},
//!TODO: set freshness of builtin/crds
Freshness: "",
},
},
}
require.NoError(t, WaitForGroups(ctx, client, expectedDiscovery))
} }

View File

@ -0,0 +1,478 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
const acceptV1JSON = "application/json"
const acceptV2JSON = "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
type testClient interface {
kubernetes.Interface
aggregator.Interface
apiextensions.Interface
dynamic.Interface
}
// declarative framework for discovery integration tests
// each test has metadata and a list of actions which each must pass for the
// test to pass
type testCase struct {
Name string
Actions []testAction
}
// interface defining a function that does something with the integration test
// api server and returns an error. the test fails if the error is non nil
type testAction interface {
Do(ctx context.Context, client testClient) error
}
type cleaningAction interface {
testAction
Cleanup(ctx context.Context, client testClient) error
}
// apply an apiservice to the cluster
type applyAPIService apiregistrationv1.APIServiceSpec
type applyCRD apiextensionsv1.CustomResourceDefinitionSpec
type deleteObject struct {
metav1.GroupVersionResource
Namespace string
Name string
}
// Wait for groupversions to appear in v1 discovery
type waitForGroupVersionsV1 []metav1.GroupVersion
// Wait for groupversions to disappear from v2 discovery
type waitForAbsentGroupVersionsV1 []metav1.GroupVersion
// Wait for groupversions to appear in v2 discovery
type waitForGroupVersionsV2 []metav1.GroupVersion
// Wait for groupversions to disappear from v2 discovery
type waitForAbsentGroupVersionsV2 []metav1.GroupVersion
// Assert something about the current state of v2 discovery
type inlineAction func(ctx context.Context, client testClient) error
func (a applyAPIService) Do(ctx context.Context, client testClient) error {
// using dynamic client since the typed client does not support `Apply`
// operation?
obj := &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: a.Version + "." + a.Group,
},
Spec: apiregistrationv1.APIServiceSpec(a),
}
unstructuredContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return err
}
unstructedObject := &unstructured.Unstructured{}
unstructedObject.SetUnstructuredContent(unstructuredContent)
unstructedObject.SetGroupVersionKind(apiregistrationv1.SchemeGroupVersion.WithKind("APIService"))
_, err = client.
Resource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")).
Apply(ctx, obj.Name, unstructedObject, metav1.ApplyOptions{
FieldManager: "test-manager",
})
return err
}
func (a applyAPIService) Cleanup(ctx context.Context, client testClient) error {
name := a.Version + "." + a.Group
err := client.ApiregistrationV1().APIServices().Delete(ctx, name, metav1.DeleteOptions{})
if !errors.IsNotFound(err) {
return err
}
err = wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
_, err = client.ApiregistrationV1().APIServices().Get(ctx, name, metav1.GetOptions{})
if err == nil {
return false, nil
}
if !errors.IsNotFound(err) {
return false, err
}
return true, nil
},
)
if err != nil {
return fmt.Errorf("error waiting for APIService %v to clean up: %w", name, err)
}
return nil
}
func (a applyCRD) Do(ctx context.Context, client testClient) error {
// using dynamic client since the typed client does not support `Apply`
// operation?
name := a.Names.Plural + "." + a.Group
obj := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec(a),
}
unstructuredContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return err
}
unstructedObject := &unstructured.Unstructured{}
unstructedObject.SetUnstructuredContent(unstructuredContent)
unstructedObject.SetGroupVersionKind(apiextensionsv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"))
_, err = client.
Resource(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")).
Apply(ctx, obj.Name, unstructedObject, metav1.ApplyOptions{
FieldManager: "test-manager",
})
return err
}
func (a applyCRD) Cleanup(ctx context.Context, client testClient) error {
name := a.Names.Plural + "." + a.Group
err := client.ApiextensionsV1().CustomResourceDefinitions().Delete(ctx, name, metav1.DeleteOptions{})
if !errors.IsNotFound(err) {
return err
}
err = wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
_, err = client.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
if err == nil {
return false, nil
}
if !errors.IsNotFound(err) {
return false, err
}
return true, nil
},
)
if err != nil {
return fmt.Errorf("error waiting for CRD %v to clean up: %w", name, err)
}
return nil
}
func (d deleteObject) Do(ctx context.Context, client testClient) error {
if d.Namespace == "" {
return client.Resource(schema.GroupVersionResource(d.GroupVersionResource)).
Delete(ctx, d.Name, metav1.DeleteOptions{})
} else {
return client.Resource(schema.GroupVersionResource(d.GroupVersionResource)).
Namespace(d.Namespace).
Delete(ctx, d.Name, metav1.DeleteOptions{})
}
}
func (w waitForGroupVersionsV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gv := range w {
if FindGroupVersionV2(result, gv) == nil {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for groupversions v2 (%v): %w", w, err)
}
return nil
}
func (w waitForAbsentGroupVersionsV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gv := range w {
if FindGroupVersionV2(result, gv) != nil {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for absent groupversions v2 (%v): %w", w, err)
}
return nil
}
func (w waitForGroupVersionsV1) Do(ctx context.Context, client testClient) error {
err := WaitForV1GroupsWithCondition(ctx, client, func(result metav1.APIGroupList) bool {
for _, gv := range w {
if !FindGroupVersionV1(result, gv) {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for groupversions v1 (%v): %w", w, err)
}
return nil
}
func (w waitForAbsentGroupVersionsV1) Do(ctx context.Context, client testClient) error {
err := WaitForV1GroupsWithCondition(ctx, client, func(result metav1.APIGroupList) bool {
for _, gv := range w {
if FindGroupVersionV1(result, gv) {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for absent groupversions v1 (%v): %w", w, err)
}
return nil
}
func (i inlineAction) Do(ctx context.Context, client testClient) error {
return i(ctx, client)
}
func FetchV2Discovery(ctx context.Context, client testClient) (apidiscoveryv2beta1.APIGroupDiscoveryList, error) {
result, err := client.
Discovery().
RESTClient().
Get().
AbsPath("/apis").
SetHeader("Accept", acceptV2JSON).
Do(ctx).
Raw()
if err != nil {
return apidiscoveryv2beta1.APIGroupDiscoveryList{}, fmt.Errorf("failed to fetch v2 discovery: %w", err)
}
groupList := apidiscoveryv2beta1.APIGroupDiscoveryList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
return apidiscoveryv2beta1.APIGroupDiscoveryList{}, fmt.Errorf("failed to parse v2 discovery: %w", err)
}
return groupList, nil
}
func FetchV1DiscoveryGroups(ctx context.Context, client testClient) (metav1.APIGroupList, error) {
return FetchV1DiscoveryGroupsAtPath(ctx, client, "/apis")
}
func FetchV1DiscoveryLegacyGroups(ctx context.Context, client testClient) (metav1.APIGroupList, error) {
return FetchV1DiscoveryGroupsAtPath(ctx, client, "/api")
}
func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path string) (metav1.APIGroupList, error) {
result, err := client.
Discovery().
RESTClient().
Get().
AbsPath("/apis").
SetHeader("Accept", acceptV1JSON).
Do(ctx).
Raw()
if err != nil {
return metav1.APIGroupList{}, fmt.Errorf("failed to fetch v1 discovery at %v: %w", path, err)
}
groupList := metav1.APIGroupList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
return metav1.APIGroupList{}, fmt.Errorf("failed to parse v1 discovery at %v: %w", path, err)
}
return groupList, nil
}
func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIGroupList, error) {
result, err := client.
Discovery().
RESTClient().
Get().
AbsPath("/apis/"+gv.Group+"/"+gv.Version).
SetHeader("Accept", acceptV1JSON).
Do(ctx).
Raw()
if err != nil {
return metav1.APIGroupList{}, err
}
groupList := metav1.APIGroupList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
return metav1.APIGroupList{}, err
}
return groupList, nil
}
func WaitForGroupsAbsent(ctx context.Context, client testClient, groups ...string) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if docGroup.Name == searchGroup {
return false
}
}
}
return true
})
}
func WaitForGroups(ctx context.Context, client testClient, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if reflect.DeepEqual(searchGroup, docGroup) {
return true
}
}
}
return false
})
}
func WaitForResultWithCondition(ctx context.Context, client testClient, condition func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool) error {
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
return wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
groupList, err := FetchV2Discovery(ctx, client)
if err != nil {
return false, err
}
if condition(groupList) {
return true, nil
}
return false, nil
})
}
func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condition func(result metav1.APIGroupList) bool) error {
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
return wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
groupList, err := FetchV1DiscoveryGroups(ctx, client)
if err != nil {
return false, err
}
if condition(groupList) {
return true, nil
}
return false, nil
})
}
func FindGroupVersionV1(discovery metav1.APIGroupList, gv metav1.GroupVersion) bool {
for _, documentGroup := range discovery.Groups {
if documentGroup.Name != gv.Group {
continue
}
for _, documentVersion := range documentGroup.Versions {
if documentVersion.Version == gv.Version {
return true
}
}
}
return false
}
func FindGroupVersionV2(discovery apidiscoveryv2beta1.APIGroupDiscoveryList, gv metav1.GroupVersion) *apidiscoveryv2beta1.APIVersionDiscovery {
for _, documentGroup := range discovery.Items {
if documentGroup.Name != gv.Group {
continue
}
for _, documentVersion := range documentGroup.Versions {
if documentVersion.Version == gv.Version {
return &documentVersion
}
}
}
return nil
}

View File

@ -19,15 +19,14 @@ package discovery
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"strconv" "strconv"
"sync" "sync"
"testing"
"time" "time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -38,7 +37,7 @@ import (
) )
type FakeService interface { type FakeService interface {
Start(t *testing.T, ctx context.Context) Run(ctx context.Context) error
Port() *int32 Port() *int32
Name() string Name() string
} }
@ -62,7 +61,7 @@ func NewFakeService(name string, client kubernetes.Interface, handler http.Handl
} }
} }
func (f *fakeService) run(ctx context.Context) error { func (f *fakeService) Run(ctx context.Context) error {
aggregatedServer := httptest.NewUnstartedServer(f.handler) aggregatedServer := httptest.NewUnstartedServer(f.handler)
aggregatedServer.StartTLS() aggregatedServer.StartTLS()
defer aggregatedServer.Close() defer aggregatedServer.Close()
@ -111,26 +110,24 @@ func (f *fakeService) run(ctx context.Context) error {
// Uninstall service from the cluser // Uninstall service from the cluser
err = f.client.CoreV1().Services("default").Delete(ctx, service.Name, metav1.DeleteOptions{}) err = f.client.CoreV1().Services("default").Delete(ctx, service.Name, metav1.DeleteOptions{})
return err
}
func (f *fakeService) Start(t *testing.T, ctx context.Context) {
go func() {
err := f.run(ctx)
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
err = nil err = nil
} }
require.NoError(t, err) return err
}() }
func (f *fakeService) WaitForReady(ctx context.Context) error {
err := wait.PollWithContext(ctx, 1*time.Second, 200*time.Millisecond, func(ctx context.Context) (done bool, err error) { err := wait.PollWithContext(ctx, 1*time.Second, 200*time.Millisecond, func(ctx context.Context) (done bool, err error) {
return f.Port() != nil, nil return f.Port() != nil, nil
}) })
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
err = nil err = nil
} else if err != nil {
err = fmt.Errorf("service should have come alive in a reasonable amount of time: %w", err)
} }
require.NoError(t, err, "service should have come alive in a reasonable amount of time")
return err
} }
func (f *fakeService) Port() *int32 { func (f *fakeService) Port() *int32 {