storage: move continue token definition to storage

The means by which we encode and decode the continue token during a
paginated LIST call is not specific to etcd3. In order to allow for a
generic suite of tests against any storage.Interface implementation, we
need this logic to live outside of the etcd3 package, or import cycles
will exist.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
This commit is contained in:
Steve Kuznetsov 2022-05-11 07:52:02 -07:00
parent c50579afb1
commit eb3aa5be10
No known key found for this signature in database
GPG Key ID: 8821C29EC988D9B4
6 changed files with 175 additions and 120 deletions

View File

@ -0,0 +1,85 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"encoding/base64"
"encoding/json"
"fmt"
"path"
"strings"
)
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// DecodeContinue transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func DecodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// EncodeContinue returns a string representing the encoded continuation of the current query.
func EncodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"encoding/base64"
"encoding/json"
"testing"
)
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(out)
}
func Test_decodeContinue(t *testing.T) {
type args struct {
continueValue string
keyPrefix string
}
tests := []struct {
name string
args args
wantFromKey string
wantRv int64
wantErr bool
}{
{name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFromKey, gotRv, err := DecodeContinue(tt.args.continueValue, tt.args.keyPrefix)
if (err != nil) != tt.wantErr {
t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFromKey != tt.wantFromKey {
t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
}
if gotRv != tt.wantRv {
t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
}
})
}
}

View File

@ -18,6 +18,7 @@ package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/storage"
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -60,7 +61,7 @@ func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
// continueToken.ResoureVersion=-1 means that the apiserver can
// continue the list at the latest resource version. We don't use rv=0
// for this purpose to distinguish from a bad token that has empty rv.
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
newToken, err := storage.EncodeContinue(continueKey, keyPrefix, -1)
if err != nil {
utilruntime.HandleError(err)
return errors.NewResourceExpired(continueExpired)

View File

@ -19,8 +19,6 @@ package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
@ -519,66 +517,6 @@ func (s *store) Count(key string) (int64, error) {
return getResp.Count, nil
}
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// parseFrom transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// encodeContinue returns a string representing the encoded continuation of the current query.
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
@ -637,7 +575,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueKey string
switch {
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
@ -798,7 +736,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}

View File

@ -19,8 +19,6 @@ package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math"
@ -597,7 +595,7 @@ func TestList(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
continueRV, _ := strconv.Atoi(list.ResourceVersion)
secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV))
secondContinuation, err := storage.EncodeContinue("/two-level/2", "/two-level/", int64(continueRV))
if err != nil {
t.Fatal(err)
}
@ -927,7 +925,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -938,7 +936,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 1,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -949,7 +947,7 @@ func TestList(t *testing.T) {
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
@ -1134,7 +1132,7 @@ func TestListContinuation(t *testing.T) {
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
key, rv, err := decodeContinue(continueFromSecondItem, "/")
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
t.Logf("continue token was %d %s %v", rv, key, err)
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
if transformer.reads != 2 {
@ -1654,54 +1652,6 @@ func TestPrefix(t *testing.T) {
}
}
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(out)
}
func Test_decodeContinue(t *testing.T) {
type args struct {
continueValue string
keyPrefix string
}
tests := []struct {
name string
args args
wantFromKey string
wantRv int64
wantErr bool
}{
{name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix)
if (err != nil) != tt.wantErr {
t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFromKey != tt.wantFromKey {
t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
}
if gotRv != tt.wantRv {
t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
}
})
}
}
func Test_growSlice(t *testing.T) {
type args struct {
initialCapacity int

View File

@ -769,3 +769,13 @@ func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) {
}
}
}
const dummyPrefix = "adapter"
func EncodeContinueOrDie(key string, resourceVersion int64) string {
token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion)
if err != nil {
panic(err)
}
return token
}