From c86543d9cb302859510604256fdb81ea1b0d46fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 18 Nov 2021 16:41:36 +0100 Subject: [PATCH] Unify List and GetToList functions of cacher. --- .../apiserver/pkg/storage/cacher/cacher.go | 93 ++++++------------- 1 file changed, 29 insertions(+), 64 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 217fa72588c..617552144f1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -610,75 +610,40 @@ func shouldDelegateList(opts storage.ListOptions) bool { // GetToList implements storage.Interface. func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - resourceVersion := opts.ResourceVersion - pred := opts.Predicate - if shouldDelegateList(opts) { - return c.storage.GetToList(ctx, key, opts, listObj) - } - - // If resourceVersion is specified, serve it from cache. - // It's guaranteed that the returned value is at least that - // fresh as the given resourceVersion. - listRV, err := c.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return err - } - - if listRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - return c.storage.GetToList(ctx, key, opts, listObj) - } - - trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) - defer trace.LogIfLong(500 * time.Millisecond) - - c.ready.wait() - trace.Step("Ready") - - // List elements with at least 'listRV' from cache. - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - listVal, err := conversion.EnforcePtr(listPtr) - if err != nil { - return err - } - if listVal.Kind() != reflect.Slice { - return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) - } - filter := filterWithAttrsFunction(key, pred) - - obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) - if err != nil { - return err - } - trace.Step("Got from cache") - - if exists { - elem, ok := obj.(*storeElement) - if !ok { - return fmt.Errorf("non *storeElement returned from storage: %v", obj) - } - if filter(elem.Key, elem.Labels, elem.Fields) { - listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) - } - } - if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { - return err - } - } - return nil + return c.list(ctx, key, opts, listObj, false) } // List implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return c.list(ctx, key, opts, listObj, true) +} + +func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { + if !recursive { + return c.storage.GetToList(ctx, key, opts, listObj) + } + return c.storage.List(ctx, key, opts, listObj) +} + +func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) { + if !recursive { + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) + if err != nil { + return nil, 0, "", err + } + if exists { + return []interface{}{obj}, readResourceVersion, "", nil + } + return nil, readResourceVersion, "", nil + } + return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) +} + +func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { resourceVersion := opts.ResourceVersion pred := opts.Predicate if shouldDelegateList(opts) { - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } // If resourceVersion is specified, serve it from cache. @@ -692,7 +657,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) @@ -715,7 +680,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, } filter := filterWithAttrsFunction(key, pred) - objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) + objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive) if err != nil { return err }