diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 1a88249c77c..cec57f91090 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -192,8 +192,10 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur } // RunTestWatchFromZero tests that -// - watch from 0 should sync up and grab the object added before -// - watch from 0 is able to return events for objects whose previous version has been compacted +// - watch from 0 should sync up and grab the object added before +// - For testing with etcd, watch from 0 is able to return events for objects +// whose previous version has been compacted. If testing with cacher, we +// expect compaction to be nil. func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) @@ -202,7 +204,6 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter t.Fatalf("Watch failed: %v", err) } testCheckResult(t, watch.Added, w, storedObj) - w.Stop() // Update out := &example.Pod{} @@ -214,14 +215,24 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter t.Fatalf("GuaranteedUpdate failed: %v", err) } + // Check that we receive a modified watch event. This check also + // indirectly ensures that the cache is synced. This is important + // when testing with the Cacher since we may have to allow for slow + // processing by allowing updates to propagate to the watch cache. + // This allows for that. + testCheckResult(t, watch.Modified, w, out) + w.Stop() + // Make sure when we watch from 0 we receive an ADDED event w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } + testCheckResult(t, watch.Added, w, out) w.Stop() + // Compact previous versions if compaction == nil { t.Skip("compaction callback not provided") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 6bb30764028..2e10e762ae7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -337,7 +337,9 @@ func TestWatch(t *testing.T) { } func TestWatchFromZero(t *testing.T) { - // TODO(#109831): Enable use of this test and run it. + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatchFromZero(ctx, t, cacher, nil) } func TestDeleteTriggerWatch(t *testing.T) {