mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 05:21:58 +00:00
Merge pull request #115096 from MadhavJivrajani/unset-rv-watch-semantics
cacher: Fix watch behaviour for unset RV
This commit is contained in:
commit
70f337c0d5
@ -482,6 +482,13 @@ func (c *Cacher) Delete(
|
|||||||
// Watch implements storage.Interface.
|
// Watch implements storage.Interface.
|
||||||
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||||
pred := opts.Predicate
|
pred := opts.Predicate
|
||||||
|
// If the resourceVersion is unset, ensure that the rv
|
||||||
|
// from which the watch is being served, is the latest
|
||||||
|
// one. "latest" is ensured by serving the watch from
|
||||||
|
// the underlying storage.
|
||||||
|
if opts.ResourceVersion == "" {
|
||||||
|
return c.storage.Watch(ctx, key, opts)
|
||||||
|
}
|
||||||
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
|
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -277,6 +277,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type dummyStorage struct {
|
type dummyStorage struct {
|
||||||
|
sync.RWMutex
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,12 +307,21 @@ func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *
|
|||||||
return fmt.Errorf("unimplemented")
|
return fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
|
func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
|
||||||
return newDummyWatch(), nil
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
|
return newDummyWatch(), d.err
|
||||||
}
|
}
|
||||||
func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error {
|
func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error {
|
||||||
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
return d.err
|
return d.err
|
||||||
}
|
}
|
||||||
func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
|
func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
podList := listObj.(*example.PodList)
|
podList := listObj.(*example.PodList)
|
||||||
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
|
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
|
||||||
return d.err
|
return d.err
|
||||||
@ -322,6 +332,12 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O
|
|||||||
func (d *dummyStorage) Count(_ string) (int64, error) {
|
func (d *dummyStorage) Count(_ string) (int64, error) {
|
||||||
return 0, fmt.Errorf("unimplemented")
|
return 0, fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
|
func (d *dummyStorage) injectError(err error) {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
|
||||||
|
d.err = err
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetListCacheBypass(t *testing.T) {
|
func TestGetListCacheBypass(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
@ -342,7 +358,7 @@ func TestGetListCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.injectError(errDummy)
|
||||||
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||||
ResourceVersion: "0",
|
ResourceVersion: "0",
|
||||||
Predicate: pred,
|
Predicate: pred,
|
||||||
@ -381,7 +397,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.injectError(errDummy)
|
||||||
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||||
ResourceVersion: "0",
|
ResourceVersion: "0",
|
||||||
Predicate: pred,
|
Predicate: pred,
|
||||||
@ -415,7 +431,7 @@ func TestGetCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.injectError(errDummy)
|
||||||
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||||
IgnoreNotFound: true,
|
IgnoreNotFound: true,
|
||||||
ResourceVersion: "0",
|
ResourceVersion: "0",
|
||||||
@ -433,6 +449,37 @@ func TestGetCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchCacheBypass(t *testing.T) {
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
// Wait until cacher is initialized.
|
||||||
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
|
backingStorage.injectError(errDummy)
|
||||||
|
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// With unset RV, check if cacher is bypassed.
|
||||||
|
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||||
|
ResourceVersion: "",
|
||||||
|
})
|
||||||
|
if err != errDummy {
|
||||||
|
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
Loading…
Reference in New Issue
Block a user