diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index b6dcd2a8760..ec7d194ff74 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -606,75 +606,40 @@ func shouldDelegateList(opts storage.ListOptions) bool { // GetToList implements storage.Interface. func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - resourceVersion := opts.ResourceVersion - pred := opts.Predicate - if shouldDelegateList(opts) { - return c.storage.GetToList(ctx, key, opts, listObj) - } - - // If resourceVersion is specified, serve it from cache. - // It's guaranteed that the returned value is at least that - // fresh as the given resourceVersion. - listRV, err := c.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return err - } - - if listRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - return c.storage.GetToList(ctx, key, opts, listObj) - } - - trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) - defer trace.LogIfLong(500 * time.Millisecond) - - c.ready.wait() - trace.Step("Ready") - - // List elements with at least 'listRV' from cache. - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - listVal, err := conversion.EnforcePtr(listPtr) - if err != nil { - return err - } - if listVal.Kind() != reflect.Slice { - return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) - } - filter := filterWithAttrsFunction(key, pred) - - obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) - if err != nil { - return err - } - trace.Step("Got from cache") - - if exists { - elem, ok := obj.(*storeElement) - if !ok { - return fmt.Errorf("non *storeElement returned from storage: %v", obj) - } - if filter(elem.Key, elem.Labels, elem.Fields) { - listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) - } - } - if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { - return err - } - } - return nil + return c.list(ctx, key, opts, listObj, false) } // List implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return c.list(ctx, key, opts, listObj, true) +} + +func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { + if !recursive { + return c.storage.GetToList(ctx, key, opts, listObj) + } + return c.storage.List(ctx, key, opts, listObj) +} + +func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) { + if !recursive { + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) + if err != nil { + return nil, 0, "", err + } + if exists { + return []interface{}{obj}, readResourceVersion, "", nil + } + return nil, readResourceVersion, "", nil + } + return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) +} + +func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { resourceVersion := opts.ResourceVersion pred := opts.Predicate if shouldDelegateList(opts) { - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } // If resourceVersion is specified, serve it from cache. @@ -688,7 +653,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) @@ -711,7 +676,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, } filter := filterWithAttrsFunction(key, pred) - objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) + objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 59305cff505..d7fe75eeaca 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -458,58 +458,7 @@ func (s *store) GuaranteedUpdate( // GetToList implements storage.Interface.GetToList. func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error { - resourceVersion := listOpts.ResourceVersion - match := listOpts.ResourceVersionMatch - pred := listOpts.Predicate - trace := utiltrace.New("GetToList etcd3", - utiltrace.Field{"key", key}, - utiltrace.Field{"resourceVersion", resourceVersion}, - utiltrace.Field{"resourceVersionMatch", match}, - utiltrace.Field{"limit", pred.Limit}, - utiltrace.Field{"continue", pred.Continue}) - defer trace.LogIfLong(500 * time.Millisecond) - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - v, err := conversion.EnforcePtr(listPtr) - if err != nil || v.Kind() != reflect.Slice { - return fmt.Errorf("need ptr to slice: %v", err) - } - - newItemFunc := getNewItemFunc(listObj, v) - - key = path.Join(s.pathPrefix, key) - startTime := time.Now() - var opts []clientv3.OpOption - if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact { - rv, err := s.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) - } - opts = append(opts, clientv3.WithRev(int64(rv))) - } - - getResp, err := s.client.KV.Get(ctx, key, opts...) - metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) - if err != nil { - return err - } - if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { - return err - } - - if len(getResp.Kvs) > 0 { - data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key)) - if err != nil { - return storage.NewInternalError(err.Error()) - } - if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { - return err - } - } - // update version with cluster level revision - return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil) + return s.list(ctx, key, listOpts, listObj, false) } func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object { @@ -610,10 +559,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error // List implements storage.Interface.List. func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return s.list(ctx, key, opts, listObj, true) +} + +func (s *store) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { resourceVersion := opts.ResourceVersion match := opts.ResourceVersionMatch pred := opts.Predicate - trace := utiltrace.New("List etcd3", + trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive), utiltrace.Field{"key", key}, utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersionMatch", match}, @@ -628,14 +581,13 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, if err != nil || v.Kind() != reflect.Slice { return fmt.Errorf("need ptr to slice: %v", err) } + key = path.Join(s.pathPrefix, key) - if s.pathPrefix != "" { - key = path.Join(s.pathPrefix, key) - } - // We need to make sure the key ended with "/" so that we only get children "directories". - // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, - // while with prefix "/a/" will return only "/a/b" which is the correct answer. - if !strings.HasSuffix(key, "/") { + // For recursive lists, we need to make sure the key ended with "/" so that we only + // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys + // with prefix "/a" will return all three, while with prefix "/a/" will return only + // "/a/b" which is the correct answer. + if recursive && !strings.HasSuffix(key, "/") { key += "/" } keyPrefix := key @@ -662,7 +614,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, var returnedRV, continueRV, withRev int64 var continueKey string switch { - case s.pagingEnabled && len(pred.Continue) > 0: + case recursive && s.pagingEnabled && len(pred.Continue) > 0: continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix) if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) @@ -683,7 +635,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, withRev = continueRV returnedRV = continueRV } - case s.pagingEnabled && pred.Limit > 0: + case recursive && s.pagingEnabled && pred.Limit > 0: if fromRV != nil { switch match { case metav1.ResourceVersionMatchNotOlderThan: @@ -719,7 +671,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, } } - options = append(options, clientv3.WithPrefix()) + if recursive { + options = append(options, clientv3.WithPrefix()) + } } if withRev != 0 { options = append(options, clientv3.WithRev(withRev)) @@ -740,7 +694,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, for { startTime := time.Now() getResp, err = s.client.KV.Get(ctx, key, options...) - metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + if recursive { + metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + } else { + metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) + } if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) }