Merge pull request #122830 from p0lyn0mial/upstream-watch-cache-wati-for-bk-after-rv

storage/cacher: ensure the cache is at the Most Recent ResourceVersion when streaming was requested
This commit is contained in:
Kubernetes Prow Robot 2024-02-28 01:43:37 -08:00 committed by GitHub
commit d2b4928669
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 594 additions and 105 deletions

View File

@ -513,6 +513,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil opts.SendInitialEvents = nil
} }
// TODO: we should eventually get rid of this legacy case
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts) return c.storage.Watch(ctx, key, opts)
} }
@ -557,14 +558,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// watchers on our watcher having a processing hiccup // watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion // Determine the ResourceVersion to which the watch cache must be synchronized
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts) requiredResourceVersion, err := c.getWatchCacheResourceVersion(ctx, requestedWatchRV, opts)
if err != nil { if err != nil {
return newErrWatcher(err), nil return newErrWatcher(err), nil
} }
// Determine a function that computes the watchRV we should start from // Determine a function that computes the bookmarkAfterResourceVersion
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts) bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(requestedWatchRV, requiredResourceVersion, opts)
if err != nil { if err != nil {
return newErrWatcher(err), nil return newErrWatcher(err), nil
} }
@ -596,7 +597,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock // moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
// it is safe to release the lock after the method finishes because we don't require // it is safe to release the lock after the method finishes because we don't require
// any atomicity between the call to the method and further calls that actually get the events. // any atomicity between the call to the method and further calls that actually get the events.
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) err = c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requiredResourceVersion, opts)
if err != nil { if err != nil {
return newErrWatcher(err), nil return newErrWatcher(err), nil
} }
@ -609,13 +610,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watchCache.RLock() c.watchCache.RLock()
defer c.watchCache.RUnlock() defer c.watchCache.RUnlock()
startWatchRV := startWatchResourceVersionFn()
var cacheInterval *watchCacheInterval var cacheInterval *watchCacheInterval
if forceAllEvents { cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts)
cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
} else {
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
}
if err != nil { if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission, // To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@ -657,7 +653,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newImmediateCloseWatcher(), nil return newImmediateCloseWatcher(), nil
} }
go watcher.processInterval(ctx, cacheInterval, startWatchRV) go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
return watcher, nil return watcher, nil
} }
@ -1249,59 +1245,61 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// spits a ResourceVersion after which the bookmark event will be delivered. // spits a ResourceVersion after which the bookmark event will be delivered.
// //
// The returned function must be called under the watchCache lock. // The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion, requiredResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks { if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
return func() uint64 { return 0 }, nil return func() uint64 { return 0 }, nil
} }
return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
}
// getStartResourceVersionForWatchLockedFunc returns a function that
// spits a ResourceVersion the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
// - start at Exact (return parsedWatchResourceVersion)
// - start at Most Recent (return an RV from etcd)
// - start at Any (return the current watchCache's RV)
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
return func() uint64 { return parsedWatchResourceVersion }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
}
// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion
// based on the input parameters. Please examine callers of this method to get more context.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
switch { switch {
case len(opts.ResourceVersion) == 0: case len(opts.ResourceVersion) == 0:
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) return func() uint64 { return requiredResourceVersion }, nil
if err != nil { case parsedResourceVersion == 0:
return nil, err
}
return func() uint64 { return rv }, nil
case parsedWatchResourceVersion == 0:
// here we assume that watchCache locked is already held // here we assume that watchCache locked is already held
return func() uint64 { return c.watchCache.resourceVersion }, nil return func() uint64 { return c.watchCache.resourceVersion }, nil
default: default:
return func() uint64 { return parsedWatchResourceVersion }, nil return func() uint64 { return parsedResourceVersion }, nil
} }
} }
// getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized to
//
// Depending on the input parameters, the semantics of the returned ResourceVersion are:
// - must be at Exact RV (when parsedWatchResourceVersion > 0)
// - can be at Any RV (when parsedWatchResourceVersion = 0)
// - must be at Most Recent RV (return an RV from etcd)
//
// note that the above semantic is enforced by the API validation (defined elsewhere):
//
// if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
// if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
//
// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
if len(opts.ResourceVersion) != 0 {
return parsedWatchResourceVersion, nil
}
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
return rv, err
}
// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least // waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
// as fresh as given requestedWatchRV if sendInitialEvents was requested. // as fresh as given requestedWatchRV if sendInitialEvents was requested.
// Additionally, it instructs the caller whether it should ask for // otherwise, we allow for establishing the connection because the clients
// all events from the cache (full state) or not. // can wait for events without unnecessary blocking.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) { func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) error {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents { if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
// TODO(p0lyn0mial): adapt the following logic once
// https://github.com/kubernetes/kubernetes/pull/123264 merges
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && c.watchCache.notFresh(requestedWatchRV) {
c.watchCache.waitingUntilFresh.Add()
defer c.watchCache.waitingUntilFresh.Remove()
}
// TODO(p0lyn0mial): add a metric to track the number of times we have failed while waiting
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV) err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
defer c.watchCache.RUnlock() defer c.watchCache.RUnlock()
return err == nil, err return err
} }
return false, nil return nil
} }
// errWatcher implements watch.Interface to return a single error // errWatcher implements watch.Interface to return a single error

