Merge pull request #116302 from p0lyn0mial/upstream-cacher-initial-events-rv-gt-zero

cacher: WaitUntilWatchCacheFreshAndForceAllEvents
Kubernetes Prow Robot 2023-03-07 08:28:54 -08:00 committed by GitHub
commit 05f9e2a3aa
3 changed files with 64 additions and 13 deletions

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

@@ -516,7 +516,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
-watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
+requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
@@ -557,13 +557,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion
-bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts)
+bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine a function that computes the watchRV we should start from
-startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts)
+startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
@@ -595,8 +595,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
-watchRV = startWatchResourceVersionFn()
-cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
+forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
+if err != nil {
+return newErrWatcher(err), nil
+}
+startWatchRV := startWatchResourceVersionFn()
+var cacheInterval *watchCacheInterval
+if forceAllEvents {
+cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
+} else {
+cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
+}
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -620,7 +629,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
-go watcher.processInterval(ctx, cacheInterval, watchRV)
+go watcher.processInterval(ctx, cacheInterval, startWatchRV)
return watcher, nil
}
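
For orientation, the request shape that reaches the new forceAllEvents branch above is a watch that sets sendInitialEvents=true together with a non-zero resourceVersion. Below is a minimal caller-side sketch, not part of this change: it assumes a client-go release that exposes ListOptions.SendInitialEvents and an apiserver with the WatchList feature enabled; the kubeconfig resolution, namespace, and resource version are placeholders.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/pointer"
)

func main() {
	// Build a client from the default kubeconfig (placeholder path resolution).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// sendInitialEvents=true with a non-zero resourceVersion asks the server to
	// first stream the current state as synthetic ADDED events (followed by a
	// BOOKMARK) and only then switch to real change notifications.
	w, err := client.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
		SendInitialEvents:    pointer.Bool(true),
		ResourceVersion:      "100",
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		AllowWatchBookmarks:  true,
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		fmt.Printf("%s %T\n", ev.Type, ev.Object)
	}
}
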
@@ -1265,6 +1274,19 @@ func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedW
}
}
+// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
+// as fresh as given requestedWatchRV if sendInitialEvents was requested.
+// Additionally, it instructs the caller whether it should ask for
+// all events from the cache (full state) or not.
+func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
+if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true {
+err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
+defer c.watchCache.RUnlock()
+return err == nil, err
+}
+return false, nil
+}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

@@ -54,6 +54,7 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
type testVersioner struct{}
@@ -1497,7 +1498,7 @@ func TestCacherWatchSemantics(t *testing.T) {
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
-expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
+expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -1533,7 +1534,7 @@ func TestCacherWatchSemantics(t *testing.T) {
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// make sure we only get initial events that are > initial RV (101)
-expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
+expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
name: "sendInitialEvents=false, RV=unset, storageRV=103",
@@ -1693,3 +1694,24 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
}
+func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
+backingStorage := &dummyStorage{}
+cacher, _, err := newTestCacher(backingStorage)
+if err != nil {
+t.Fatalf("Couldn't create cacher: %v", err)
+}
+defer cacher.Stop()
+forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
+require.NotNil(t, err, "the target method should return non nil error")
+require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
+require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
+go func() {
+cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
+}()
+forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
+require.NoError(t, err)
+require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
+}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

@@ -686,11 +686,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
-ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
-if err != nil {
-return nil, err
-}
-return ci, nil
+return w.getIntervalFromStoreLocked()
}
if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@@ -707,3 +703,14 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
return ci, nil
}
+// getIntervalFromStoreLocked returns a watchCacheInterval
+// that covers the entire storage state.
+// This function assumes to be called under the watchCache lock.
+func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
+ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
+if err != nil {
+return nil, err
+}
+return ci, nil
+}
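
To make the two interval sources concrete: getIntervalFromStoreLocked replays the entire current store, which is why the updated expectations in TestCacherWatchSemantics above now include both makePod(101) and makePod(102), while getAllEventsSinceLocked replays only buffered events newer than the starting resource version. The following self-contained toy is purely illustrative, not apiserver code, and all names in it are hypothetical.

package main

import "fmt"

// event is a toy stand-in for a watch cache entry.
type event struct {
	rv   uint64
	name string
}

// fullState mimics getIntervalFromStoreLocked: every object currently in the
// store is replayed, independent of the requested resource version.
func fullState(store []event) []event {
	return append([]event(nil), store...)
}

// sinceRV mimics getAllEventsSinceLocked for a fresh-enough cache: only events
// with a resource version strictly greater than rv are replayed.
func sinceRV(buffer []event, rv uint64) []event {
	var out []event
	for _, e := range buffer {
		if e.rv > rv {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	state := []event{{101, "pod-101"}, {102, "pod-102"}}
	fmt.Println(fullState(state))    // [{101 pod-101} {102 pod-102}]
	fmt.Println(sinceRV(state, 101)) // [{102 pod-102}]
}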