Merge pull request #121625 from siyuanfoundation/refactor2

k8s.io/apiserver/storage/etcd: refactor etcd GetList.
This commit is contained in:
Kubernetes Prow Robot 2023-12-13 22:34:24 +01:00 committed by GitHub
commit 09c5396607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 30 deletions

View File

@ -584,6 +584,47 @@ func (s *store) Count(key string) (int64, error) {
return getResp.Count, nil
}
// resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request.
func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) {
var withRev int64
// Uses continueRV if this is a continuation request.
if len(continueKey) > 0 {
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return withRev, apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
return withRev, nil
}
// Returns 0 if ResourceVersion is not specified.
if len(opts.ResourceVersion) == 0 {
return withRev, nil
}
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return withRev, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return withRev, fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
return withRev, nil
}
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
@ -636,41 +677,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueRV, withRev int64
var continueKey string
switch {
case opts.Recursive && len(opts.Predicate.Continue) > 0:
if opts.Recursive && len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
preparedKey = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
case len(opts.ResourceVersion) > 0:
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
}
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
return err
}
if withRev != 0 {

View File

@ -639,6 +639,129 @@ func TestInvalidKeys(t *testing.T) {
expectInvalidKey("Count", countErr)
}
func TestResolveGetListRev(t *testing.T) {
_, store, _ := testSetup(t)
testCases := []struct {
name string
continueKey string
continueRV int64
rv string
rvMatch metav1.ResourceVersionMatch
recursive bool
expectedError string
limit int64
expectedRev int64
}{
{
name: "specifying resource versionwhen using continue",
continueKey: "continue",
continueRV: 100,
rv: "200",
expectedError: "specifying resource version is not allowed when using continue",
},
{
name: "invalid resource version",
rv: "invalid",
expectedError: "invalid resource version",
},
{
name: "unknown ResourceVersionMatch value",
rv: "200",
rvMatch: "unknown",
expectedError: "unknown ResourceVersionMatch value",
},
{
name: "use continueRV",
continueKey: "continue",
continueRV: 100,
rv: "0",
expectedRev: 100,
},
{
name: "use continueRV with empty rv",
continueKey: "continue",
continueRV: 100,
rv: "",
expectedRev: 100,
},
{
name: "continueRV = 0",
continueKey: "continue",
continueRV: 0,
rv: "",
expectedRev: 0,
},
{
name: "continueRV < 0",
continueKey: "continue",
continueRV: -1,
rv: "",
expectedRev: 0,
},
{
name: "default",
expectedRev: 0,
},
{
name: "rev resolve to 0 if ResourceVersionMatchNotOlderThan",
rv: "200",
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectedRev: 0,
},
{
name: "specified rev if ResourceVersionMatchExact",
rv: "200",
rvMatch: metav1.ResourceVersionMatchExact,
expectedRev: 200,
},
{
name: "rev resolve to 0 if not recursive",
rv: "200",
limit: 1,
expectedRev: 0,
},
{
name: "rev resolve to 0 if limit unspecified",
rv: "200",
recursive: true,
expectedRev: 0,
},
{
name: "specified rev if recursive with limit",
rv: "200",
recursive: true,
limit: 1,
expectedRev: 200,
},
}
for _, tt := range testCases {
tt := tt
t.Run(tt.name, func(t *testing.T) {
storageOpts := storage.ListOptions{
ResourceVersion: tt.rv,
ResourceVersionMatch: tt.rvMatch,
Predicate: storage.SelectionPredicate{
Limit: tt.limit,
},
Recursive: tt.recursive,
}
rev, err := store.resolveGetListRev(tt.continueKey, tt.continueRV, storageOpts)
if len(tt.expectedError) > 0 {
if err == nil || !strings.Contains(err.Error(), tt.expectedError) {
t.Fatalf("expected error: %s, but got: %v", tt.expectedError, err)
}
return
}
if err != nil {
t.Fatalf("resolveRevForGetList failed: %v", err)
}
if rev != tt.expectedRev {
t.Errorf("%s: expecting rev = %d, but get %d", tt.name, tt.expectedRev, rev)
}
})
}
}
func BenchmarkStore_GetList(b *testing.B) {
generateBigPod := func(index int, total int, expect int) runtime.Object {
l := map[string]string{}