View File

@ -32,6 +32,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -85,12 +86,25 @@ type dummyStorage struct {
err error err error
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
// use getRequestWatchProgressCounter when reading
// the value of the counter
requestWatchProgressCounter int
} }
func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error { func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
d.Lock()
defer d.Unlock()
d.requestWatchProgressCounter++
return nil return nil
} }
func (d *dummyStorage) getRequestWatchProgressCounter() int {
d.RLock()
defer d.RUnlock()
return d.requestWatchProgressCounter
}
type dummyWatch struct { type dummyWatch struct {
ch chan watch.Event ch chan watch.Event
} }
@ -1605,48 +1619,156 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{} defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
opts := storage.ListOptions{ scenarios := []struct {
Predicate: storage.Everything, name string
SendInitialEvents: pointer.Bool(true), opts storage.ListOptions
ResourceVersion: "105", backingStorage *dummyStorage
} verifyBackingStore func(t *testing.T, s *dummyStorage)
opts.Predicate.AllowWatchBookmarks = true }{
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
verifyEvents(t, w, []watch.Event{
{ {
Type: watch.Error, name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=105",
Object: &metav1.Status{ opts: storage.ListOptions{
Status: metav1.StatusFailure, Predicate: func() storage.SelectionPredicate {
Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(), p := storage.Everything
Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details, p.AllowWatchBookmarks = true
Reason: metav1.StatusReasonTimeout, return p
Code: 504, }(),
SendInitialEvents: pointer.Bool(true),
ResourceVersion: "105",
},
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
require.NotEqual(t, 0, s.requestWatchProgressCounter, "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
}, },
}, },
}, true)
go func() {
cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
}()
w, err = cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
verifyEvents(t, w, []watch.Event{
{ {
Type: watch.Added, name: "legacy: allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), opts: storage.ListOptions{
Predicate: func() storage.SelectionPredicate {
p := storage.Everything
p.AllowWatchBookmarks = false
return p
}(),
SendInitialEvents: pointer.Bool(true),
},
backingStorage: func() *dummyStorage {
hasBeenPrimed := false
s := &dummyStorage{}
s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
listAccessor, err := meta.ListAccessor(listObj)
if err != nil {
return err
}
// the first call to this function
// primes the cacher
if !hasBeenPrimed {
listAccessor.SetResourceVersion("100")
hasBeenPrimed = true
return nil
}
listAccessor.SetResourceVersion("105")
return nil
}
return s
}(),
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
},
}, },
}, true)
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
opts: storage.ListOptions{
Predicate: func() storage.SelectionPredicate {
p := storage.Everything
p.AllowWatchBookmarks = true
return p
}(),
SendInitialEvents: pointer.Bool(true),
},
backingStorage: func() *dummyStorage {
hasBeenPrimed := false
s := &dummyStorage{}
s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
listAccessor, err := meta.ListAccessor(listObj)
if err != nil {
return err
}
// the first call to this function
// primes the cacher
if !hasBeenPrimed {
listAccessor.SetResourceVersion("100")
hasBeenPrimed = true
return nil
}
listAccessor.SetResourceVersion("105")
return nil
}
return s
}(),
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
},
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
var backingStorage *dummyStorage
if scenario.backingStorage != nil {
backingStorage = scenario.backingStorage
} else {
backingStorage = &dummyStorage{}
}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
var expectedErr *apierrors.StatusError
if !errors.As(storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds), &expectedErr) {
t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError")
}
verifyEvents(t, w, []watch.Event{
{
Type: watch.Error,
Object: &metav1.Status{
Status: metav1.StatusFailure,
Message: expectedErr.Error(),
Details: expectedErr.ErrStatus.Details,
Reason: metav1.StatusReasonTimeout,
Code: 504,
},
},
}, true)
go func(t *testing.T) {
err := cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
require.NoError(t, err, "failed adding a pod to the watchCache")
}(t)
w, err = cacher.Watch(context.Background(), "pods/ns", scenario.opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
verifyEvents(t, w, []watch.Event{
{
Type: watch.Added,
Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
},
}, true)
if scenario.verifyBackingStore != nil {
scenario.verifyBackingStore(t, backingStorage)
}
})
}
} }
type fakeStorage struct { type fakeStorage struct {
@ -1864,3 +1986,362 @@ func TestForgetWatcher(t *testing.T) {
assertCacherInternalState(0, 0) assertCacherInternalState(0, 0)
require.Equal(t, 2, forgetCounter) require.Equal(t, 2, forgetCounter)
} }
// TestGetWatchCacheResourceVersion test the following cases:
//
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | Unset | true/false | nil/true/false |
// | 0 | true/false | nil/true/false |
// | 95 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
// where:
// - false indicates the value of the param was set to "false" by a test case
// - true indicates the value of the param was set to "true" by a test case
func TestGetWatchCacheResourceVersion(t *testing.T) {
listOptions := func(allowBookmarks bool, sendInitialEvents *bool, rv string) storage.ListOptions {
p := storage.Everything
p.AllowWatchBookmarks = allowBookmarks
opts := storage.ListOptions{}
opts.Predicate = p
opts.SendInitialEvents = sendInitialEvents
opts.ResourceVersion = rv
return opts
}
scenarios := []struct {
name string
opts storage.ListOptions
expectedWatchResourceVersion int
}{
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | Unset | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), ""),
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), ""),
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",
opts: listOptions(false, pointer.Bool(true), ""),
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), ""),
expectedWatchResourceVersion: 100,
},
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | 0 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, "0"),
expectedWatchResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), "0"),
expectedWatchResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), "0"),
expectedWatchResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, "0"),
expectedWatchResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=true",
opts: listOptions(false, pointer.Bool(true), "0"),
expectedWatchResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), "0"),
expectedWatchResourceVersion: 0,
},
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | 95 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, "95"),
expectedWatchResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), "95"),
expectedWatchResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), "95"),
expectedWatchResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, "95"),
expectedWatchResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=true",
opts: listOptions(false, pointer.Bool(true), "95"),
expectedWatchResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), "95"),
expectedWatchResourceVersion: 95,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err, "couldn't create cacher")
defer cacher.Stop()
parsedResourceVersion := 0
if len(scenario.opts.ResourceVersion) > 0 {
parsedResourceVersion, err = strconv.Atoi(scenario.opts.ResourceVersion)
require.NoError(t, err)
}
actualResourceVersion, err := cacher.getWatchCacheResourceVersion(context.TODO(), uint64(parsedResourceVersion), scenario.opts)
require.NoError(t, err)
require.Equal(t, uint64(scenario.expectedWatchResourceVersion), actualResourceVersion, "received unexpected ResourceVersion")
})
}
}
// TestGetBookmarkAfterResourceVersionLockedFunc test the following cases:
//
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | Unset | true/false | nil/true/false |
// | 0 | true/false | nil/true/false |
// | 95 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
// where:
// - false indicates the value of the param was set to "false" by a test case
// - true indicates the value of the param was set to "true" by a test case
func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
listOptions := func(allowBookmarks bool, sendInitialEvents *bool, rv string) storage.ListOptions {
p := storage.Everything
p.AllowWatchBookmarks = allowBookmarks
opts := storage.ListOptions{}
opts.Predicate = p
opts.SendInitialEvents = sendInitialEvents
opts.ResourceVersion = rv
return opts
}
scenarios := []struct {
name string
opts storage.ListOptions
requiredResourceVersion int
watchCacheResourceVersion int
expectedBookmarkResourceVersion int
}{
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | Unset | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true",
opts: listOptions(false, pointer.Bool(true), ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), ""),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | 0 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 99,
},
{
name: "RV=0, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=true",
opts: listOptions(false, pointer.Bool(true), "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=0, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), "0"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
// +-----------------+---------------------+-----------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+=======================+
// | 95 | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, "95"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=true",
opts: listOptions(true, pointer.Bool(true), "95"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 95,
},
{
name: "RV=95, allowWatchBookmarks=true, sendInitialEvents=false",
opts: listOptions(true, pointer.Bool(false), "95"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, "95"),
requiredResourceVersion: 100,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=true",
opts: listOptions(false, pointer.Bool(true), "95"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
{
name: "RV=95, allowWatchBookmarks=false, sendInitialEvents=false",
opts: listOptions(false, pointer.Bool(false), "95"),
requiredResourceVersion: 0,
watchCacheResourceVersion: 99,
expectedBookmarkResourceVersion: 0,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err, "couldn't create cacher")
defer cacher.Stop()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion))
parsedResourceVersion := 0
if len(scenario.opts.ResourceVersion) > 0 {
parsedResourceVersion, err = strconv.Atoi(scenario.opts.ResourceVersion)
require.NoError(t, err)
}
getBookMarkFn, err := cacher.getBookmarkAfterResourceVersionLockedFunc(uint64(parsedResourceVersion), uint64(scenario.requiredResourceVersion), scenario.opts)
require.NoError(t, err)
cacher.watchCache.RLock()
defer cacher.watchCache.RUnlock()
getBookMarkResourceVersion := getBookMarkFn()
require.Equal(t, uint64(scenario.expectedBookmarkResourceVersion), getBookMarkResourceVersion, "received unexpected ResourceVersion")
})
}
}

