diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 9f198ba25a6..48e66d72b4b 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -227,7 +227,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { resyncCh, cleanup := r.resyncChan() defer cleanup() - options := unversioned.ListOptions{} + // Explicitly set "0" as resource version - it's fine for the List() + // to be served from cache and potentially be delayed relative to + // etcd contents. Reflector framework will catch up via Watch() eventually. + options := unversioned.ListOptions{ResourceVersion: "0"} list, err := r.listerWatcher.List(options) if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 7f4258de3b8..6defe84faac 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -48,11 +48,6 @@ type CacherConfig struct { // An underlying storage.Versioner. Versioner Versioner - // Whether to serve Lists from in-memory cache. - // - // NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE! - ListFromCache bool - // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. Type interface{} @@ -107,11 +102,6 @@ type Cacher struct { // keyFunc is used to get a key in the underyling storage for a given object. keyFunc func(runtime.Object) (string, error) - - // Whether to serve Lists from in-memory cache. - // - // NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE! - ListFromCache bool } // Create a new Cacher responsible from service WATCH and LIST requests from its @@ -153,15 +143,14 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) cacher := &Cacher{ - usable: sync.RWMutex{}, - storage: config.Storage, - watchCache: watchCache, - reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), - watcherIdx: 0, - watchers: make(map[int]*cacheWatcher), - versioner: config.Versioner, - keyFunc: config.KeyFunc, - ListFromCache: config.ListFromCache, + usable: sync.RWMutex{}, + storage: config.Storage, + watchCache: watchCache, + reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), + watcherIdx: 0, + watchers: make(map[int]*cacheWatcher), + versioner: config.Versioner, + keyFunc: config.KeyFunc, } cacher.usable.Lock() // See startCaching method for why explanation on it. @@ -270,10 +259,16 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l // Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { - if !c.ListFromCache { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). return c.storage.List(ctx, key, resourceVersion, filter, listObj) } + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + listRV, err := ParseListResourceVersion(resourceVersion) if err != nil { return err diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 402a2004f08..1c749959299 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -52,7 +52,6 @@ func newTestCacher(s storage.Interface) *storage.Cacher { CacheCapacity: 10, Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, - ListFromCache: true, Type: &api.Pod{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, diff --git a/test/e2e/load.go b/test/e2e/load.go index b731540ee29..14a0164e4df 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -214,7 +214,10 @@ func scaleRC(wg *sync.WaitGroup, config *RCConfig) { expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize, true), fmt.Sprintf("scaling rc %s for the first time", config.Name)) selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name})) - options := unversioned.ListOptions{LabelSelector: unversioned.LabelSelector{selector}} + options := unversioned.ListOptions{ + LabelSelector: unversioned.LabelSelector{selector}, + ResourceVersion: "0", + } _, err := config.Client.Pods(config.Namespace).List(options) expectNoError(err, fmt.Sprintf("listing pods from rc %v", config.Name)) }