diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go index b0ddb00da21..59811f57c24 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go @@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) { ns := "default" noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1") - stopChan := make(chan struct{}) + updateCRD := func() { + noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{}) + if err != nil { + t.Error(err) + return + } + if len(noxuDefinitionToUpdate.Spec.Versions) == 1 { + v2 := noxuDefinitionToUpdate.Spec.Versions[0] + v2.Name = "v2" + v2.Served = true + v2.Storage = false + noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2) + } else { + noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1] + } + if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { + t.Error(err) + } + } + // Set up 10 watchers for custom resource. + // We can't exercise them in a loop the same way as get requests, as watchcache + // can reject them with 429 and Retry-After: 1 if it is uninitialized and even + // though 429 is automatically retried, with frequent watchcache terminations and + // reinitializations they could either end-up being rejected N times and fail or + // or not initialize until the last watchcache reinitialization and then not be + // terminated. Thus we exercise their termination explicitly at the beginning. wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Errorf("unexpected error establishing watch: %v", err) + return + } + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // all expected + default: + t.Errorf("unexpected watch event: %#v", event) + } + } + }(i) + } + + // Let all the established watches soak request loops soak + time.Sleep(5 * time.Second) + + // Update CRD and ensure that all watches are gracefully terminated. + updateCRD() + + drained := make(chan struct{}) + go func() { + defer close(drained) + wg.Wait() + }() + + select { + case <-drained: + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("timed out waiting for watchers to be terminated") + } + + stopChan := make(chan struct{}) // Set up loop to modify CRD in the background wg.Add(1) @@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) { time.Sleep(10 * time.Millisecond) - noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{}) - if err != nil { - t.Error(err) - continue - } - if len(noxuDefinitionToUpdate.Spec.Versions) == 1 { - v2 := noxuDefinitionToUpdate.Spec.Versions[0] - v2.Name = "v2" - v2.Served = true - v2.Storage = false - noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2) - } else { - noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1] - } - if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { - t.Error(err) - continue - } + updateCRD() } }() - // Set up 10 loops creating and reading and watching custom resources + // Set up 10 loops creating and reading custom resources for i := 0; i < 10; i++ { wg.Add(1) go func(i int) { @@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) { } } }(i) - - wg.Add(1) - go func(i int) { - defer wg.Done() - for { - time.Sleep(10 * time.Millisecond) - select { - case <-stopChan: - return - default: - w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{}) - if err != nil { - t.Errorf("unexpected error establishing watch: %v", err) - continue - } - for event := range w.ResultChan() { - switch event.Type { - case watch.Added, watch.Modified, watch.Deleted: - // all expected - default: - t.Errorf("unexpected watch event: %#v", event) - } - } - } - } - }(i) } // Let all the established get request loops soak @@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) { close(stopChan) // Let loops drain - drained := make(chan struct{}) + drained = make(chan struct{}) go func() { defer close(drained) wg.Wait() @@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) { select { case <-drained: case <-time.After(wait.ForeverTestTimeout): - t.Error("timed out waiting for clients to complete") + t.Fatal("timed out waiting for clients to complete") } }