diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 9f1cfcbdaea..3948129c578 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -516,7 +516,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return c.storage.Watch(ctx, key, opts) } - watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) + requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err } @@ -557,13 +557,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) // Determine a function that computes the bookmarkAfterResourceVersion - bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts) + bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts) if err != nil { return newErrWatcher(err), nil } // Determine a function that computes the watchRV we should start from - startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts) + startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts) if err != nil { return newErrWatcher(err), nil } @@ -595,8 +595,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - watchRV = startWatchResourceVersionFn() - cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) + forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + startWatchRV := startWatchResourceVersionFn() + var cacheInterval *watchCacheInterval + if forceAllEvents { + cacheInterval, err = c.watchCache.getIntervalFromStoreLocked() + } else { + cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV) + } if err != nil { // To match the uncached watch implementation, once we have passed authn/authz/admission, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, @@ -620,7 +629,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() - go watcher.processInterval(ctx, cacheInterval, watchRV) + go watcher.processInterval(ctx, cacheInterval, startWatchRV) return watcher, nil } @@ -1265,6 +1274,19 @@ func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedW } } +// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least +// as fresh as given requestedWatchRV if sendInitialEvents was requested. +// Additionally, it instructs the caller whether it should ask for +// all events from the cache (full state) or not. +func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) { + if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true { + err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV) + defer c.watchCache.RUnlock() + return err == nil, err + } + return false, nil +} + // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. type cacherListerWatcher struct { storage storage.Interface diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 57ab0c28e44..9a1a905cce0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -54,6 +54,7 @@ import ( utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" + "k8s.io/utils/pointer" ) type testVersioner struct{} @@ -1497,7 +1498,7 @@ func TestCacherWatchSemantics(t *testing.T) { resourceVersion: "101", storageResourceVersion: "105", initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, expectedInitialEventsInStrictOrder: []watch.Event{ {Type: watch.Bookmark, Object: &example.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1533,7 +1534,7 @@ func TestCacherWatchSemantics(t *testing.T) { storageResourceVersion: "105", initialPods: []*example.Pod{makePod(101), makePod(102)}, // make sure we only get initial events that are > initial RV (101) - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, }, { name: "sendInitialEvents=false, RV=unset, storageRV=103", @@ -1693,3 +1694,24 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") } + +func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) + require.NotNil(t, err, "the target method should return non nil error") + require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") + require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") + + go func() { + cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) + }() + forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) + require.NoError(t, err) + require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index e719f6f0551..783cf4524c3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -686,11 +686,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach // current state and only then start watching from that point. // // TODO: In v2 api, we should stop returning the current state - #13969. - ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) - if err != nil { - return nil, err - } - return ci, nil + return w.getIntervalFromStoreLocked() } if resourceVersion < oldest-1 { return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) @@ -707,3 +703,14 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex) return ci, nil } + +// getIntervalFromStoreLocked returns a watchCacheInterval +// that covers the entire storage state. +// This function assumes to be called under the watchCache lock. +func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) { + ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) + if err != nil { + return nil, err + } + return ci, nil +}