allow multiple sources to add/remove from discovery without clobbering each other

This commit is contained in:
Alexander Zielenski
2023-03-20 10:05:15 -07:00
parent 15894cfc85
commit 0740b11073
11 changed files with 655 additions and 50 deletions

View File

@@ -255,13 +255,16 @@ func runTestCases(t *testing.T, cases []testCase) {
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
func() {
for _, a := range c.Actions {
testContext, testDone := context.WithCancel(ctx)
defer testDone()
for i, a := range c.Actions {
if cleaning, ok := a.(cleaningAction); ok {
defer func() {
require.NoError(t, cleaning.Cleanup(ctx, client))
require.NoError(t, cleaning.Cleanup(testContext, client), "cleanup after \"%T\" step %v", a, i)
}()
}
require.NoError(t, a.Do(ctx, client))
require.NoError(t, a.Do(testContext, client), "running \"%T\" step %v", a, i)
}
}()
@@ -339,9 +342,10 @@ func TestCRD(t *testing.T) {
applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})),
// only CRD has stable v2, this will show that CRD has been synced
waitForGroupVersionsV1([]metav1.GroupVersion{stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV2}),
// Show that we have v1 and v2 but v1 is stale
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}),
waitForStaleGroupVersionsV2([]metav1.GroupVersion{stableV1}),
waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV2}),
// Delete APIService shared by the aggregated apiservice and
// CRD
@@ -355,7 +359,160 @@ func TestCRD(t *testing.T) {
// Show that the groupversion is re-added back
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}),
waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}),
},
},
{
// Show that if CRD and Aggregated APIservice share a groupversiom,
// The aggregated apiservice's discovery information is shown in both
// v1 and v2 discovery
Name: "CRDAPIServiceSameGroupDifferentVersions",
Actions: []testAction{
// Wait for CRD to apply
applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v2", "v1alpha1"})),
// Wait for GV to appear in both discovery documents
waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}),
applyAPIService(
apiregistrationv1.APIServiceSpec{
Group: stableGroup,
Version: "v1",
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: int32(1000),
VersionPriority: int32(100),
Service: &apiregistrationv1.ServiceReference{
Name: "unused",
Namespace: "default",
},
},
),
// We should now have stable v1 available
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1}),
// The CRD group-versions not served by the aggregated
// apiservice should still be availablee
waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}),
// Remove API service. Show we have switched to CRD
deleteObject{
GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")),
Name: "v1.stable.example.com",
},
// Show that we still have stable v1 since it is in the CRD
waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}),
waitForAbsentGroupVersionsV1([]metav1.GroupVersion{stableV1}),
waitForAbsentGroupVersionsV2([]metav1.GroupVersion{stableV1}),
},
},
{
// Show that if CRD and a builtin share a group version,
// the builtin takes precedence in both versions of discovery
Name: "CRDBuiltinOverlapPrecence",
Actions: []testAction{
// Create CRD that overrides a builtin
applyCRD(makeCRDSpec("apiextensions.k8s.io", "Bar", true, []string{"v1", "v2", "vfake"})),
waitForGroupVersionsV1([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}),
waitForGroupVersionsV2([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}),
// Show that the builtin group-version is still used for V1
// By showing presence of v1.CustomResourceDefinition
// and absence of v1.Bar
waitForResourcesV1([]metav1.GroupVersionResource{
{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
},
{
Group: "apiextensions.k8s.io",
Version: "vfake",
Resource: "bars",
},
}),
waitForResourcesV2([]metav1.GroupVersionResource{
{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
},
{
Group: "apiextensions.k8s.io",
Version: "vfake",
Resource: "bars",
},
}),
waitForResourcesAbsentV1([]metav1.GroupVersionResource{
{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "bars",
},
}),
waitForResourcesAbsentV2([]metav1.GroupVersionResource{
{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "bars",
},
}),
},
},
{
// Tests that a race discovered during alpha phase of the feature is fixed.
// Rare race would occur if a CRD was synced before the removal of an aggregated
// APIService could be synced.
// To test this we:
// 1. Add CRD to apiserver
// 2. Wait for it to sync
// 3. Add aggregated APIService with same groupversion
// 4. Remove aggregated apiservice
// 5. Check that we have CRD GVs in discovery document
// Show that if CRD and APIService share a groupversion, and the
// APIService is deleted, and CRD updated, the groupversion from
// the CRD remains in discovery.
Name: "Race",
Actions: []testAction{
// Create CRD with the same GV as the aggregated APIService
applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})),
// only CRD has stable v2, this will show that CRD has been synced
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}),
// Add Aggregated APIService that overlaps the CRD.
applyAPIService(
apiregistrationv1.APIServiceSpec{
Group: stableGroup,
Version: "v1",
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: int32(1000),
VersionPriority: int32(100),
Service: &apiregistrationv1.ServiceReference{
Name: "fake",
Namespace: "default",
},
},
),
// Delete APIService shared by the aggregated apiservice and
// CRD
deleteObject{
GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")),
Name: "v1.stable.example.com",
},
// Show the CRD (with stablev2) is the one which is now advertised
waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}),
waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}),
},
},
})

