diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go index 554a1c80691..831db12bae8 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go @@ -27,6 +27,8 @@ import ( "k8s.io/apiextensions-apiserver/test/integration/fixtures" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" ) @@ -91,7 +93,7 @@ func TestChangeCRD(t *testing.T) { } }() - // Set up 100 loops creating and reading custom resources + // Set up 100 loops creating and reading and watching custom resources for i := 0; i < 100; i++ { wg.Add(1) go func(i int) { @@ -112,6 +114,31 @@ func TestChangeCRD(t *testing.T) { time.Sleep(10 * time.Millisecond) } }(i) + + wg.Add(1) + go func(i int) { + defer wg.Done() + for { + select { + case <-stopChan: + return + default: + w, err := noxuNamespacedResourceClient.Watch(metav1.ListOptions{}) + if err != nil { + t.Fatalf("unexpected error establishing watch: %v", err) + } + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // all expected + default: + t.Errorf("unexpected watch event: %#v", event) + } + } + } + time.Sleep(10 * time.Millisecond) + } + }(i) } // Let all the established get request loops soak @@ -121,5 +148,15 @@ func TestChangeCRD(t *testing.T) { close(stopChan) // Let loops drain - wg.Wait() + drained := make(chan struct{}) + go func() { + defer close(drained) + wg.Wait() + }() + + select { + case <-drained: + case <-time.After(wait.ForeverTestTimeout): + t.Error("timed out waiting for clients to complete") + } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 51c3d36ca7b..bb9fceca4b9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -351,6 +351,7 @@ func NewCacherFromConfig(config Config) *Cacher { cacher.stopWg.Add(1) go func() { defer cacher.stopWg.Done() + defer cacher.terminateAllWatchers() wait.Until( func() { if !cacher.isStopped() { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f2a2326f3f6..103bda5f66e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -496,6 +496,42 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { } } +func TestCacheWatcherStoppedOnDestroy(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _ := newTestCacher(backingStorage, 1000) + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + watchClosed := make(chan struct{}) + go func() { + defer close(watchClosed) + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // ok + default: + t.Errorf("unexpected event %#v", event) + } + } + }() + + cacher.Stop() + + select { + case <-watchClosed: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for watch to close") + } + +} + func TestTimeBucketWatchersBasic(t *testing.T) { filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true