Fix check for CRD watch priming

This commit is contained in:
Mehdy Bohlool 2018-06-09 18:58:02 -07:00
parent 1d10287d75
commit 602ddef158
4 changed files with 74 additions and 68 deletions

View File

@ -226,7 +226,7 @@ func testSimpleCRUD(t *testing.T, ns string, noxuDefinition *apiextensionsv1beta
t.Fatal(err) t.Fatal(err)
} }
if e, a := createdObjectMeta.GetUID(), deletedObjectMeta.GetUID(); e != a { if e, a := createdObjectMeta.GetUID(), deletedObjectMeta.GetUID(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected equal UID for (expected) %v, and (actual) %v", createdNoxuInstance, watchEvent.Object)
} }
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):

View File

@ -361,7 +361,7 @@ func TestValidationSchema(t *testing.T) {
}, },
Required: []string{"spec"}, Required: []string{"spec"},
} }
noxuDefinition, err = testserver.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionClient, dynamicClient) _, err = testserver.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionClient, dynamicClient)
if err != nil { if err != nil {
t.Fatalf("unable to created crd %v: %v", noxuDefinition.Name, err) t.Fatalf("unable to created crd %v: %v", noxuDefinition.Name, err)
} }

View File

@ -19,7 +19,6 @@ go_library(
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -23,7 +23,6 @@ import (
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -191,6 +190,35 @@ func NewCurletInstance(namespace, name string) *unstructured.Unstructured {
} }
} }
func servedVersions(crd *apiextensionsv1beta1.CustomResourceDefinition) []string {
if len(crd.Spec.Versions) == 0 {
return []string{crd.Spec.Version}
}
var versions []string
for _, v := range crd.Spec.Versions {
if v.Served {
versions = append(versions, v.Name)
}
}
return versions
}
func existsInDiscovery(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface, version string) (bool, error) {
groupResource, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
for _, g := range groupResource.APIResources {
if g.Name == crd.Spec.Names.Plural {
return true, nil
}
}
return false, nil
}
// CreateNewCustomResourceDefinitionWatchUnsafe creates the CRD and makes sure // CreateNewCustomResourceDefinitionWatchUnsafe creates the CRD and makes sure
// the apiextension apiserver has installed the CRD. But it's not safe to watch // the apiextension apiserver has installed the CRD. But it's not safe to watch
// the created CR. Please call CreateNewCustomResourceDefinition if you need to // the created CR. Please call CreateNewCustomResourceDefinition if you need to
@ -201,19 +229,15 @@ func CreateNewCustomResourceDefinitionWatchUnsafe(crd *apiextensionsv1beta1.Cust
return nil, err return nil, err
} }
// wait until the resource appears in discovery // wait until all resources appears in discovery
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { for _, version := range servedVersions(crd) {
resourceList, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + crd.Spec.Version) err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return existsInDiscovery(crd, apiExtensionsClient, version)
})
if err != nil { if err != nil {
return false, nil return nil, err
} }
for _, resource := range resourceList.APIResources { }
if resource.Name == crd.Spec.Names.Plural {
return true, nil
}
}
return false, nil
})
return crd, err return crd, err
} }
@ -234,48 +258,35 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceD
// For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error. // For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
// This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen // This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen
// before like the created RV could be too old to watch. // before like the created RV could be too old to watch.
var primingErr error for _, version := range servedVersions(crd) {
wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
primingErr = checkForWatchCachePrimed(crd, dynamicClientSet) return isWatchCachePrimed(crd, dynamicClientSet, version)
if primingErr == nil { })
return true, nil if err != nil {
return nil, err
} }
return false, nil
})
if primingErr != nil {
return nil, primingErr
} }
return crd, nil return crd, nil
} }
func checkForWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface) error { // isWatchCachePrimed returns true if the watch is primed for an specified version of CRD watch
func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, version string) (bool, error) {
ns := "" ns := ""
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
ns = "aval" ns = "aval"
} }
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: crd.Spec.Version, Resource: crd.Spec.Names.Plural} gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural}
var resourceClient dynamic.ResourceInterface var resourceClient dynamic.ResourceInterface
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
resourceClient = dynamicClientSet.Resource(gvr).Namespace(ns) resourceClient = dynamicClientSet.Resource(gvr).Namespace(ns)
} else { } else {
resourceClient = dynamicClientSet.Resource(gvr) resourceClient = dynamicClientSet.Resource(gvr)
} }
initialList, err := resourceClient.List(metav1.ListOptions{})
if err != nil {
return err
}
initialListListMeta, err := meta.ListAccessor(initialList)
if err != nil {
return err
}
instanceName := "setup-instance" instanceName := "setup-instance"
instance := &unstructured.Unstructured{ instance := &unstructured.Unstructured{
Object: map[string]interface{}{ Object: map[string]interface{}{
"apiVersion": crd.Spec.Group + "/" + crd.Spec.Version, "apiVersion": crd.Spec.Group + "/" + version,
"kind": crd.Spec.Names.Kind, "kind": crd.Spec.Names.Kind,
"metadata": map[string]interface{}{ "metadata": map[string]interface{}{
"namespace": ns, "namespace": ns,
@ -289,29 +300,32 @@ func checkForWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition
"spec": map[string]interface{}{}, "spec": map[string]interface{}{},
}, },
} }
if _, err := resourceClient.Create(instance); err != nil { createdInstance, err := resourceClient.Create(instance)
return err
}
// we created something, clean it up
defer func() {
resourceClient.Delete(instanceName, nil)
}()
noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: initialListListMeta.GetResourceVersion()})
if err != nil { if err != nil {
return err return false, err
}
err = resourceClient.Delete(createdInstance.GetName(), nil)
if err != nil {
return false, err
}
noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()})
if err != nil {
return false, err
} }
defer noxuWatch.Stop() defer noxuWatch.Stop()
select { select {
case watchEvent := <-noxuWatch.ResultChan(): case watchEvent := <-noxuWatch.ResultChan():
if watch.Added == watchEvent.Type { if watch.Error == watchEvent.Type {
return nil return false, nil
} }
return fmt.Errorf("expected add, but got %#v", watchEvent) if watch.Deleted != watchEvent.Type {
return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent)
}
return true, nil
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
return fmt.Errorf("gave up waiting for watch event") return false, fmt.Errorf("gave up waiting for watch event")
} }
} }
@ -319,23 +333,16 @@ func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefi
if err := apiExtensionsClient.Apiextensions().CustomResourceDefinitions().Delete(crd.Name, nil); err != nil { if err := apiExtensionsClient.Apiextensions().CustomResourceDefinitions().Delete(crd.Name, nil); err != nil {
return err return err
} }
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { for _, version := range servedVersions(crd) {
groupResource, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + crd.Spec.Version) err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
exists, err := existsInDiscovery(crd, apiExtensionsClient, version)
return !exists, err
})
if err != nil { if err != nil {
if errors.IsNotFound(err) { return err
return true, nil
}
return false, err
} }
for _, g := range groupResource.APIResources { }
if g.Name == crd.Spec.Names.Plural { return nil
return false, nil
}
}
return true, nil
})
return err
} }
func CreateNewScaleClient(crd *apiextensionsv1beta1.CustomResourceDefinition, config *rest.Config) (scale.ScalesGetter, error) { func CreateNewScaleClient(crd *apiextensionsv1beta1.CustomResourceDefinition, config *rest.Config) (scale.ScalesGetter, error) {