From 610b67031c79c6c38964631d27dd59df357c6d2e Mon Sep 17 00:00:00 2001 From: Madhav Jivrajani Date: Mon, 16 Jan 2023 11:10:05 +0530 Subject: [PATCH] cacher: Fix watch behaviour for unset RV The original design was to honour strong consistency semantics for when the RV is unset, i.e. serve the watch by doing a quorum read. However, the implementation did not match the intent, in that, the Cacher did not distinguish between set and unset RV. This commit rectifies that behaviour by serving the watch from the underlying storage if the RV is unset. This commit subsequently also adds a test for the same. Signed-off-by: Madhav Jivrajani --- .../apiserver/pkg/storage/cacher/cacher.go | 7 +++++ .../storage/cacher/cacher_whitebox_test.go | 31 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index bc6500909c3..d913154e000 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -482,6 +482,13 @@ func (c *Cacher) Delete( // Watch implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { pred := opts.Predicate + // If the resourceVersion is unset, ensure that the rv + // from which the watch is being served, is the latest + // one. "latest" is ensured by serving the watch from + // the underlying storage. + if opts.ResourceVersion == "" { + return c.storage.Watch(ctx, key, opts) + } watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 857fed8c6f7..a06ab757e6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -449,6 +449,37 @@ func TestGetCacheBypass(t *testing.T) { } } +func TestWatchCacheBypass(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + // Inject error to underlying layer and check if cacher is not bypassed. + backingStorage.injectError(errDummy) + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "0", + }) + if err != nil { + t.Errorf("Watch with RV=0 should be served from cache: %v", err) + } + + // With unset RV, check if cacher is bypassed. + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "", + }) + if err != errDummy { + t.Errorf("Watch with unset RV should bypass cacher: %v", err) + } +} + func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage)