diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 2487d407503..116c2c117ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -23,15 +23,11 @@ import ( "testing" "time" - storagetesting "k8s.io/apiserver/pkg/storage/testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" + storagetesting "k8s.io/apiserver/pkg/storage/testing" + ) func TestWatch(t *testing.T) { @@ -90,32 +86,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { ctx, store, _ := testSetup(t) - - key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - - watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) - w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - - deletedObj := &example.Pod{} - if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // Verify that ResourceVersion has changed on deletion. - if storedObj.ResourceVersion == deletedObj.ResourceVersion { - t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) - } - - select { - case event := <-w.ResultChan(): - watchedDeleteObj := event.Object.(*example.Pod) - if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { - t.Errorf("Unexpected resource version: %v, expected %v", a, e) - } - } + storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store) } func TestWatchInitializationSignal(t *testing.T) { @@ -128,43 +99,5 @@ func TestProgressNotify(t *testing.T) { clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second ctx, store, _ := testSetup(t, withClientConfig(clusterConfig)) - key := "/somekey" - input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} - out := &example.Pod{} - if err := store.Create(ctx, key, input, out, 0); err != nil { - t.Fatalf("Create failed: %v", err) - } - validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(out.ResourceVersion) - - opts := storage.ListOptions{ - ResourceVersion: out.ResourceVersion, - Predicate: storage.Everything, - ProgressNotify: true, - } - w, err := store.Watch(ctx, key, opts) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - - // when we send a bookmark event, the client expects the event to contain an - // object of the correct type, but with no fields set other than the resourceVersion - storagetesting.TestCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { - // first, check that we have the correct resource version - obj, ok := object.(metav1.Object) - if !ok { - return fmt.Errorf("got %T, not metav1.Object", object) - } - if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { - return err - } - - // then, check that we have the right type and content - pod, ok := object.(*example.Pod) - if !ok { - return fmt.Errorf("got %T, not *example.Pod", object) - } - pod.ResourceVersion = "" - storagetesting.ExpectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod) - return nil - }) + storagetesting.RunOptionalTestProgressNotify(ctx, t, store) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 36ffb9374d1..4876cf36ba9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -18,6 +18,7 @@ package testing import ( "context" + "fmt" "testing" "time" @@ -261,6 +262,34 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage. } } +func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T, store storage.Interface) { + key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) + w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + deletedObj := &example.Pod{} + if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // Verify that ResourceVersion has changed on deletion. + if storedObj.ResourceVersion == deletedObj.ResourceVersion { + t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) + } + + select { + case event := <-w.ResultChan(): + watchedDeleteObj := event.Object.(*example.Pod) + if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { + t.Errorf("Unexpected resource version: %v, expected %v", a, e) + } + } +} + func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) { ctx, _ = context.WithTimeout(ctx, 5*time.Second) initSignal := utilflowcontrol.NewInitializationSignal() @@ -275,6 +304,52 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s initSignal.Wait() } +// RunOptionalTestProgressNotify tests ProgressNotify feature of ListOptions. +// Given this feature is currently not explicitly used by higher layers of Kubernetes +// (it rather is used by wrappers of storage.Interface to implement its functionalities) +// this test is currently considered optional. +func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store storage.Interface) { + key := "/somekey" + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} + out := &example.Pod{} + if err := store.Create(ctx, key, input, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + validateResourceVersion := ResourceVersionNotOlderThan(out.ResourceVersion) + + opts := storage.ListOptions{ + ResourceVersion: out.ResourceVersion, + Predicate: storage.Everything, + ProgressNotify: true, + } + w, err := store.Watch(ctx, key, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + // when we send a bookmark event, the client expects the event to contain an + // object of the correct type, but with no fields set other than the resourceVersion + TestCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { + // first, check that we have the correct resource version + obj, ok := object.(metav1.Object) + if !ok { + return fmt.Errorf("got %T, not metav1.Object", object) + } + if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { + return err + } + + // then, check that we have the right type and content + pod, ok := object.(*example.Pod) + if !ok { + return fmt.Errorf("got %T, not *example.Pod", object) + } + pod.ResourceVersion = "" + ExpectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", &example.Pod{}, pod) + return nil + }) +} + type testWatchStruct struct { obj *example.Pod expectEvent bool