Consume watch event for all versions of CRD

This commit is contained in:
Mehdy Bohlool 2018-06-20 15:20:48 -07:00
parent 152b0c12da
commit 335d42165f

View File

@ -258,35 +258,41 @@ func CreateNewCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceD
// For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
// This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen
// before like the created RV could be too old to watch.
for _, version := range servedVersions(crd) {
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return isWatchCachePrimed(crd, dynamicClientSet, version)
})
if err != nil {
return nil, err
}
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
return isWatchCachePrimed(crd, dynamicClientSet)
})
if err != nil {
return nil, err
}
return crd, nil
}
func resourceClientForVersion(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, namespace, version string) dynamic.ResourceInterface {
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural}
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
return dynamicClientSet.Resource(gvr).Namespace(namespace)
} else {
return dynamicClientSet.Resource(gvr)
}
}
// isWatchCachePrimed returns true if the watch is primed for an specified version of CRD watch
func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface, version string) (bool, error) {
func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dynamicClientSet dynamic.Interface) (bool, error) {
ns := ""
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
ns = "aval"
}
gvr := schema.GroupVersionResource{Group: crd.Spec.Group, Version: version, Resource: crd.Spec.Names.Plural}
var resourceClient dynamic.ResourceInterface
if crd.Spec.Scope != apiextensionsv1beta1.ClusterScoped {
resourceClient = dynamicClientSet.Resource(gvr).Namespace(ns)
} else {
resourceClient = dynamicClientSet.Resource(gvr)
versions := servedVersions(crd)
if len(versions) == 0 {
return true, nil
}
resourceClient := resourceClientForVersion(crd, dynamicClientSet, ns, versions[0])
instanceName := "setup-instance"
instance := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": crd.Spec.Group + "/" + version,
"apiVersion": crd.Spec.Group + "/" + versions[0],
"kind": crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"namespace": ns,
@ -309,24 +315,35 @@ func isWatchCachePrimed(crd *apiextensionsv1beta1.CustomResourceDefinition, dyna
return false, err
}
noxuWatch, err := resourceClient.Watch(metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()})
if err != nil {
return false, err
}
defer noxuWatch.Stop()
// Wait for all versions of watch cache to be primed and also make sure we consumed the DELETE event for all
// versions so that any new watch with ResourceVersion=0 does not get those events. This is source of some flaky tests.
// When a client creates a watch with resourceVersion=0, it will get an ADD event for any existing objects
// but because they specified resourceVersion=0, there is no starting point in the cache buffer to return existing events
// from, thus the server will return anything from current head of the cache to the end. By accessing the delete
// events for all versions here, we make sure that the head of the cache is passed those events and they will not being
// delivered to any future watch with resourceVersion=0.
for _, v := range versions {
noxuWatch, err := resourceClientForVersion(crd, dynamicClientSet, ns, v).Watch(
metav1.ListOptions{ResourceVersion: createdInstance.GetResourceVersion()})
if err != nil {
return false, err
}
defer noxuWatch.Stop()
select {
case watchEvent := <-noxuWatch.ResultChan():
if watch.Error == watchEvent.Type {
return false, nil
select {
case watchEvent := <-noxuWatch.ResultChan():
if watch.Error == watchEvent.Type {
return false, nil
}
if watch.Deleted != watchEvent.Type {
return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent)
}
case <-time.After(5 * time.Second):
return false, fmt.Errorf("gave up waiting for watch event")
}
if watch.Deleted != watchEvent.Type {
return false, fmt.Errorf("expected DELETE, but got %#v", watchEvent)
}
return true, nil
case <-time.After(5 * time.Second):
return false, fmt.Errorf("gave up waiting for watch event")
}
return true, nil
}
func DeleteCustomResourceDefinition(crd *apiextensionsv1beta1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) error {