cacher: Add WaitUntilWatchCacheFreshAndForceAllEvents method

this method waits until cache is at least
as fresh as given requestedWatchRV if sendInitialEvents was requested.
Additionally, it instructs the caller whether it should ask for
all events from the cache (full state) or not.
This commit is contained in:
Lukasz Szaszkiewicz 2023-03-06 14:58:31 +01:00
parent b6acf6f805
commit 21fb981050
3 changed files with 64 additions and 13 deletions

View File

@ -516,7 +516,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
@ -557,13 +557,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts)
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine a function that computes the watchRV we should start from
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts)
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
@ -595,8 +595,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
watchRV = startWatchResourceVersionFn()
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
startWatchRV := startWatchResourceVersionFn()
var cacheInterval *watchCacheInterval
if forceAllEvents {
cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
} else {
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
}
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@ -620,7 +629,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
go watcher.processInterval(ctx, cacheInterval, watchRV)
go watcher.processInterval(ctx, cacheInterval, startWatchRV)
return watcher, nil
}
@ -1265,6 +1274,19 @@ func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedW
}
}
// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
// as fresh as given requestedWatchRV if sendInitialEvents was requested.
// Additionally, it instructs the caller whether it should ask for
// all events from the cache (full state) or not.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true {
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
defer c.watchCache.RUnlock()
return err == nil, err
}
return false, nil
}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface

View File

@ -54,6 +54,7 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
type testVersioner struct{}
@ -1497,7 +1498,7 @@ func TestCacherWatchSemantics(t *testing.T) {
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1533,7 +1534,7 @@ func TestCacherWatchSemantics(t *testing.T) {
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// make sure we only get initial events that are > initial RV (101)
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
name: "sendInitialEvents=false, RV=unset, storageRV=103",
@ -1693,3 +1694,24 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
}
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
require.NotNil(t, err, "the target method should return non nil error")
require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
go func() {
cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
}()
forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
require.NoError(t, err)
require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
}

View File

@ -686,11 +686,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
if err != nil {
return nil, err
}
return ci, nil
return w.getIntervalFromStoreLocked()
}
if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@ -707,3 +703,14 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
return ci, nil
}
// getIntervalFromStoreLocked returns a watchCacheInterval
// that covers the entire storage state.
// This function assumes to be called under the watchCache lock.
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
if err != nil {
return nil, err
}
return ci, nil
}