View File

@ -691,7 +691,11 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to // getAllEventsSinceLocked returns a watchCacheInterval that can be used to
// retrieve events since a certain resourceVersion. This function assumes to // retrieve events since a certain resourceVersion. This function assumes to
// be called under the watchCache lock. // be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) { func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
return w.getIntervalFromStoreLocked()
}
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
var oldest uint64 var oldest uint64
switch { switch {
@ -711,13 +715,19 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
} }
if resourceVersion == 0 { if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point if opts.SendInitialEvents == nil {
// and we would like to start watching from ~now. // resourceVersion = 0 means that we don't require any specific starting point
// However, to keep backward compatibility, we additionally need to return the // and we would like to start watching from ~now.
// current state and only then start watching from that point. // However, to keep backward compatibility, we additionally need to return the
// // current state and only then start watching from that point.
// TODO: In v2 api, we should stop returning the current state - #13969. //
return w.getIntervalFromStoreLocked() // TODO: In v2 api, we should stop returning the current state - #13969.
return w.getIntervalFromStoreLocked()
}
// SendInitialEvents = false and resourceVersion = 0
// means that the request would like to start watching
// from Any resourceVersion
resourceVersion = w.resourceVersion
} }
if resourceVersion < oldest-1 { if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))

View File

@ -77,8 +77,8 @@ type testWatchCache struct {
stopCh chan struct{} stopCh chan struct{}
} }
func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { func (w *testWatchCache) getAllEventsSince(resourceVersion uint64, opts storage.ListOptions) ([]*watchCacheEvent, error) {
cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion) cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion, opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,11 +98,11 @@ func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCach
return result, nil return result, nil
} }
func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) { func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.getAllEventsSinceLocked(resourceVersion) return w.getAllEventsSinceLocked(resourceVersion, opts)
} }
// newTestWatchCache just adds a fake clock. // newTestWatchCache just adds a fake clock.
@ -269,7 +269,7 @@ func TestEvents(t *testing.T) {
// Test for Added event. // Test for Added event.
{ {
_, err := store.getAllEventsSince(1) _, err := store.getAllEventsSince(1, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
@ -278,7 +278,7 @@ func TestEvents(t *testing.T) {
} }
} }
{ {
result, err := store.getAllEventsSince(2) result, err := store.getAllEventsSince(2, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -302,13 +302,13 @@ func TestEvents(t *testing.T) {
// Test with not full cache. // Test with not full cache.
{ {
_, err := store.getAllEventsSince(1) _, err := store.getAllEventsSince(1, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
} }
{ {
result, err := store.getAllEventsSince(3) result, err := store.getAllEventsSince(3, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -336,13 +336,13 @@ func TestEvents(t *testing.T) {
// Test with full cache - there should be elements from 5 to 9. // Test with full cache - there should be elements from 5 to 9.
{ {
_, err := store.getAllEventsSince(3) _, err := store.getAllEventsSince(3, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
} }
{ {
result, err := store.getAllEventsSince(4) result, err := store.getAllEventsSince(4, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -361,7 +361,7 @@ func TestEvents(t *testing.T) {
store.Delete(makeTestPod("pod", uint64(10))) store.Delete(makeTestPod("pod", uint64(10)))
{ {
result, err := store.getAllEventsSince(9) result, err := store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -392,13 +392,13 @@ func TestMarker(t *testing.T) {
makeTestPod("pod2", 9), makeTestPod("pod2", 9),
}, "9") }, "9")
_, err := store.getAllEventsSince(8) _, err := store.getAllEventsSince(8, storage.ListOptions{})
if err == nil || !strings.Contains(err.Error(), "too old resource version") { if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
// Getting events from 8 should return no events, // Getting events from 8 should return no events,
// even though there is a marker there. // even though there is a marker there.
result, err := store.getAllEventsSince(9) result, err := store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -409,7 +409,7 @@ func TestMarker(t *testing.T) {
pod := makeTestPod("pods", 12) pod := makeTestPod("pods", 12)
store.Add(pod) store.Add(pod)
// Getting events from 8 should still work and return one event. // Getting events from 8 should still work and return one event.
result, err = store.getAllEventsSince(9) result, err = store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -975,7 +975,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
// Force cache resize. // Force cache resize.
addEvent("key4", 50, later.Add(time.Second)) addEvent("key4", 50, later.Add(time.Second))
_, err := store.getAllEventsSince(15) _, err := store.getAllEventsSince(15, storage.ListOptions{})
if err == nil || !strings.Contains(err.Error(), "too old resource version") { if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }