diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 079d197a2cc..43640d732f4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -513,6 +513,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { opts.SendInitialEvents = nil } + // TODO: we should eventually get rid of this legacy case if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return c.storage.Watch(ctx, key, opts) } @@ -557,14 +558,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // watchers on our watcher having a processing hiccup chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) - // Determine a function that computes the bookmarkAfterResourceVersion - bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts) + // Determine the ResourceVersion to which the watch cache must be synchronized + requiredResourceVersion, err := c.getWatchCacheResourceVersion(ctx, requestedWatchRV, opts) if err != nil { return newErrWatcher(err), nil } - // Determine a function that computes the watchRV we should start from - startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts) + // Determine a function that computes the bookmarkAfterResourceVersion + bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(requestedWatchRV, requiredResourceVersion, opts) if err != nil { return newErrWatcher(err), nil } @@ -596,7 +597,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock // it is safe to release the lock after the method finishes because we don't require // any atomicity between the call to the method and further calls that actually get the events. - forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + err = c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requiredResourceVersion, opts) if err != nil { return newErrWatcher(err), nil } @@ -609,13 +610,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watchCache.RLock() defer c.watchCache.RUnlock() - startWatchRV := startWatchResourceVersionFn() var cacheInterval *watchCacheInterval - if forceAllEvents { - cacheInterval, err = c.watchCache.getIntervalFromStoreLocked() - } else { - cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV) - } + cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts) if err != nil { // To match the uncached watch implementation, once we have passed authn/authz/admission, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, @@ -657,7 +653,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newImmediateCloseWatcher(), nil } - go watcher.processInterval(ctx, cacheInterval, startWatchRV) + go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion) return watcher, nil } @@ -1249,59 +1245,61 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) { // spits a ResourceVersion after which the bookmark event will be delivered. // // The returned function must be called under the watchCache lock. -func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { +func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion, requiredResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks { return func() uint64 { return 0 }, nil } - return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts) -} -// getStartResourceVersionForWatchLockedFunc returns a function that -// spits a ResourceVersion the watch will be started from. -// Depending on the input parameters the semantics of the returned ResourceVersion are: -// - start at Exact (return parsedWatchResourceVersion) -// - start at Most Recent (return an RV from etcd) -// - start at Any (return the current watchCache's RV) -// -// The returned function must be called under the watchCache lock. -func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { - if opts.SendInitialEvents == nil || *opts.SendInitialEvents { - return func() uint64 { return parsedWatchResourceVersion }, nil - } - return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts) -} - -// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion -// based on the input parameters. Please examine callers of this method to get more context. -// -// The returned function must be called under the watchCache lock. -func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { switch { case len(opts.ResourceVersion) == 0: - rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) - if err != nil { - return nil, err - } - return func() uint64 { return rv }, nil - case parsedWatchResourceVersion == 0: + return func() uint64 { return requiredResourceVersion }, nil + case parsedResourceVersion == 0: // here we assume that watchCache locked is already held return func() uint64 { return c.watchCache.resourceVersion }, nil default: - return func() uint64 { return parsedWatchResourceVersion }, nil + return func() uint64 { return parsedResourceVersion }, nil } } +// getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized to +// +// Depending on the input parameters, the semantics of the returned ResourceVersion are: +// - must be at Exact RV (when parsedWatchResourceVersion > 0) +// - can be at Any RV (when parsedWatchResourceVersion = 0) +// - must be at Most Recent RV (return an RV from etcd) +// +// note that the above semantic is enforced by the API validation (defined elsewhere): +// +// if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan +// if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil +// +// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks +func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) { + if len(opts.ResourceVersion) != 0 { + return parsedWatchResourceVersion, nil + } + rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) + return rv, err +} + // waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least // as fresh as given requestedWatchRV if sendInitialEvents was requested. -// Additionally, it instructs the caller whether it should ask for -// all events from the cache (full state) or not. -func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) { +// otherwise, we allow for establishing the connection because the clients +// can wait for events without unnecessary blocking. +func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) error { if opts.SendInitialEvents != nil && *opts.SendInitialEvents { + // TODO(p0lyn0mial): adapt the following logic once + // https://github.com/kubernetes/kubernetes/pull/123264 merges + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && c.watchCache.notFresh(requestedWatchRV) { + c.watchCache.waitingUntilFresh.Add() + defer c.watchCache.waitingUntilFresh.Remove() + } + // TODO(p0lyn0mial): add a metric to track the number of times we have failed while waiting err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV) defer c.watchCache.RUnlock() - return err == nil, err + return err } - return false, nil + return nil } // errWatcher implements watch.Interface to return a single error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index b6d59af079b..8a44802d86b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -32,6 +32,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -85,12 +86,25 @@ type dummyStorage struct { err error getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) + + // use getRequestWatchProgressCounter when reading + // the value of the counter + requestWatchProgressCounter int } func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error { + d.Lock() + defer d.Unlock() + d.requestWatchProgressCounter++ return nil } +func (d *dummyStorage) getRequestWatchProgressCounter() int { + d.RLock() + defer d.RUnlock() + return d.requestWatchProgressCounter +} + type dummyWatch struct { ch chan watch.Event } @@ -1605,48 +1619,156 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() - backingStorage := &dummyStorage{} - cacher, _, err := newTestCacher(backingStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() - opts := storage.ListOptions{ - Predicate: storage.Everything, - SendInitialEvents: pointer.Bool(true), - ResourceVersion: "105", - } - opts.Predicate.AllowWatchBookmarks = true - - w, err := cacher.Watch(context.Background(), "pods/ns", opts) - require.NoError(t, err, "failed to create watch: %v") - defer w.Stop() - verifyEvents(t, w, []watch.Event{ + scenarios := []struct { + name string + opts storage.ListOptions + backingStorage *dummyStorage + verifyBackingStore func(t *testing.T, s *dummyStorage) + }{ { - Type: watch.Error, - Object: &metav1.Status{ - Status: metav1.StatusFailure, - Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(), - Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details, - Reason: metav1.StatusReasonTimeout, - Code: 504, + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=105", + opts: storage.ListOptions{ + Predicate: func() storage.SelectionPredicate { + p := storage.Everything + p.AllowWatchBookmarks = true + return p + }(), + SendInitialEvents: pointer.Bool(true), + ResourceVersion: "105", + }, + verifyBackingStore: func(t *testing.T, s *dummyStorage) { + require.NotEqual(t, 0, s.requestWatchProgressCounter, "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!") }, }, - }, true) - go func() { - cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) - }() - w, err = cacher.Watch(context.Background(), "pods/ns", opts) - require.NoError(t, err, "failed to create watch: %v") - defer w.Stop() - verifyEvents(t, w, []watch.Event{ { - Type: watch.Added, - Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), + name: "legacy: allowWatchBookmarks=false, sendInitialEvents=true, RV=unset", + opts: storage.ListOptions{ + Predicate: func() storage.SelectionPredicate { + p := storage.Everything + p.AllowWatchBookmarks = false + return p + }(), + SendInitialEvents: pointer.Bool(true), + }, + backingStorage: func() *dummyStorage { + hasBeenPrimed := false + s := &dummyStorage{} + s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + listAccessor, err := meta.ListAccessor(listObj) + if err != nil { + return err + } + // the first call to this function + // primes the cacher + if !hasBeenPrimed { + listAccessor.SetResourceVersion("100") + hasBeenPrimed = true + return nil + } + listAccessor.SetResourceVersion("105") + return nil + } + return s + }(), + verifyBackingStore: func(t *testing.T, s *dummyStorage) { + require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!") + }, }, - }, true) + + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset", + opts: storage.ListOptions{ + Predicate: func() storage.SelectionPredicate { + p := storage.Everything + p.AllowWatchBookmarks = true + return p + }(), + SendInitialEvents: pointer.Bool(true), + }, + backingStorage: func() *dummyStorage { + hasBeenPrimed := false + s := &dummyStorage{} + s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + listAccessor, err := meta.ListAccessor(listObj) + if err != nil { + return err + } + // the first call to this function + // primes the cacher + if !hasBeenPrimed { + listAccessor.SetResourceVersion("100") + hasBeenPrimed = true + return nil + } + listAccessor.SetResourceVersion("105") + return nil + } + return s + }(), + verifyBackingStore: func(t *testing.T, s *dummyStorage) { + require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!") + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + var backingStorage *dummyStorage + if scenario.backingStorage != nil { + backingStorage = scenario.backingStorage + } else { + backingStorage = &dummyStorage{} + } + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + var expectedErr *apierrors.StatusError + if !errors.As(storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds), &expectedErr) { + t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError") + } + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Error, + Object: &metav1.Status{ + Status: metav1.StatusFailure, + Message: expectedErr.Error(), + Details: expectedErr.ErrStatus.Details, + Reason: metav1.StatusReasonTimeout, + Code: 504, + }, + }, + }, true) + + go func(t *testing.T) { + err := cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) + require.NoError(t, err, "failed adding a pod to the watchCache") + }(t) + w, err = cacher.Watch(context.Background(), "pods/ns", scenario.opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Added, + Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), + }, + }, true) + if scenario.verifyBackingStore != nil { + scenario.verifyBackingStore(t, backingStorage) + } + }) + } } type fakeStorage struct { @@ -1864,3 +1986,362 @@ func TestForgetWatcher(t *testing.T) { assertCacherInternalState(0, 0) require.Equal(t, 2, forgetCounter) } + +// TestGetWatchCacheResourceVersion test the following cases: +// +// +-----------------+---------------------+-----------------------+ +// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | +// +=================+=====================+=======================+ +// | Unset | true/false | nil/true/false | +// | 0 | true/false | nil/true/false | +// | 95 | true/false | nil/true/false | +// +-----------------+---------------------+-----------------------+ +// where: +// - false indicates the value of the param was set to "false" by a test case +// - true indicates the value of the param was set to "true" by a test case +func TestGetWatchCacheResourceVersion(t *testing.T) { + listOptions := func(allowBookmarks bool, sendInitialEvents *bool, rv string) storage.ListOptions { + p := storage.Everything + p.AllowWatchBookmarks = allowBookmarks + + opts := storage.ListOptions{} + opts.Predicate = p + opts.SendInitialEvents = sendInitialEvents + opts.ResourceVersion = rv + return opts + } + + scenarios := []struct { + name string + opts storage.ListOptions + + expectedWatchResourceVersion int + }{ + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | Unset | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, ""), + expectedWatchResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), ""), + expectedWatchResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), ""), + expectedWatchResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, ""), + expectedWatchResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy", + opts: listOptions(false, pointer.Bool(true), ""), + expectedWatchResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), ""), + expectedWatchResourceVersion: 100, + }, + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | 0 | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, "0"), + expectedWatchResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), "0"), + expectedWatchResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), "0"), + expectedWatchResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, "0"), + expectedWatchResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=true", + opts: listOptions(false, pointer.Bool(true), "0"), + expectedWatchResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), "0"), + expectedWatchResourceVersion: 0, + }, + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | 95 | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, "95"), + expectedWatchResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), "95"), + expectedWatchResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), "95"), + expectedWatchResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, "95"), + expectedWatchResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=true", + opts: listOptions(false, pointer.Bool(true), "95"), + expectedWatchResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), "95"), + expectedWatchResourceVersion: 95, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + require.NoError(t, err, "couldn't create cacher") + defer cacher.Stop() + + parsedResourceVersion := 0 + if len(scenario.opts.ResourceVersion) > 0 { + parsedResourceVersion, err = strconv.Atoi(scenario.opts.ResourceVersion) + require.NoError(t, err) + } + + actualResourceVersion, err := cacher.getWatchCacheResourceVersion(context.TODO(), uint64(parsedResourceVersion), scenario.opts) + require.NoError(t, err) + require.Equal(t, uint64(scenario.expectedWatchResourceVersion), actualResourceVersion, "received unexpected ResourceVersion") + }) + } +} + +// TestGetBookmarkAfterResourceVersionLockedFunc test the following cases: +// +// +-----------------+---------------------+-----------------------+ +// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | +// +=================+=====================+=======================+ +// | Unset | true/false | nil/true/false | +// | 0 | true/false | nil/true/false | +// | 95 | true/false | nil/true/false | +// +-----------------+---------------------+-----------------------+ +// where: +// - false indicates the value of the param was set to "false" by a test case +// - true indicates the value of the param was set to "true" by a test case +func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) { + listOptions := func(allowBookmarks bool, sendInitialEvents *bool, rv string) storage.ListOptions { + p := storage.Everything + p.AllowWatchBookmarks = allowBookmarks + + opts := storage.ListOptions{} + opts.Predicate = p + opts.SendInitialEvents = sendInitialEvents + opts.ResourceVersion = rv + return opts + } + + scenarios := []struct { + name string + opts storage.ListOptions + requiredResourceVersion int + watchCacheResourceVersion int + + expectedBookmarkResourceVersion int + }{ + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | Unset | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 100, + }, + { + name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true", + opts: listOptions(false, pointer.Bool(true), ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), ""), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | 0 | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 99, + }, + { + name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=true", + opts: listOptions(false, pointer.Bool(true), "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), "0"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + // +-----------------+---------------------+-----------------------+ + // | ResourceVersion | AllowWatchBookmarks | SendInitialEvents | + // +=================+=====================+=======================+ + // | 95 | true/false | nil/true/false | + // +-----------------+---------------------+-----------------------+ + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=nil", + opts: listOptions(true, nil, "95"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=true", + opts: listOptions(true, pointer.Bool(true), "95"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 95, + }, + { + name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=false", + opts: listOptions(true, pointer.Bool(false), "95"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=nil", + opts: listOptions(false, nil, "95"), + requiredResourceVersion: 100, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=true", + opts: listOptions(false, pointer.Bool(true), "95"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + { + name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=false", + opts: listOptions(false, pointer.Bool(false), "95"), + requiredResourceVersion: 0, + watchCacheResourceVersion: 99, + expectedBookmarkResourceVersion: 0, + }, + } + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + require.NoError(t, err, "couldn't create cacher") + + defer cacher.Stop() + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion)) + parsedResourceVersion := 0 + if len(scenario.opts.ResourceVersion) > 0 { + parsedResourceVersion, err = strconv.Atoi(scenario.opts.ResourceVersion) + require.NoError(t, err) + } + + getBookMarkFn, err := cacher.getBookmarkAfterResourceVersionLockedFunc(uint64(parsedResourceVersion), uint64(scenario.requiredResourceVersion), scenario.opts) + require.NoError(t, err) + cacher.watchCache.RLock() + defer cacher.watchCache.RUnlock() + getBookMarkResourceVersion := getBookMarkFn() + require.Equal(t, uint64(scenario.expectedBookmarkResourceVersion), getBookMarkResourceVersion, "received unexpected ResourceVersion") + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index c27ca053b78..cc797621b7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -691,7 +691,11 @@ func (w *watchCache) isIndexValidLocked(index int) bool { // getAllEventsSinceLocked returns a watchCacheInterval that can be used to // retrieve events since a certain resourceVersion. This function assumes to // be called under the watchCache lock. -func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) { +func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { + if opts.SendInitialEvents != nil && *opts.SendInitialEvents { + return w.getIntervalFromStoreLocked() + } + size := w.endIndex - w.startIndex var oldest uint64 switch { @@ -711,13 +715,19 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach } if resourceVersion == 0 { - // resourceVersion = 0 means that we don't require any specific starting point - // and we would like to start watching from ~now. - // However, to keep backward compatibility, we additionally need to return the - // current state and only then start watching from that point. - // - // TODO: In v2 api, we should stop returning the current state - #13969. - return w.getIntervalFromStoreLocked() + if opts.SendInitialEvents == nil { + // resourceVersion = 0 means that we don't require any specific starting point + // and we would like to start watching from ~now. + // However, to keep backward compatibility, we additionally need to return the + // current state and only then start watching from that point. + // + // TODO: In v2 api, we should stop returning the current state - #13969. + return w.getIntervalFromStoreLocked() + } + // SendInitialEvents = false and resourceVersion = 0 + // means that the request would like to start watching + // from Any resourceVersion + resourceVersion = w.resourceVersion } if resourceVersion < oldest-1 { return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 6809225d76d..8e37e0cf83e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -77,8 +77,8 @@ type testWatchCache struct { stopCh chan struct{} } -func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { - cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion) +func (w *testWatchCache) getAllEventsSince(resourceVersion uint64, opts storage.ListOptions) ([]*watchCacheEvent, error) { + cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion, opts) if err != nil { return nil, err } @@ -98,11 +98,11 @@ func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCach return result, nil } -func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) { +func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { w.RLock() defer w.RUnlock() - return w.getAllEventsSinceLocked(resourceVersion) + return w.getAllEventsSinceLocked(resourceVersion, opts) } // newTestWatchCache just adds a fake clock. @@ -269,7 +269,7 @@ func TestEvents(t *testing.T) { // Test for Added event. { - _, err := store.getAllEventsSince(1) + _, err := store.getAllEventsSince(1, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } @@ -278,7 +278,7 @@ func TestEvents(t *testing.T) { } } { - result, err := store.getAllEventsSince(2) + result, err := store.getAllEventsSince(2, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -302,13 +302,13 @@ func TestEvents(t *testing.T) { // Test with not full cache. { - _, err := store.getAllEventsSince(1) + _, err := store.getAllEventsSince(1, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(3) + result, err := store.getAllEventsSince(3, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -336,13 +336,13 @@ func TestEvents(t *testing.T) { // Test with full cache - there should be elements from 5 to 9. { - _, err := store.getAllEventsSince(3) + _, err := store.getAllEventsSince(3, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(4) + result, err := store.getAllEventsSince(4, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -361,7 +361,7 @@ func TestEvents(t *testing.T) { store.Delete(makeTestPod("pod", uint64(10))) { - result, err := store.getAllEventsSince(9) + result, err := store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -392,13 +392,13 @@ func TestMarker(t *testing.T) { makeTestPod("pod2", 9), }, "9") - _, err := store.getAllEventsSince(8) + _, err := store.getAllEventsSince(8, storage.ListOptions{}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) } // Getting events from 8 should return no events, // even though there is a marker there. - result, err := store.getAllEventsSince(9) + result, err := store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -409,7 +409,7 @@ func TestMarker(t *testing.T) { pod := makeTestPod("pods", 12) store.Add(pod) // Getting events from 8 should still work and return one event. - result, err = store.getAllEventsSince(9) + result, err = store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -975,7 +975,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { // Force cache resize. addEvent("key4", 50, later.Add(time.Second)) - _, err := store.getAllEventsSince(15) + _, err := store.getAllEventsSince(15, storage.ListOptions{}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) }