diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 2487d407503..d4239062dee 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -23,15 +23,10 @@ import ( "testing" "time" - storagetesting "k8s.io/apiserver/pkg/storage/testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" + storagetesting "k8s.io/apiserver/pkg/storage/testing" ) func TestWatch(t *testing.T) { @@ -66,6 +61,32 @@ func TestWatchContextCancel(t *testing.T) { storagetesting.RunTestWatchContextCancel(ctx, t, store) } +func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store) +} + +func TestWatchInitializationSignal(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestWatchInitializationSignal(ctx, t, store) +} + +func TestProgressNotify(t *testing.T) { + clusterConfig := testserver.NewTestConfig(t) + clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second + ctx, store, _ := testSetup(t, withClientConfig(clusterConfig)) + + storagetesting.RunOptionalTestProgressNotify(ctx, t, store) +} + +// ======================================================================= +// Implementation-specific tests are following. +// The following tests are exercising the details of the implementation +// not the actual user-facing contract of storage interface. +// As such, they may focus e.g. on non-functional aspects like performance +// impact. +// ======================================================================= + func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) @@ -87,84 +108,3 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { cancel() wg.Wait() } - -func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { - ctx, store, _ := testSetup(t) - - key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - - watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) - w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - - deletedObj := &example.Pod{} - if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { - t.Fatalf("Delete failed: %v", err) - } - - // Verify that ResourceVersion has changed on deletion. - if storedObj.ResourceVersion == deletedObj.ResourceVersion { - t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) - } - - select { - case event := <-w.ResultChan(): - watchedDeleteObj := event.Object.(*example.Pod) - if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { - t.Errorf("Unexpected resource version: %v, expected %v", a, e) - } - } -} - -func TestWatchInitializationSignal(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestWatchInitializationSignal(ctx, t, store) -} - -func TestProgressNotify(t *testing.T) { - clusterConfig := testserver.NewTestConfig(t) - clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second - ctx, store, _ := testSetup(t, withClientConfig(clusterConfig)) - - key := "/somekey" - input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} - out := &example.Pod{} - if err := store.Create(ctx, key, input, out, 0); err != nil { - t.Fatalf("Create failed: %v", err) - } - validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(out.ResourceVersion) - - opts := storage.ListOptions{ - ResourceVersion: out.ResourceVersion, - Predicate: storage.Everything, - ProgressNotify: true, - } - w, err := store.Watch(ctx, key, opts) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - - // when we send a bookmark event, the client expects the event to contain an - // object of the correct type, but with no fields set other than the resourceVersion - storagetesting.TestCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { - // first, check that we have the correct resource version - obj, ok := object.(metav1.Object) - if !ok { - return fmt.Errorf("got %T, not metav1.Object", object) - } - if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { - return err - } - - // then, check that we have the right type and content - pod, ok := object.(*example.Pod) - if !ok { - return fmt.Errorf("got %T, not *example.Pod", object) - } - pod.ResourceVersion = "" - storagetesting.ExpectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod) - return nil - }) -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 36ffb9374d1..4876cf36ba9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -18,6 +18,7 @@ package testing import ( "context" + "fmt" "testing" "time" @@ -261,6 +262,34 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage. } } +func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T, store storage.Interface) { + key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) + w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + deletedObj := &example.Pod{} + if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // Verify that ResourceVersion has changed on deletion. + if storedObj.ResourceVersion == deletedObj.ResourceVersion { + t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) + } + + select { + case event := <-w.ResultChan(): + watchedDeleteObj := event.Object.(*example.Pod) + if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { + t.Errorf("Unexpected resource version: %v, expected %v", a, e) + } + } +} + func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) { ctx, _ = context.WithTimeout(ctx, 5*time.Second) initSignal := utilflowcontrol.NewInitializationSignal() @@ -275,6 +304,52 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s initSignal.Wait() } +// RunOptionalTestProgressNotify tests ProgressNotify feature of ListOptions. +// Given this feature is currently not explicitly used by higher layers of Kubernetes +// (it rather is used by wrappers of storage.Interface to implement its functionalities) +// this test is currently considered optional. +func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store storage.Interface) { + key := "/somekey" + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} + out := &example.Pod{} + if err := store.Create(ctx, key, input, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + validateResourceVersion := ResourceVersionNotOlderThan(out.ResourceVersion) + + opts := storage.ListOptions{ + ResourceVersion: out.ResourceVersion, + Predicate: storage.Everything, + ProgressNotify: true, + } + w, err := store.Watch(ctx, key, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + + // when we send a bookmark event, the client expects the event to contain an + // object of the correct type, but with no fields set other than the resourceVersion + TestCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { + // first, check that we have the correct resource version + obj, ok := object.(metav1.Object) + if !ok { + return fmt.Errorf("got %T, not metav1.Object", object) + } + if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { + return err + } + + // then, check that we have the right type and content + pod, ok := object.(*example.Pod) + if !ok { + return fmt.Errorf("got %T, not *example.Pod", object) + } + pod.ResourceVersion = "" + ExpectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", &example.Pod{}, pod) + return nil + }) +} + type testWatchStruct struct { obj *example.Pod expectEvent bool