Merge pull request #35218 from wojtek-t/get_to_list_support_resource_version

Automatic merge from submit-queue

Support resourceVersion in GetToList - unify interface of List and Ge…

This pretty much unifies the interface of List() and GetToList() methods of storage interface.

I'm going to use it in a subsequent PR to improve performance of the whole cluster.
This commit is contained in:
Kubernetes Submit Queue 2016-10-21 08:02:14 -07:00 committed by GitHub
commit 850c586b69
8 changed files with 114 additions and 16 deletions

View File

@ -201,18 +201,18 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,
// ListPredicate returns a list of all the items matching m.
func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) {
if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
}
list := e.NewListFunc()
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, p, list)
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
}
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}

View File

@ -344,8 +344,59 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign
}
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, pred SelectionPredicate, listObj runtime.Object) error {
return c.storage.GetToList(ctx, key, pred, listObj)
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}
trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, pred)
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
trace.Step("Got from cache")
if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
return err
}
}
return nil
}
// Implements storage.Interface.
@ -359,7 +410,6 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
@ -371,7 +421,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
c.ready.wait()
trace.Step("Ready")
// List elements from cache, with at least 'listRV'.
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err

View File

@ -297,7 +297,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
}
// Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}

View File

@ -270,7 +270,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err

View File

@ -253,7 +253,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests {
out := &api.PodList{}
err := store.GetToList(ctx, tt.key, tt.pred, out)
err := store.GetToList(ctx, tt.key, "", tt.pred, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}

View File

@ -128,7 +128,9 @@ type Interface interface {
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
GetToList(ctx context.Context, key string, p SelectionPredicate, listObj runtime.Object) error
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).

View File

@ -245,8 +245,10 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.Trace) error {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
@ -261,22 +263,42 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.T
}()
w.RLock()
defer w.RUnlock()
if trace != nil {
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
}
w.cond.Wait()
}
if trace != nil {
trace.Step("watchCache fresh enough")
}
return nil
}
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, 0, err
}
return w.store.List(), w.resourceVersion, nil
}
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *util.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
}
value, exists, err := w.store.GetByKey(key)
return value, exists, w.resourceVersion, err
}
func (w *watchCache) ListKeys() []string {
w.RLock()
defer w.RUnlock()

View File

@ -263,6 +263,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
}
func TestWaitUntilFreshAndGet(t *testing.T) {
store := newTestWatchCache(3)
// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()
obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if !exists {
t.Fatalf("no results returned: %#v", obj)
}
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/bar", Object: makeTestPod("bar", 5)}, obj) {
t.Errorf("unexpected element returned: %#v", obj)
}
}
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store := newTestWatchCache(3)
fc := store.clock.(*clock.FakeClock)