Migrate GetList to Kubernetes client

This commit is contained in:
Marek Siarkowicz 2024-05-31 14:36:22 +02:00
parent e192ac31a4
commit a16a364324

View File

@ -629,7 +629,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
// GetList implements storage.Interface. // GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key) keyPrefix, err := s.prepareKey(key)
if err != nil { if err != nil {
return err return err
} }
@ -654,27 +654,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only // with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer. // "/a/b" which is the correct answer.
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") { if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") {
preparedKey += "/" keyPrefix += "/"
} }
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set // set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
limit := opts.Predicate.Limit limit := opts.Predicate.Limit
var paging bool paging := opts.Predicate.Limit > 0
options := make([]clientv3.OpOption, 0, 4)
if opts.Predicate.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1]
}
if opts.Recursive {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
}
newItemFunc := getNewItemFunc(listObj, v) newItemFunc := getNewItemFunc(listObj, v)
var continueRV, withRev int64 var continueRV, withRev int64
@ -684,20 +670,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil { if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
} }
preparedKey = continueKey
} }
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil { if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
return err return err
} }
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}
// loop until we have filled the requested limit from etcd or there are no more results // loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte var lastKey []byte
var hasMore bool var hasMore bool
var getResp *clientv3.GetResponse var getResp kubernetes.ListResponse
var numFetched int var numFetched int
var numEvald int var numEvald int
// Because these metrics are for understanding the costs of handling LIST requests, // Because these metrics are for understanding the costs of handling LIST requests,
@ -714,24 +695,27 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
for { for {
startTime := time.Now() startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, preparedKey, options...) getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{
Revision: withRev,
Limit: limit,
Continue: continueKey,
})
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime) metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
if err != nil { if err != nil {
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix) return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
} }
numFetched += len(getResp.Kvs) numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
return err return err
} }
hasMore = getResp.More hasMore = int64(len(getResp.Kvs)) < getResp.Count
if len(getResp.Kvs) == 0 && getResp.More { if len(getResp.Kvs) == 0 && hasMore {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
} }
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests. // indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
if withRev == 0 { if withRev == 0 {
withRev = getResp.Header.Revision withRev = getResp.Revision
options = append(options, clientv3.WithRev(withRev))
} }
// avoid small allocations for the result slice, since this can be called in many // avoid small allocations for the result slice, since this can be called in many
@ -779,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// free kv early. Long lists can take O(seconds) to decode. // free kv early. Long lists can take O(seconds) to decode.
getResp.Kvs[i] = nil getResp.Kvs[i] = nil
} }
continueKey = string(lastKey) + "\x00"
// no more results remain or we didn't request paging // no more results remain or we didn't request paging
if !hasMore || !paging { if !hasMore || !paging {
@ -796,9 +781,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if limit > maxLimit { if limit > maxLimit {
limit = maxLimit limit = maxLimit
} }
*limitOption = clientv3.WithLimit(limit)
} }
preparedKey = string(lastKey) + "\x00"
} }
if v.IsNil() { if v.IsNil() {
@ -813,6 +796,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount) return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
} }
func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) {
if recursive {
return s.client.Kubernetes.List(ctx, keyPrefix, options)
}
getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{
Revision: options.Revision,
})
var resp kubernetes.ListResponse
if getResp.KV != nil {
resp.Kvs = []*mvccpb.KeyValue{getResp.KV}
resp.Count = 1
resp.Revision = getResp.Revision
} else {
resp.Kvs = []*mvccpb.KeyValue{}
resp.Count = 0
resp.Revision = getResp.Revision
}
return resp, err
}
// growSlice takes a slice value and grows its capacity up // growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever // to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left // is smaller. Above maxCapacity decisions about allocation are left