View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -91,6 +92,15 @@ type waitForGroupVersionsV2 []metav1.GroupVersion
// Wait for groupversions to disappear from v2 discovery
type waitForAbsentGroupVersionsV2 []metav1.GroupVersion
type waitForStaleGroupVersionsV2 []metav1.GroupVersion
type waitForFreshGroupVersionsV2 []metav1.GroupVersion
type waitForResourcesV1 []metav1.GroupVersionResource
type waitForResourcesAbsentV1 []metav1.GroupVersionResource
type waitForResourcesV2 []metav1.GroupVersionResource
type waitForResourcesAbsentV2 []metav1.GroupVersionResource
// Assert something about the current state of v2 discovery
type inlineAction func(ctx context.Context, client testClient) error
@@ -165,6 +175,13 @@ func (a applyCRD) Do(ctx context.Context, client testClient) error {
Spec: apiextensionsv1.CustomResourceDefinitionSpec(a),
}
if strings.HasSuffix(obj.Name, ".k8s.io") {
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["api-approved.kubernetes.io"] = "https://github.com/kubernetes/kubernetes/fake"
}
unstructuredContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return err
@@ -226,6 +243,40 @@ func (d deleteObject) Do(ctx context.Context, client testClient) error {
}
}
func (w waitForStaleGroupVersionsV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gv := range w {
if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessStale {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for stale groupversions v2 (%v): %w", w, err)
}
return nil
}
func (w waitForFreshGroupVersionsV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gv := range w {
if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessCurrent {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for fresh groupversions v2 (%v): %w", w, err)
}
return nil
}
func (w waitForGroupVersionsV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gv := range w {
@@ -294,6 +345,137 @@ func (w waitForAbsentGroupVersionsV1) Do(ctx context.Context, client testClient)
return nil
}
func (w waitForResourcesV1) Do(ctx context.Context, client testClient) error {
requiredResources := map[metav1.GroupVersion][]string{}
for _, gvr := range w {
gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}
if existing, ok := requiredResources[gv]; ok {
requiredResources[gv] = append(existing, gvr.Resource)
} else {
requiredResources[gv] = []string{gvr.Resource}
}
}
for gv, resourceNames := range requiredResources {
err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool {
for _, name := range resourceNames {
found := false
for _, resultResource := range result.APIResources {
if resultResource.Name == name {
found = true
break
}
}
if !found {
return false
}
}
return true
})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("waiting for resources v1 (%v): %w", w, err)
}
}
return nil
}
func (w waitForResourcesAbsentV1) Do(ctx context.Context, client testClient) error {
requiredResources := map[metav1.GroupVersion][]string{}
for _, gvr := range w {
gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}
if existing, ok := requiredResources[gv]; ok {
requiredResources[gv] = append(existing, gvr.Resource)
} else {
requiredResources[gv] = []string{gvr.Resource}
}
}
for gv, resourceNames := range requiredResources {
err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool {
for _, name := range resourceNames {
for _, resultResource := range result.APIResources {
if resultResource.Name == name {
return false
}
}
}
return true
})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("waiting for absent resources v1 (%v): %w", w, err)
}
}
return nil
}
func (w waitForResourcesV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gvr := range w {
if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil {
return false
} else {
found := false
for _, resultResoure := range info.Resources {
if resultResoure.Resource == gvr.Resource {
found = true
break
}
}
if !found {
return false
}
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for resources v2 (%v): %w", w, err)
}
return nil
}
func (w waitForResourcesAbsentV2) Do(ctx context.Context, client testClient) error {
err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, gvr := range w {
if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil {
return false
} else {
for _, resultResoure := range info.Resources {
if resultResoure.Resource == gvr.Resource {
return false
}
}
}
}
return true
})
if err != nil {
return fmt.Errorf("waiting for absent resources v2 (%v): %w", w, err)
}
return nil
}
func (i inlineAction) Do(ctx context.Context, client testClient) error {
return i(ctx, client)
}
@@ -334,7 +516,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s
Discovery().
RESTClient().
Get().
AbsPath("/apis").
AbsPath(path).
SetHeader("Accept", acceptV1JSON).
Do(ctx).
Raw()
@@ -352,7 +534,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s
return groupList, nil
}
func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIGroupList, error) {
func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIResourceList, error) {
result, err := client.
Discovery().
RESTClient().
@@ -363,13 +545,13 @@ func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.
Raw()
if err != nil {
return metav1.APIGroupList{}, err
return metav1.APIResourceList{}, err
}
groupList := metav1.APIGroupList{}
groupList := metav1.APIResourceList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
return metav1.APIGroupList{}, err
return metav1.APIResourceList{}, err
}
return groupList, nil
@@ -445,6 +627,28 @@ func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condit
})
}
func WaitForV1ResourcesWithCondition(ctx context.Context, client testClient, gv metav1.GroupVersion, condition func(result metav1.APIResourceList) bool) error {
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
return wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
resourceList, err := FetchV1DiscoveryResource(ctx, client, gv)
if err != nil {
return false, err
}
if condition(resourceList) {
return true, nil
}
return false, nil
})
}
func FindGroupVersionV1(discovery metav1.APIGroupList, gv metav1.GroupVersion) bool {
for _, documentGroup := range discovery.Groups {
if documentGroup.Name != gv.Group {