From 8266c4d934d42a5175a84bff10fda4bf36f13817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 27 Jun 2023 18:12:30 +0200 Subject: [PATCH] Generalize watch storage tests --- .../pkg/storage/cacher/cacher_test.go | 165 +----------------- .../cacher/cacher_testing_utils_test.go | 55 +++++- .../pkg/storage/cacher/watch_cache.go | 2 +- .../apiserver/pkg/storage/testing/utils.go | 2 +- .../pkg/storage/testing/watcher_tests.go | 34 +++- 5 files changed, 85 insertions(+), 173 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 2d617b49731..9fc2d6aa2fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -19,13 +19,9 @@ package cacher import ( "context" "fmt" - goruntime "runtime" - "strconv" "testing" "time" - apiequality "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -33,19 +29,12 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/utils/clock" - testingclock "k8s.io/utils/clock/testing" -) - -const ( - // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity. - watchCacheDefaultCapacity = 100 ) func init() { @@ -85,41 +74,6 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha return source } -func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) { - prefix := "pods" - v := storage.APIObjectVersioner{} - config := Config{ - Storage: s, - Versioner: v, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, - GetAttrsFunc: GetPodAttrs, - NewFunc: newPod, - NewListFunc: newPodList, - Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: clock, - } - cacher, err := NewCacherFromConfig(config) - return cacher, v, err -} - -func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { - updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - return obj.DeepCopyObject(), nil, nil - } - key := "pods/" + obj.Namespace + "/" + obj.Name - if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil { - t.Errorf("unexpected error: %v", err) - } - obj.ResourceVersion = "" - result := &example.Pod{} - if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil { - t.Errorf("unexpected error: %v", err) - } - return result -} - func checkStorageInvariants(ctx context.Context, t *testing.T, key string) { // No-op function since cacher simply passes object creation to the underlying storage. } @@ -231,7 +185,10 @@ func TestListContinuationWithFilter(t *testing.T) { } func TestListInconsistentContinuation(t *testing.T) { - // TODO(#109831): Enable use of this test and run it. + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + // TODO(#109831): Enable use of this by setting compaction. + storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil) } func TestConsistentList(t *testing.T) { @@ -274,28 +231,6 @@ func TestCount(t *testing.T) { storagetesting.RunTestCount(ctx, t, cacher) } -func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { - _, _, line, _ := goruntime.Caller(1) - select { - case event := <-w.ResultChan(): - if e, a := eventType, event.Type; e != a { - t.Logf("(called from line %d)", line) - t.Errorf("Expected: %s, got: %s", eventType, event.Type) - } - object := event.Object - if co, ok := object.(runtime.CacheableObject); ok { - object = co.GetObject() - } - if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) { - t.Logf("(called from line %d)", line) - t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) - } - case <-time.After(wait.ForeverTestTimeout): - t.Logf("(called from line %d)", line) - t.Errorf("Timed out waiting for an event") - } -} - func TestWatch(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) @@ -305,7 +240,7 @@ func TestWatch(t *testing.T) { func TestWatchFromZero(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) - storagetesting.RunTestWatchFromZero(ctx, t, cacher, nil) + storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher)) } func TestDeleteTriggerWatch(t *testing.T) { @@ -364,96 +299,6 @@ func TestNamespaceScopedWatch(t *testing.T) { storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher) } -// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the -// scenarios that are not yet covered by it and get rid of this test. -func TestWatchDeprecated(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) - defer server.Terminate(t) - fakeClock := testingclock.NewFakeClock(time.Now()) - cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - podFoo := makeTestPodWithName("foo") - podBar := makeTestPodWithName("bar") - - podFooPrime := makeTestPodWithName("foo") - podFooPrime.Spec.NodeName = "fakeNode" - - podFooBis := makeTestPodWithName("foo") - podFooBis.Spec.NodeName = "anotherFakeNode" - - podFooNS2 := makeTestPodWithName("foo") - podFooNS2.Namespace += "2" - - // initialVersion is used to initate the watcher at the beginning of the world, - // which is not defined precisely in etcd. - initialVersion, err := cacher.LastSyncResourceVersion() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - startVersion := strconv.Itoa(int(initialVersion)) - - // Set up Watch for object "podFoo". - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - - // Create in another namespace first to make sure events from other namespaces don't get delivered - updatePod(t, etcdStorage, podFooNS2, nil) - - fooCreated := updatePod(t, etcdStorage, podFoo, nil) - _ = updatePod(t, etcdStorage, podBar, nil) - fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated) - - verifyWatchEvent(t, watcher, watch.Added, podFoo) - verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) - - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer initialWatcher.Stop() - - verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) - - // Now test watch from "now". - nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer nowWatcher.Stop() - - verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime) - - _ = updatePod(t, etcdStorage, podFooBis, fooUpdated) - - verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) - - // Add watchCacheDefaultCapacity events to make current watch cache full. - // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand. - for i := 0; i < watchCacheDefaultCapacity; i++ { - fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute)) - podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i)) - updatePod(t, etcdStorage, podFoo, nil) - } - - // Check whether we get too-old error via the watch channel - tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Expected no direct error, got %v", err) - } - defer tooOldWatcher.Stop() - - // Ensure we get a "Gone" error. - expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus - verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) -} - func TestWatchDispatchBookmarkEvents(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index 34653a891bc..c7c6968be71 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -17,6 +17,7 @@ limitations under the License. package cacher import ( + "context" "fmt" "testing" @@ -66,13 +67,53 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3 return server, storage } -func makeTestPodWithName(name string) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, - Spec: storagetesting.DeepEqualSafePodSpec(), - } -} - func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } + +func compactStorage(c *Cacher) storagetesting.Compaction { + return func(ctx context.Context, t *testing.T, resourceVersion string) { + versioner := storage.APIObjectVersioner{} + rv, err := versioner.ParseResourceVersion(resourceVersion) + if err != nil { + t.Fatal(err) + } + + err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv) + if err != nil { + t.Fatalf("WatchCache didn't caught up to RV: %v", rv) + } + c.watchCache.RUnlock() + + c.watchCache.Lock() + defer c.watchCache.Unlock() + c.Lock() + defer c.Unlock() + + if c.watchCache.resourceVersion < rv { + t.Fatalf("Can't compact into a future version: %v", resourceVersion) + } + if rv < c.watchCache.listResourceVersion { + t.Fatalf("Can't compact into a past version: %v", resourceVersion) + } + + if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 { + // We could consider terminating those watchers, but given + // watchcache doesn't really support compaction and we don't + // exercise it in tests, we just throw an error here. + t.Error("Open watchers are not supported during compaction") + } + + for c.watchCache.startIndex < c.watchCache.endIndex { + index := c.watchCache.startIndex % c.watchCache.capacity + if c.watchCache.cache[index].ResourceVersion > rv { + break + } + + c.watchCache.startIndex++ + } + c.watchCache.listResourceVersion = rv + + // TODO(wojtek-t): We should also compact the underlying etcd storage. + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 455a225708a..9e7259c9a15 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -305,7 +305,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if err := func() error { // TODO: We should consider moving this lock below after the watchCacheEvent - // is created. In such situation, the only problematic scenario is Replace( + // is created. In such situation, the only problematic scenario is Replace() // happening after getting object from store and before acquiring a lock. // Maybe introduce another lock for this purpose. w.Lock() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 5d7207f5f77..67559ecd20f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -150,7 +150,7 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I } } -func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { +func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj runtime.Object) { testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error { ExpectNoDiff(t, "incorrect object", expectObj, object) return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index cec57f91090..4955dbcc570 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -19,12 +19,15 @@ package testing import ( "context" "fmt" + "net/http" "sync" "testing" "time" "github.com/stretchr/testify/require" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -238,8 +241,8 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter } // Update again - out = &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + newOut := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, newOut, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, nil }), nil) @@ -248,14 +251,37 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter } // Compact previous versions - compaction(ctx, t, out.ResourceVersion) + compaction(ctx, t, newOut.ResourceVersion) // Make sure we can still watch from 0 and receive an ADDED event w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + defer w.Stop() if err != nil { t.Fatalf("Watch failed: %v", err) } - testCheckResult(t, watch.Added, w, out) + testCheckResult(t, watch.Added, w, newOut) + + // Make sure we can't watch from older resource versions anymoer and get a "Gone" error. + tooOldWatcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + defer tooOldWatcher.Stop() + expiredError := errors.NewResourceExpired("").ErrStatus + // TODO(wojtek-t): It seems that etcd is currently returning a different error, + // being an Internal error of "etcd event received with PrevKv=nil". + // We temporary allow both but we should unify here. + internalError := metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonInternalError, + } + testCheckResultFunc(t, watch.Error, tooOldWatcher, func(obj runtime.Object) error { + if !apiequality.Semantic.DeepDerivative(&expiredError, obj) && !apiequality.Semantic.DeepDerivative(&internalError, obj) { + t.Errorf("expected: %#v; got %#v", &expiredError, obj) + } + return nil + }) } func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {