mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 13:31:52 +00:00
cacher: Fix watch behaviour for unset RV
The original design was to honour strong consistency semantics for when the RV is unset, i.e. serve the watch by doing a quorum read. However, the implementation did not match the intent, in that, the Cacher did not distinguish between set and unset RV. This commit rectifies that behaviour by serving the watch from the underlying storage if the RV is unset. This commit subsequently also adds a test for the same. Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
This commit is contained in:
parent
2593671337
commit
610b67031c
@ -482,6 +482,13 @@ func (c *Cacher) Delete(
|
||||
// Watch implements storage.Interface.
|
||||
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||
pred := opts.Predicate
|
||||
// If the resourceVersion is unset, ensure that the rv
|
||||
// from which the watch is being served, is the latest
|
||||
// one. "latest" is ensured by serving the watch from
|
||||
// the underlying storage.
|
||||
if opts.ResourceVersion == "" {
|
||||
return c.storage.Watch(ctx, key, opts)
|
||||
}
|
||||
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -449,6 +449,37 @@ func TestGetCacheBypass(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchCacheBypass(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
// Wait until cacher is initialized.
|
||||
if err := cacher.ready.wait(); err != nil {
|
||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||
}
|
||||
|
||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||
backingStorage.injectError(errDummy)
|
||||
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
|
||||
}
|
||||
|
||||
// With unset RV, check if cacher is bypassed.
|
||||
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
ResourceVersion: "",
|
||||
})
|
||||
if err != errDummy {
|
||||
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
|
Loading…
Reference in New Issue
Block a user