mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #106542 from wojtek-t/cleanup_storage_list_interface
Unify code for List and GetToList for etcd and cacher
This commit is contained in:
commit
3832299fd2
@ -606,75 +606,40 @@ func shouldDelegateList(opts storage.ListOptions) bool {
|
||||
|
||||
// GetToList implements storage.Interface.
|
||||
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
pred := opts.Predicate
|
||||
if shouldDelegateList(opts) {
|
||||
return c.storage.GetToList(ctx, key, opts, listObj)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
// It's guaranteed that the returned value is at least that
|
||||
// fresh as the given resourceVersion.
|
||||
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if listRV == 0 && !c.ready.check() {
|
||||
// If Cacher is not yet initialized and we don't require any specific
|
||||
// minimal resource version, simply forward the request to storage.
|
||||
return c.storage.GetToList(ctx, key, opts, listObj)
|
||||
}
|
||||
|
||||
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
c.ready.wait()
|
||||
trace.Step("Ready")
|
||||
|
||||
// List elements with at least 'listRV' from cache.
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listVal, err := conversion.EnforcePtr(listPtr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterWithAttrsFunction(key, pred)
|
||||
|
||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Got from cache")
|
||||
|
||||
if exists {
|
||||
elem, ok := obj.(*storeElement)
|
||||
if !ok {
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return c.list(ctx, key, opts, listObj, false)
|
||||
}
|
||||
|
||||
// List implements storage.Interface.
|
||||
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
return c.list(ctx, key, opts, listObj, true)
|
||||
}
|
||||
|
||||
func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
|
||||
if !recursive {
|
||||
return c.storage.GetToList(ctx, key, opts, listObj)
|
||||
}
|
||||
return c.storage.List(ctx, key, opts, listObj)
|
||||
}
|
||||
|
||||
func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) {
|
||||
if !recursive {
|
||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
if exists {
|
||||
return []interface{}{obj}, readResourceVersion, "", nil
|
||||
}
|
||||
return nil, readResourceVersion, "", nil
|
||||
}
|
||||
return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
|
||||
}
|
||||
|
||||
func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
pred := opts.Predicate
|
||||
if shouldDelegateList(opts) {
|
||||
return c.storage.List(ctx, key, opts, listObj)
|
||||
return c.delegateList(ctx, key, opts, listObj, recursive)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
@ -688,7 +653,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
if listRV == 0 && !c.ready.check() {
|
||||
// If Cacher is not yet initialized and we don't require any specific
|
||||
// minimal resource version, simply forward the request to storage.
|
||||
return c.storage.List(ctx, key, opts, listObj)
|
||||
return c.delegateList(ctx, key, opts, listObj, recursive)
|
||||
}
|
||||
|
||||
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
||||
@ -711,7 +676,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
}
|
||||
filter := filterWithAttrsFunction(key, pred)
|
||||
|
||||
objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
|
||||
objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -458,58 +458,7 @@ func (s *store) GuaranteedUpdate(
|
||||
|
||||
// GetToList implements storage.Interface.GetToList.
|
||||
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
|
||||
resourceVersion := listOpts.ResourceVersion
|
||||
match := listOpts.ResourceVersionMatch
|
||||
pred := listOpts.Predicate
|
||||
trace := utiltrace.New("GetToList etcd3",
|
||||
utiltrace.Field{"key", key},
|
||||
utiltrace.Field{"resourceVersion", resourceVersion},
|
||||
utiltrace.Field{"resourceVersionMatch", match},
|
||||
utiltrace.Field{"limit", pred.Limit},
|
||||
utiltrace.Field{"continue", pred.Continue})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v, err := conversion.EnforcePtr(listPtr)
|
||||
if err != nil || v.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need ptr to slice: %v", err)
|
||||
}
|
||||
|
||||
newItemFunc := getNewItemFunc(listObj, v)
|
||||
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
startTime := time.Now()
|
||||
var opts []clientv3.OpOption
|
||||
if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
|
||||
rv, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
opts = append(opts, clientv3.WithRev(int64(rv)))
|
||||
}
|
||||
|
||||
getResp, err := s.client.KV.Get(ctx, key, opts...)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(getResp.Kvs) > 0 {
|
||||
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// update version with cluster level revision
|
||||
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil)
|
||||
return s.list(ctx, key, listOpts, listObj, false)
|
||||
}
|
||||
|
||||
func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object {
|
||||
@ -610,10 +559,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
|
||||
|
||||
// List implements storage.Interface.List.
|
||||
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
return s.list(ctx, key, opts, listObj, true)
|
||||
}
|
||||
|
||||
func (s *store) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
match := opts.ResourceVersionMatch
|
||||
pred := opts.Predicate
|
||||
trace := utiltrace.New("List etcd3",
|
||||
trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive),
|
||||
utiltrace.Field{"key", key},
|
||||
utiltrace.Field{"resourceVersion", resourceVersion},
|
||||
utiltrace.Field{"resourceVersionMatch", match},
|
||||
@ -628,14 +581,13 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
if err != nil || v.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need ptr to slice: %v", err)
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
if s.pathPrefix != "" {
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
}
|
||||
// We need to make sure the key ended with "/" so that we only get children "directories".
|
||||
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
|
||||
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
|
||||
if !strings.HasSuffix(key, "/") {
|
||||
// For recursive lists, we need to make sure the key ended with "/" so that we only
|
||||
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
|
||||
// with prefix "/a" will return all three, while with prefix "/a/" will return only
|
||||
// "/a/b" which is the correct answer.
|
||||
if recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
keyPrefix := key
|
||||
@ -662,7 +614,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
var returnedRV, continueRV, withRev int64
|
||||
var continueKey string
|
||||
switch {
|
||||
case s.pagingEnabled && len(pred.Continue) > 0:
|
||||
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
|
||||
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
||||
@ -683,7 +635,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
withRev = continueRV
|
||||
returnedRV = continueRV
|
||||
}
|
||||
case s.pagingEnabled && pred.Limit > 0:
|
||||
case recursive && s.pagingEnabled && pred.Limit > 0:
|
||||
if fromRV != nil {
|
||||
switch match {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
@ -719,7 +671,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
}
|
||||
}
|
||||
|
||||
options = append(options, clientv3.WithPrefix())
|
||||
if recursive {
|
||||
options = append(options, clientv3.WithPrefix())
|
||||
}
|
||||
}
|
||||
if withRev != 0 {
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
@ -740,7 +694,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
for {
|
||||
startTime := time.Now()
|
||||
getResp, err = s.client.KV.Get(ctx, key, options...)
|
||||
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
||||
if recursive {
|
||||
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
||||
} else {
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
}
|
||||
if err != nil {
|
||||
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user