Enable listing from memory

This commit is contained in:
Wojciech Tyczynski 2015-12-04 14:56:33 +01:00
parent 0caaf2583e
commit 0cefb43707
4 changed files with 23 additions and 23 deletions

View File

@ -227,7 +227,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
resyncCh, cleanup := r.resyncChan()
defer cleanup()
options := unversioned.ListOptions{}
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := unversioned.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)

View File

@ -48,11 +48,6 @@ type CacherConfig struct {
// An underlying storage.Versioner.
Versioner Versioner
// Whether to serve Lists from in-memory cache.
//
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
ListFromCache bool
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
Type interface{}
@ -107,11 +102,6 @@ type Cacher struct {
// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)
// Whether to serve Lists from in-memory cache.
//
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
ListFromCache bool
}
// Create a new Cacher responsible from service WATCH and LIST requests from its
@ -153,15 +143,14 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
cacher := &Cacher{
usable: sync.RWMutex{},
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watcherIdx: 0,
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
ListFromCache: config.ListFromCache,
usable: sync.RWMutex{},
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watcherIdx: 0,
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
}
cacher.usable.Lock()
// See startCaching method for why explanation on it.
@ -270,10 +259,16 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
if !c.ListFromCache {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err

View File

@ -52,7 +52,6 @@ func newTestCacher(s storage.Interface) *storage.Cacher {
CacheCapacity: 10,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
ListFromCache: true,
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },

View File

@ -214,7 +214,10 @@ func scaleRC(wg *sync.WaitGroup, config *RCConfig) {
expectNoError(ScaleRC(config.Client, config.Namespace, config.Name, newSize, true),
fmt.Sprintf("scaling rc %s for the first time", config.Name))
selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
options := unversioned.ListOptions{LabelSelector: unversioned.LabelSelector{selector}}
options := unversioned.ListOptions{
LabelSelector: unversioned.LabelSelector{selector},
ResourceVersion: "0",
}
_, err := config.Client.Pods(config.Namespace).List(options)
expectNoError(err, fmt.Sprintf("listing pods from rc %v", config.Name))
}