From 21fb98105043d1a15ef48089ef231931851d2d15 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 6 Mar 2023 14:58:31 +0100 Subject: [PATCH] cacher: Add WaitUntilWatchCacheFreshAndForceAllEvents method this method waits until cache is at least as fresh as given requestedWatchRV if sendInitialEvents was requested. Additionally, it instructs the caller whether it should ask for all events from the cache (full state) or not. --- .../apiserver/pkg/storage/cacher/cacher.go | 34 +++++++++++++++---- .../storage/cacher/cacher_whitebox_test.go | 26 ++++++++++++-- .../pkg/storage/cacher/watch_cache.go | 17 +++++++--- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 9f1cfcbdaea..3948129c578 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -516,7 +516,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return c.storage.Watch(ctx, key, opts) } - watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) + requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err } @@ -557,13 +557,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) // Determine a function that computes the bookmarkAfterResourceVersion - bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts) + bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts) if err != nil { return newErrWatcher(err), nil } // Determine a function that computes the watchRV we should start from - startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts) + startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts) if err != nil { return newErrWatcher(err), nil } @@ -595,8 +595,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - watchRV = startWatchResourceVersionFn() - cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) + forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + startWatchRV := startWatchResourceVersionFn() + var cacheInterval *watchCacheInterval + if forceAllEvents { + cacheInterval, err = c.watchCache.getIntervalFromStoreLocked() + } else { + cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV) + } if err != nil { // To match the uncached watch implementation, once we have passed authn/authz/admission, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, @@ -620,7 +629,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() - go watcher.processInterval(ctx, cacheInterval, watchRV) + go watcher.processInterval(ctx, cacheInterval, startWatchRV) return watcher, nil } @@ -1265,6 +1274,19 @@ func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedW } } +// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least +// as fresh as given requestedWatchRV if sendInitialEvents was requested. +// Additionally, it instructs the caller whether it should ask for +// all events from the cache (full state) or not. +func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) { + if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true { + err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV) + defer c.watchCache.RUnlock() + return err == nil, err + } + return false, nil +} + // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. type cacherListerWatcher struct { storage storage.Interface diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 57ab0c28e44..9a1a905cce0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -54,6 +54,7 @@ import ( utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" + "k8s.io/utils/pointer" ) type testVersioner struct{} @@ -1497,7 +1498,7 @@ func TestCacherWatchSemantics(t *testing.T) { resourceVersion: "101", storageResourceVersion: "105", initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, expectedInitialEventsInStrictOrder: []watch.Event{ {Type: watch.Bookmark, Object: &example.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1533,7 +1534,7 @@ func TestCacherWatchSemantics(t *testing.T) { storageResourceVersion: "105", initialPods: []*example.Pod{makePod(101), makePod(102)}, // make sure we only get initial events that are > initial RV (101) - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, }, { name: "sendInitialEvents=false, RV=unset, storageRV=103", @@ -1693,3 +1694,24 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") } + +func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) + require.NotNil(t, err, "the target method should return non nil error") + require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") + require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") + + go func() { + cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) + }() + forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) + require.NoError(t, err) + require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index e719f6f0551..783cf4524c3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -686,11 +686,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach // current state and only then start watching from that point. // // TODO: In v2 api, we should stop returning the current state - #13969. - ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) - if err != nil { - return nil, err - } - return ci, nil + return w.getIntervalFromStoreLocked() } if resourceVersion < oldest-1 { return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) @@ -707,3 +703,14 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex) return ci, nil } + +// getIntervalFromStoreLocked returns a watchCacheInterval +// that covers the entire storage state. +// This function assumes to be called under the watchCache lock. +func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) { + ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) + if err != nil { + return nil, err + } + return ci, nil +}