From 335d42165f2c889a1f232e318c1567a518206b79 Mon Sep 17 00:00:00 2001 From: Mehdy Bohlool Date: Wed, 20 Jun 2018 15:20:48 -0700 Subject: [PATCH] Consume watch event for all versions of CRD --- .../test/integration/testserver/resources.go | 77 +++++++++++-------- 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/testserver/resources.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/testserver/resources.go index f341b002f77..d33bf737e8b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/testserver/resources.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/testserver/resources.go @@ -258,35 +258,41 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceD // For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error. // This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen // before like the created RV could be too old to watch. - for _, version := range servedVersions(crd) { - err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { - return isWatchCachePrimed(crd, dynamicClientSet, version) - }) - if err != nil { - return nil, err - } + err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { + return isWatchCachePrimed(crd, dynamicClientSet) + }) + if err != nil { + return nil, err } return crd, nil } +func resourceClientForVersion(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, namespace, version string) dynamic.ResourceInterface { + gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural} + if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { + return dynamicClientSet.Resource(gvr).Namespace(namespace) + } else { + return dynamicClientSet.Resource(gvr) + } +} + // isWatchCachePrimed returns true if the watch is primed for an specified version of CRD watch -func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, version string) (bool, error) { +func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface) (bool, error) { ns := "" if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { ns = "aval" } - gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural} - var resourceClient dynamic.ResourceInterface - if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped { - resourceClient = dynamicClientSet.Resource(gvr).Namespace(ns) - } else { - resourceClient = dynamicClientSet.Resource(gvr) + versions := servedVersions(crd) + if len(versions) == 0 { + return true, nil } + + resourceClient := resourceClientForVersion(crd, dynamicClientSet, ns, versions[0]) instanceName := "setup-instance" instance := &unstructured.Unstructured{ Object: map[string]interface{}{ - "apiVersion": crd.Spec.Group + "/" + version, + "apiVersion": crd.Spec.Group + "/" + versions[0], "kind": crd.Spec.Names.Kind, "metadata": map[string]interface{}{ "namespace": ns, @@ -309,24 +315,35 @@ func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dyna return false, err } - noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()}) - if err != nil { - return false, err - } - defer noxuWatch.Stop() + // Wait for all versions of watch cache to be primed and also make sure we consumed the DELETE event for all + // versions so that any new watch with ResourceVersion=0 does not get those events. This is source of some flaky tests. + // When a client creates a watch with resourceVersion=0, it will get an ADD event for any existing objects + // but because they specified resourceVersion=0, there is no starting point in the cache buffer to return existing events + // from, thus the server will return anything from current head of the cache to the end. By accessing the delete + // events for all versions here, we make sure that the head of the cache is passed those events and they will not being + // delivered to any future watch with resourceVersion=0. + for _, v := range versions { + noxuWatch, err := resourceClientForVersion(crd, dynamicClientSet, ns, v).Watch( + metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()}) + if err != nil { + return false, err + } + defer noxuWatch.Stop() - select { - case watchEvent := <-noxuWatch.ResultChan(): - if watch.Error == watchEvent.Type { - return false, nil + select { + case watchEvent := <-noxuWatch.ResultChan(): + if watch.Error == watchEvent.Type { + return false, nil + } + if watch.Deleted != watchEvent.Type { + return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent) + } + case <-time.After(5 * time.Second): + return false, fmt.Errorf("gave up waiting for watch event") } - if watch.Deleted != watchEvent.Type { - return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent) - } - return true, nil - case <-time.After(5 * time.Second): - return false, fmt.Errorf("gave up waiting for watch event") } + + return true, nil } func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) error {