From eb3aa5be10393968d8083c79f5958501fc029e8d Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Wed, 11 May 2022 07:52:02 -0700 Subject: [PATCH 1/2] storage: move continue token definition to storage The means by which we encode and decode the continue token during a paginated LIST call is not specific to etcd3. In order to allow for a generic suite of tests against any storage.Interface implementation, we need this logic to live outside of the etcd3 package, or import cycles will exist. Signed-off-by: Steve Kuznetsov --- .../k8s.io/apiserver/pkg/storage/continue.go | 85 +++++++++++++++++++ .../apiserver/pkg/storage/continue_test.go | 71 ++++++++++++++++ .../apiserver/pkg/storage/etcd3/errors.go | 3 +- .../apiserver/pkg/storage/etcd3/store.go | 66 +------------- .../apiserver/pkg/storage/etcd3/store_test.go | 60 ++----------- .../pkg/storage/testing/store_tests.go | 10 +++ 6 files changed, 175 insertions(+), 120 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/continue.go create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/continue_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/continue.go b/staging/src/k8s.io/apiserver/pkg/storage/continue.go new file mode 100644 index 00000000000..ad3bb6b2da2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/continue.go @@ -0,0 +1,85 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "path" + "strings" +) + +// continueToken is a simple structured object for encoding the state of a continue token. +// TODO: if we change the version of the encoded from, we can't start encoding the new version +// until all other servers are upgraded (i.e. we need to support rolling schema) +// This is a public API struct and cannot change. +type continueToken struct { + APIVersion string `json:"v"` + ResourceVersion int64 `json:"rv"` + StartKey string `json:"start"` +} + +// DecodeContinue transforms an encoded predicate from into a versioned struct. +// TODO: return a typed error that instructs clients that they must relist +func DecodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) { + data, err := base64.RawURLEncoding.DecodeString(continueValue) + if err != nil { + return "", 0, fmt.Errorf("continue key is not valid: %v", err) + } + var c continueToken + if err := json.Unmarshal(data, &c); err != nil { + return "", 0, fmt.Errorf("continue key is not valid: %v", err) + } + switch c.APIVersion { + case "meta.k8s.io/v1": + if c.ResourceVersion == 0 { + return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)") + } + if len(c.StartKey) == 0 { + return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)") + } + // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot + // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with + // continue start key that is fully qualified and cannot range over anything less specific than + // keyPrefix. + key := c.StartKey + if !strings.HasPrefix(key, "/") { + key = "/" + key + } + cleaned := path.Clean(key) + if cleaned != key { + return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey) + } + return keyPrefix + cleaned[1:], c.ResourceVersion, nil + default: + return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) + } +} + +// EncodeContinue returns a string representing the encoded continuation of the current query. +func EncodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) { + nextKey := strings.TrimPrefix(key, keyPrefix) + if nextKey == key { + return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") + } + out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey}) + if err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(out), nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/continue_test.go b/staging/src/k8s.io/apiserver/pkg/storage/continue_test.go new file mode 100644 index 00000000000..3a8afb6a5c2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/continue_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "encoding/base64" + "encoding/json" + "testing" +) + +func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string { + out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey}) + if err != nil { + panic(err) + } + return base64.RawURLEncoding.EncodeToString(out) +} + +func Test_decodeContinue(t *testing.T) { + type args struct { + continueValue string + keyPrefix string + } + tests := []struct { + name string + args args + wantFromKey string + wantRv int64 + wantErr bool + }{ + {name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + {name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"}, + + {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, + + {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotFromKey, gotRv, err := DecodeContinue(tt.args.continueValue, tt.args.keyPrefix) + if (err != nil) != tt.wantErr { + t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotFromKey != tt.wantFromKey { + t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey) + } + if gotRv != tt.wantRv { + t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go index 652bd7ca6dc..d71c9917dc7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go @@ -18,6 +18,7 @@ package etcd3 import ( "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apiserver/pkg/storage" etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -60,7 +61,7 @@ func handleCompactedErrorForPaging(continueKey, keyPrefix string) error { // continueToken.ResoureVersion=-1 means that the apiserver can // continue the list at the latest resource version. We don't use rv=0 // for this purpose to distinguish from a bad token that has empty rv. - newToken, err := encodeContinue(continueKey, keyPrefix, -1) + newToken, err := storage.EncodeContinue(continueKey, keyPrefix, -1) if err != nil { utilruntime.HandleError(err) return errors.NewResourceExpired(continueExpired) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 629502448a4..d5f43911eeb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -19,8 +19,6 @@ package etcd3 import ( "bytes" "context" - "encoding/base64" - "encoding/json" "errors" "fmt" "path" @@ -519,66 +517,6 @@ func (s *store) Count(key string) (int64, error) { return getResp.Count, nil } -// continueToken is a simple structured object for encoding the state of a continue token. -// TODO: if we change the version of the encoded from, we can't start encoding the new version -// until all other servers are upgraded (i.e. we need to support rolling schema) -// This is a public API struct and cannot change. -type continueToken struct { - APIVersion string `json:"v"` - ResourceVersion int64 `json:"rv"` - StartKey string `json:"start"` -} - -// parseFrom transforms an encoded predicate from into a versioned struct. -// TODO: return a typed error that instructs clients that they must relist -func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) { - data, err := base64.RawURLEncoding.DecodeString(continueValue) - if err != nil { - return "", 0, fmt.Errorf("continue key is not valid: %v", err) - } - var c continueToken - if err := json.Unmarshal(data, &c); err != nil { - return "", 0, fmt.Errorf("continue key is not valid: %v", err) - } - switch c.APIVersion { - case "meta.k8s.io/v1": - if c.ResourceVersion == 0 { - return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)") - } - if len(c.StartKey) == 0 { - return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)") - } - // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot - // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with - // continue start key that is fully qualified and cannot range over anything less specific than - // keyPrefix. - key := c.StartKey - if !strings.HasPrefix(key, "/") { - key = "/" + key - } - cleaned := path.Clean(key) - if cleaned != key { - return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey) - } - return keyPrefix + cleaned[1:], c.ResourceVersion, nil - default: - return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) - } -} - -// encodeContinue returns a string representing the encoded continuation of the current query. -func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) { - nextKey := strings.TrimPrefix(key, keyPrefix) - if nextKey == key { - return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") - } - out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey}) - if err != nil { - return "", err - } - return base64.RawURLEncoding.EncodeToString(out), nil -} - // GetList implements storage.Interface. func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { recursive := opts.Recursive @@ -637,7 +575,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption var continueKey string switch { case recursive && s.pagingEnabled && len(pred.Continue) > 0: - continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix) + continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix) if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } @@ -798,7 +736,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption // we never return a key that the client wouldn't be allowed to see if hasMore { // we want to start immediately after the last key - next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) + next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f371558784d..11a21d3c9e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -19,8 +19,6 @@ package etcd3 import ( "bytes" "context" - "encoding/base64" - "encoding/json" "fmt" "io/ioutil" "math" @@ -597,7 +595,7 @@ func TestList(t *testing.T) { t.Errorf("Unexpected error: %v", err) } continueRV, _ := strconv.Atoi(list.ResourceVersion) - secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV)) + secondContinuation, err := storage.EncodeContinue("/two-level/2", "/two-level/", int64(continueRV)) if err != nil { t.Fatal(err) } @@ -927,7 +925,7 @@ func TestList(t *testing.T) { Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 2, - Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"), + Continue: storagetesting.EncodeContinueOrDie("z-level/3", int64(continueRV)), }, expectedOut: []*example.Pod{preset[4].storedObj}, }, @@ -938,7 +936,7 @@ func TestList(t *testing.T) { Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 1, - Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)), }, expectedOut: []*example.Pod{preset[4].storedObj}, }, @@ -949,7 +947,7 @@ func TestList(t *testing.T) { Field: fields.OneTermEqualSelector("metadata.name", "bar"), Label: labels.Everything(), Limit: 2, - Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + Continue: storagetesting.EncodeContinueOrDie("z-level/3/test-2", int64(continueRV)), }, expectedOut: []*example.Pod{preset[4].storedObj}, }, @@ -1134,7 +1132,7 @@ func TestListContinuation(t *testing.T) { if len(out.Continue) != 0 { t.Fatalf("Unexpected continuation token set") } - key, rv, err := decodeContinue(continueFromSecondItem, "/") + key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") t.Logf("continue token was %d %s %v", rv, key, err) storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) if transformer.reads != 2 { @@ -1654,54 +1652,6 @@ func TestPrefix(t *testing.T) { } } -func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string { - out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey}) - if err != nil { - panic(err) - } - return base64.RawURLEncoding.EncodeToString(out) -} - -func Test_decodeContinue(t *testing.T) { - type args struct { - continueValue string - keyPrefix string - } - tests := []struct { - name string - args args - wantFromKey string - wantRv int64 - wantErr bool - }{ - {name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, - {name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"}, - - {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, - - {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix) - if (err != nil) != tt.wantErr { - t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr) - return - } - if gotFromKey != tt.wantFromKey { - t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey) - } - if gotRv != tt.wantRv { - t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv) - } - }) - } -} - func Test_growSlice(t *testing.T) { type args struct { initialCapacity int diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index d272a1a6ae6..9776a1e0c67 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -769,3 +769,13 @@ func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) { } } } + +const dummyPrefix = "adapter" + +func EncodeContinueOrDie(key string, resourceVersion int64) string { + token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion) + if err != nil { + panic(err) + } + return token +} From e50cb6c8d36ca73c99c935126978cbec01f9be01 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Thu, 12 May 2022 11:52:28 -0700 Subject: [PATCH 2/2] storage: move test utiltiies out of test files Signed-off-by: Steve Kuznetsov --- .../pkg/storage/testing/store_tests.go | 45 -------- .../apiserver/pkg/storage/testing/utils.go | 101 ++++++++++++++++++ .../pkg/storage/testing/watcher_tests.go | 51 --------- 3 files changed, 101 insertions(+), 96 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 9776a1e0c67..80108555ca0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -21,12 +21,10 @@ import ( "errors" "fmt" "math" - "reflect" "strconv" "sync" "testing" - "github.com/google/go-cmp/cmp" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -701,28 +699,6 @@ func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *te } } -// TestPropogateStore helps propagates store with objects, automates key generation, and returns -// keys and stored objects. -func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) { - // Setup store with a key and grab the output for returning. - key := "/testkey" - return key, TestPropogateStoreWithKey(ctx, t, store, key, obj) -} - -// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key. -func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod { - // Setup store with the specified key and grab the output for returning. - err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil) - if err != nil && !storage.IsNotFound(err) { - t.Fatalf("Cleanup failed: %v", err) - } - setOutput := &example.Pod{} - if err := store.Create(ctx, key, obj, setOutput, 0); err != nil { - t.Fatalf("Set failed: %v", err) - } - return setOutput -} - func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) { resourceA := "/foo.bar.io/abc" @@ -758,24 +734,3 @@ func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) { t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot) } } - -func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) { - t.Helper() - if !reflect.DeepEqual(expected, got) { - if diff := cmp.Diff(expected, got); diff != "" { - t.Errorf("%s: %s", msg, diff) - } else { - t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got) - } - } -} - -const dummyPrefix = "adapter" - -func EncodeContinueOrDie(key string, resourceVersion int64) string { - token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion) - if err != nil { - panic(err) - } - return token -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index d0fbeef3eaa..821fffa5498 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -19,9 +19,16 @@ package testing import ( "context" "path" + "reflect" + "testing" + "time" + "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" ) @@ -70,3 +77,97 @@ func DeepEqualSafePodSpec() example.PodSpec { SchedulerName: "default-scheduler", } } + +// TestPropogateStore helps propagates store with objects, automates key generation, and returns +// keys and stored objects. +func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) { + // Setup store with a key and grab the output for returning. + key := "/testkey" + return key, TestPropogateStoreWithKey(ctx, t, store, key, obj) +} + +// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key. +func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod { + // Setup store with the specified key and grab the output for returning. + err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil) + if err != nil && !storage.IsNotFound(err) { + t.Fatalf("Cleanup failed: %v", err) + } + setOutput := &example.Pod{} + if err := store.Create(ctx, key, obj, setOutput, 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + return setOutput +} + +func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) { + t.Helper() + if !reflect.DeepEqual(expected, got) { + if diff := cmp.Diff(expected, got); diff != "" { + t.Errorf("%s: %s", msg, diff) + } else { + t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got) + } + } +} + +const dummyPrefix = "adapter" + +func EncodeContinueOrDie(key string, resourceVersion int64) string { + token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion) + if err != nil { + panic(err) + } + return token +} + +func TestCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) + } +} + +func TestCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { + TestCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error { + ExpectNoDiff(t, "incorrect object", expectObj, object) + return nil + }) +} + +func TestCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) + return + } + if err := check(res.Object); err != nil { + t.Error(err) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) + } +} + +func TestCheckStop(t *testing.T, w watch.Interface) { + select { + case e, ok := <-w.ResultChan(): + if ok { + var obj string + switch e.Object.(type) { + case *example.Pod: + obj = e.Object.(*example.Pod).Name + case *v1.Status: + obj = e.Object.(*v1.Status).Message + } + t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("time out after waiting 1s on ResultChan") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 56e6525c441..f6717fcbb33 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -190,54 +190,3 @@ type testWatchStruct struct { expectEvent bool watchType watch.EventType } - -func TestCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) { - select { - case res := <-w.ResultChan(): - if res.Type != expectEventType { - t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) - } -} - -func TestCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { - TestCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error { - ExpectNoDiff(t, "incorrect object", expectObj, object) - return nil - }) -} - -func TestCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) { - select { - case res := <-w.ResultChan(): - if res.Type != expectEventType { - t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) - return - } - if err := check(res.Object); err != nil { - t.Error(err) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) - } -} - -func TestCheckStop(t *testing.T, w watch.Interface) { - select { - case e, ok := <-w.ResultChan(): - if ok { - var obj string - switch e.Object.(type) { - case *example.Pod: - obj = e.Object.(*example.Pod).Name - case *metav1.Status: - obj = e.Object.(*metav1.Status).Message - } - t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("time out after waiting 1s on ResultChan") - } -}