From 93c008f8a4f6c592994e199e8d7c1b721b3379fc Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 20 Oct 2016 18:23:13 +0200 Subject: [PATCH] Support resourceVersion in GetToList - unify interface of List and GetToList --- pkg/registry/generic/registry/store.go | 8 ++-- pkg/storage/cacher.go | 58 ++++++++++++++++++++++++-- pkg/storage/etcd/etcd_helper.go | 2 +- pkg/storage/etcd3/store.go | 2 +- pkg/storage/etcd3/store_test.go | 2 +- pkg/storage/interfaces.go | 4 +- pkg/storage/watch_cache.go | 30 +++++++++++-- pkg/storage/watch_cache_test.go | 24 +++++++++++ 8 files changed, 114 insertions(+), 16 deletions(-) diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index ec5d4ff1b0b..30094ee8b70 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -201,18 +201,18 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object, // ListPredicate returns a list of all the items matching m. func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) { + if options == nil { + options = &api.ListOptions{ResourceVersion: "0"} + } list := e.NewListFunc() if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { - err := e.Storage.GetToList(ctx, key, p, list) + err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list) return list, storeerr.InterpretListError(err, e.QualifiedResource) } // if we cannot extract a key based on the current context, the optimization is skipped } - if options == nil { - options = &api.ListOptions{ResourceVersion: "0"} - } err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list) return list, storeerr.InterpretListError(err, e.QualifiedResource) } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index a100f303348..f6fc7131288 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -344,8 +344,59 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign } // Implements storage.Interface. -func (c *Cacher) GetToList(ctx context.Context, key string, pred SelectionPredicate, listObj runtime.Object) error { - return c.storage.GetToList(ctx, key, pred, listObj) +func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + listRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + + trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String())) + defer trace.LogIfLong(500 * time.Millisecond) + + c.ready.wait() + trace.Step("Ready") + + // List elements with at least 'listRV' from cache. + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + listVal, err := conversion.EnforcePtr(listPtr) + if err != nil || listVal.Kind() != reflect.Slice { + return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) + } + filter := filterFunction(key, pred) + + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) + if err != nil { + return fmt.Errorf("failed to wait for fresh list: %v", err) + } + trace.Step("Got from cache") + + if exists { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + if filter(elem.Key, elem.Object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) + } + } + if c.versioner != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { + return err + } + } + return nil } // Implements storage.Interface. @@ -359,7 +410,6 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p // If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. - listRV, err := ParseListResourceVersion(resourceVersion) if err != nil { return err @@ -371,7 +421,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p c.ready.wait() trace.Step("Ready") - // List elements from cache, with at least 'listRV'. + // List elements with at least 'listRV' from cache. listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 066d20d51ca..b01a5e15f51 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -297,7 +297,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } // Implements storage.Interface. -func (h *etcdHelper) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 12a931daae3..2566450fbf4 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -270,7 +270,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob } // GetToList implements storage.Interface.GetToList. -func (s *store) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index c458164fc98..b9a394724c2 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -253,7 +253,7 @@ func TestGetToList(t *testing.T) { for i, tt := range tests { out := &api.PodList{} - err := store.GetToList(ctx, tt.key, tt.pred, out) + err := store.GetToList(ctx, tt.key, "", tt.pred, out) if err != nil { t.Fatalf("GetToList failed: %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index b540574d517..c6a0fdbc82b 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -128,7 +128,9 @@ type Interface interface { // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). - GetToList(ctx context.Context, key string, p SelectionPredicate, listObj runtime.Object) error + // The returned contents may be delayed, but it is guaranteed that they will + // be have at least 'resourceVersion'. + GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 683526bbf75..9357e4a81a7 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -245,8 +245,10 @@ func (w *watchCache) List() []interface{} { return w.store.List() } -// WaitUntilFreshAndList returns list of pointers to objects. -func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) { +// waitUntilFreshAndBlock waits until cache is at least as fresh as given . +// NOTE: This function acquired lock and doesn't release it. +// You HAVE TO explicitly call w.RUnlock() after this function. +func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.Trace) error { startTime := w.clock.Now() go func() { // Wake us up when the time limit has expired. The docs @@ -261,22 +263,42 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.T }() w.RLock() - defer w.RUnlock() if trace != nil { trace.Step("watchCache locked acquired") } for w.resourceVersion < resourceVersion { if w.clock.Since(startTime) >= MaximumListWait { - return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion) + return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion) } w.cond.Wait() } if trace != nil { trace.Step("watchCache fresh enough") } + return nil +} + +// WaitUntilFreshAndList returns list of pointers to objects. +func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) { + err := w.waitUntilFreshAndBlock(resourceVersion, trace) + defer w.RUnlock() + if err != nil { + return nil, 0, err + } return w.store.List(), w.resourceVersion, nil } +// WaitUntilFreshAndGet returns a pointers to object. +func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *util.Trace) (interface{}, bool, uint64, error) { + err := w.waitUntilFreshAndBlock(resourceVersion, trace) + defer w.RUnlock() + if err != nil { + return nil, false, 0, err + } + value, exists, err := w.store.GetByKey(key) + return value, exists, w.resourceVersion, err +} + func (w *watchCache) ListKeys() []string { w.RLock() defer w.RUnlock() diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index 57628c362b1..8a930ff0876 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -263,6 +263,30 @@ func TestWaitUntilFreshAndList(t *testing.T) { } } +func TestWaitUntilFreshAndGet(t *testing.T) { + store := newTestWatchCache(3) + + // In background, update the store. + go func() { + store.Add(makeTestPod("foo", 2)) + store.Add(makeTestPod("bar", 5)) + }() + + obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + } + if !exists { + t.Fatalf("no results returned: %#v", obj) + } + if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/bar", Object: makeTestPod("bar", 5)}, obj) { + t.Errorf("unexpected element returned: %#v", obj) + } +} + func TestWaitUntilFreshAndListTimeout(t *testing.T) { store := newTestWatchCache(3) fc := store.clock.(*clock.FakeClock)