diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 445f72b746c..3d095c5ad0e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -54,11 +54,11 @@ func TestWatchFromZero(t *testing.T) { storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client)) } -// TestWatchFromNoneZero tests that +// TestWatchFromNonZero tests that // - watch from non-0 should just watch changes after given version func TestWatchFromNoneZero(t *testing.T) { ctx, store, _ := testSetup(t) - storagetesting.RunTestWatchFromNoneZero(ctx, t, store) + storagetesting.RunTestWatchFromNonZero(ctx, t, store) } func TestDelayedWatchDelivery(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index fa674356148..f5dfd72ec69 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -441,8 +441,10 @@ func RunTestValidateDeletionWithSuggestion(ctx context.Context, t *testing.T, st t.Errorf("Unexpected failure during deletion: %v", err) } - if calls != 2 { - t.Errorf("validate function should have been called twice, called %d", calls) + // Implementations of the storage interface are allowed to ignore the suggestion, + // in which case just one validation call is possible. + if calls > 2 { + t.Errorf("validate function should have been called at most twice, called %d", calls) } if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index bb564872090..d42364a93bb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -256,7 +256,7 @@ func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage. testCheckEventType(t, watch.Deleted, w) } -func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.Interface) { +func RunTestWatchFromNonZero(ctx context.Context, t *testing.T, store storage.Interface) { key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) @@ -463,13 +463,7 @@ func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) } - select { - case event := <-w.ResultChan(): - watchedDeleteObj := event.Object.(*example.Pod) - if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { - t.Errorf("Unexpected resource version: %v, expected %v", a, e) - } - } + testCheckResult(t, watch.Deleted, w, deletedObj) } func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 31c592dceb8..60017e3b374 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -98,9 +98,17 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newPod() runtime.Object { return &example.Pod{} } func newPodList() runtime.Object { return &example.PodList{} } -func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { +func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New( + server.V3Client, + apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), + newPod, + prefix, + schema.GroupResource{Resource: "pods"}, + identity.NewEncryptCheckTransformer(), + pagingEnabled, + etcd3.NewDefaultLeaseManagerConfig()) return server, storage } @@ -150,16 +158,74 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl return result } +func checkStorageInvariants(ctx context.Context, t *testing.T, key string) { + // No-op function since cacher simply passes object creation to the underlying storage. +} + +func TestCreate(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestCreate(ctx, t, cacher, checkStorageInvariants) +} + +func TestCreateWithTTL(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestCreateWithTTL(ctx, t, cacher) +} + +func TestCreateWithKeyExist(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestCreateWithKeyExist(ctx, t, cacher) +} + func TestGet(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) storagetesting.RunTestGet(ctx, t, cacher) } -func TestGetListNonRecursive(t *testing.T) { +func TestUnconditionalDelete(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, cacher) + storagetesting.RunTestUnconditionalDelete(ctx, t, cacher) +} + +func TestConditionalDelete(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestConditionalDelete(ctx, t, cacher) +} + +func TestDeleteWithSuggestion(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDeleteWithSuggestion(ctx, t, cacher) +} + +func TestDeleteWithSuggestionAndConflict(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDeleteWithSuggestionAndConflict(ctx, t, cacher) +} + +func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDeleteWithSuggestionOfDeletedObject(ctx, t, cacher) +} + +func TestValidateDeletionWithSuggestion(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestValidateDeletionWithSuggestion(ctx, t, cacher) +} + +func TestPreconditionalDeleteWithSuggestion(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestPreconditionalDeleteWithSuggestion(ctx, t, cacher) } func TestList(t *testing.T) { @@ -168,16 +234,82 @@ func TestList(t *testing.T) { storagetesting.RunTestList(ctx, t, cacher, true) } -func TestClusterScopedWatch(t *testing.T) { - ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs) +func TestListWithoutPaging(t *testing.T) { + ctx, cacher, terminate := testSetup(t, withoutPaging) t.Cleanup(terminate) - storagetesting.RunTestClusterScopedWatch(ctx, t, cacher) + storagetesting.RunTestListWithoutPaging(ctx, t, cacher) } -func TestNamespaceScopedWatch(t *testing.T) { - ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs) +func TestGetListNonRecursive(t *testing.T) { + ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) - storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher) + storagetesting.RunTestGetListNonRecursive(ctx, t, cacher) +} + +func checkStorageCalls(t *testing.T, pageSize, estimatedProcessedObjects uint64) { + // No-op function for now, since cacher passes pagination calls to underlying storage. +} + +func TestListContinuation(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestListContinuation(ctx, t, cacher, checkStorageCalls) +} + +func TestListPaginationRareObject(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestListPaginationRareObject(ctx, t, cacher, checkStorageCalls) +} + +func TestListContinuationWithFilter(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestListContinuationWithFilter(ctx, t, cacher, checkStorageCalls) +} + +func TestListInconsistentContinuation(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestConsistentList(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestGuaranteedUpdate(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestGuaranteedUpdateWithTTL(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestGuaranteedUpdateWithTTL(ctx, t, cacher) +} + +func TestGuaranteedUpdateChecksStoredData(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestGuaranteedUpdateWithConflict(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestGuaranteedUpdateWithConflict(ctx, t, cacher) +} + +func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx, t, cacher) +} + +func TestTransformationFailure(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestCount(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestCount(ctx, t, cacher) } func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { @@ -208,10 +340,70 @@ func TestWatch(t *testing.T) { storagetesting.RunTestWatch(ctx, t, cacher) } +func TestWatchFromZero(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestDeleteTriggerWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDeleteTriggerWatch(ctx, t, cacher) +} + +func TestWatchFromNonZero(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatchFromNonZero(ctx, t, cacher) +} + +func TestDelayedWatchDelivery(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher) +} + +func TestWatchError(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestWatchContextCancel(t *testing.T) { + // TODO(#109831): Enable use of this test and run it. +} + +func TestWatcherTimeout(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatcherTimeout(ctx, t, cacher) +} + +func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, cacher) +} + +func TestWatchInitializationSignal(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatchInitializationSignal(ctx, t, cacher) +} + +func TestClusterScopedWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs) + t.Cleanup(terminate) + storagetesting.RunTestClusterScopedWatch(ctx, t, cacher) +} + +func TestNamespaceScopedWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs) + t.Cleanup(terminate) + storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher) +} + // TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the // scenarios that are not yet covered by it and get rid of this test. func TestWatchDeprecated(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) defer server.Terminate(t) fakeClock := testingclock.NewFakeClock(time.Now()) cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock) @@ -298,14 +490,8 @@ func TestWatchDeprecated(t *testing.T) { verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) } -func TestWatcherTimeout(t *testing.T) { - ctx, cacher, terminate := testSetup(t) - t.Cleanup(terminate) - storagetesting.RunTestWatcherTimeout(ctx, t, cacher) -} - func TestEmptyWatchEventCache(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) defer server.Terminate(t) // add a few objects @@ -371,16 +557,10 @@ func TestEmptyWatchEventCache(t *testing.T) { } } -func TestDelayedWatchDelivery(t *testing.T) { - ctx, cacher, terminate := testSetup(t) - t.Cleanup(terminate) - storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher) -} - func TestCacherListerWatcher(t *testing.T) { prefix := "pods" fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix) + server, store := newEtcdTestStorage(t, prefix, true) defer server.Terminate(t) podFoo := makeTestPod("foo") @@ -409,7 +589,7 @@ func TestCacherListerWatcher(t *testing.T) { func TestCacherListerWatcherPagination(t *testing.T) { prefix := "pods" fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix) + server, store := newEtcdTestStorage(t, prefix, true) defer server.Terminate(t) podFoo := makeTestPod("foo") @@ -482,6 +662,7 @@ type setupOptions struct { resourcePrefix string keyFunc func(runtime.Object) (string, error) indexerFuncs map[string]storage.IndexerFunc + pagingEnabled bool clock clock.Clock } @@ -492,6 +673,7 @@ func withDefaults(options *setupOptions) { options.resourcePrefix = prefix options.keyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) } + options.pagingEnabled = true options.clock = clock.RealClock{} } @@ -513,6 +695,10 @@ func withSpecNodeNameIndexerFuncs(options *setupOptions) { } } +func withoutPaging(options *setupOptions) { + options.pagingEnabled = false +} + func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) @@ -520,7 +706,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora opt(&setupOpts) } - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), setupOpts.pagingEnabled) // Inject one list error to make sure we test the relist case. wrappedStorage := &storagetesting.StorageInjectingListErrors{ Interface: etcdStorage,