Merge pull request #120265 from serathius/refactor-store

Refactor store
This commit is contained in:
Kubernetes Prow Robot 2023-08-31 02:48:47 -07:00 committed by GitHub
commit 8daecd125e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -607,17 +607,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return err
}
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", recursive),
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", opts.Recursive),
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resourceVersion", resourceVersion),
attribute.String("resourceVersionMatch", string(match)),
attribute.Int("limit", int(pred.Limit)),
attribute.String("continue", pred.Continue))
attribute.String("resourceVersion", opts.ResourceVersion),
attribute.String("resourceVersionMatch", string(opts.ResourceVersionMatch)),
attribute.Int("limit", int(opts.Predicate.Limit)),
attribute.String("continue", opts.Predicate.Continue))
defer span.End(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
@ -632,17 +628,17 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if recursive && !strings.HasSuffix(preparedKey, "/") {
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
limit := pred.Limit
limit := opts.Predicate.Limit
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
if s.pagingEnabled && opts.Predicate.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1]
@ -651,8 +647,8 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
newItemFunc := getNewItemFunc(listObj, v)
var fromRV *uint64
if len(resourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if len(opts.ResourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
@ -662,62 +658,43 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueRV, withRev int64
var continueKey string
switch {
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix)
case opts.Recursive && s.pagingEnabled && len(opts.Predicate.Continue) > 0:
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(resourceVersion) > 0 && resourceVersion != "0" {
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
preparedKey = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
case recursive && s.pagingEnabled && pred.Limit > 0:
default:
if fromRV != nil {
switch match {
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(*fromRV)
case "": // legacy case
if *fromRV > 0 {
if opts.Recursive && s.pagingEnabled && opts.Predicate.Limit > 0 && *fromRV > 0 {
withRev = int64(*fromRV)
}
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
return fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
}
}
if opts.Recursive {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(*fromRV)
case "": // legacy case
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
}
}
if recursive {
options = append(options, clientv3.WithPrefix())
}
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
@ -737,7 +714,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}()
metricsOp := "get"
if recursive {
if opts.Recursive {
metricsOp = "list"
}
@ -746,10 +723,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
}
numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
hasMore = getResp.More
@ -760,7 +737,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// avoid small allocations for the result slice, since this can be called in many
// different contexts and we don't know how significantly the result will be filtered
if pred.Empty() {
if opts.Predicate.Empty() {
growSlice(v, len(getResp.Kvs))
} else {
growSlice(v, 2048, len(getResp.Kvs))
@ -768,7 +745,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// take items from the response until the bucket is full, filtering as we go
for i, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
if paging && int64(v.Len()) >= opts.Predicate.Limit {
hasMore = true
break
}
@ -779,7 +756,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
}
@ -794,7 +771,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
if int64(v.Len()) >= opts.Predicate.Limit {
break
}
@ -835,8 +812,8 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// getResp.Count counts in objects that do not match the pred.
// Instead of returning inaccurate count for non-empty selectors, we return nil.
// Only set remainingItemCount if the predicate is empty.
if pred.Empty() {
c := int64(getResp.Count - pred.Limit)
if opts.Predicate.Empty() {
c := int64(getResp.Count - opts.Predicate.Limit)
remainingItemCount = &c
}
return s.versioner.UpdateList(listObj, uint64(withRev), next, remainingItemCount)