mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #113357 from wojtek-t/refactor_storage_tests
Refactor storage tests
This commit is contained in:
commit
c33a773886
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package etcd3
|
package etcd3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -63,68 +62,36 @@ func init() {
|
|||||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
|
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
|
||||||
}
|
}
|
||||||
|
|
||||||
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
|
||||||
type prefixTransformer struct {
|
|
||||||
prefix []byte
|
|
||||||
stale bool
|
|
||||||
err error
|
|
||||||
reads uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *prefixTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
|
||||||
atomic.AddUint64(&p.reads, 1)
|
|
||||||
if dataCtx == nil {
|
|
||||||
panic("no context provided")
|
|
||||||
}
|
|
||||||
if !bytes.HasPrefix(data, p.prefix) {
|
|
||||||
return nil, false, fmt.Errorf("value does not have expected prefix %q: %s,", p.prefix, string(data))
|
|
||||||
}
|
|
||||||
return bytes.TrimPrefix(data, p.prefix), p.stale, p.err
|
|
||||||
}
|
|
||||||
func (p *prefixTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
|
||||||
if dataCtx == nil {
|
|
||||||
panic("no context provided")
|
|
||||||
}
|
|
||||||
if len(data) > 0 {
|
|
||||||
return append(append([]byte{}, p.prefix...), data...), p.err
|
|
||||||
}
|
|
||||||
return data, p.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *prefixTransformer) resetReads() {
|
|
||||||
p.reads = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPod() runtime.Object {
|
func newPod() runtime.Object {
|
||||||
return &example.Pod{}
|
return &example.Pod{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) storagetesting.KeyValidation {
|
||||||
ctx, store, etcdClient := testSetup(t)
|
return func(ctx context.Context, t *testing.T, key string) {
|
||||||
storagetesting.RunTestCreate(ctx, t, store, func(ctx context.Context, t *testing.T, key string) {
|
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||||
checkStorageInvariants(ctx, t, etcdClient, store.codec, key)
|
if err != nil {
|
||||||
})
|
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(getResp.Kvs) == 0 {
|
||||||
|
t.Fatalf("expecting non empty result on key: %s", key)
|
||||||
|
}
|
||||||
|
decoded, err := runtime.Decode(codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
||||||
|
}
|
||||||
|
obj := decoded.(*example.Pod)
|
||||||
|
if obj.ResourceVersion != "" {
|
||||||
|
t.Errorf("stored object should have empty resource version")
|
||||||
|
}
|
||||||
|
if obj.SelfLink != "" {
|
||||||
|
t.Errorf("stored output should have empty selfLink")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, codec runtime.Codec, key string) {
|
func TestCreate(t *testing.T) {
|
||||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
ctx, store, etcdClient := testSetup(t)
|
||||||
if err != nil {
|
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec))
|
||||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
|
||||||
}
|
|
||||||
if len(getResp.Kvs) == 0 {
|
|
||||||
t.Fatalf("expecting non empty result on key: %s", key)
|
|
||||||
}
|
|
||||||
decoded, err := runtime.Decode(codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
|
||||||
}
|
|
||||||
obj := decoded.(*example.Pod)
|
|
||||||
if obj.ResourceVersion != "" {
|
|
||||||
t.Errorf("stored object should have empty resource version")
|
|
||||||
}
|
|
||||||
if obj.SelfLink != "" {
|
|
||||||
t.Errorf("stored output should have empty selfLink")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateWithTTL(t *testing.T) {
|
func TestCreateWithTTL(t *testing.T) {
|
||||||
@ -152,31 +119,6 @@ func TestConditionalDelete(t *testing.T) {
|
|||||||
storagetesting.RunTestConditionalDelete(ctx, t, store)
|
storagetesting.RunTestConditionalDelete(ctx, t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following set of Delete tests are testing the logic of adding `suggestion`
|
|
||||||
// as a parameter with probably value of the current state.
|
|
||||||
// Introducing it for GuaranteedUpdate cause a number of issues, so we're addressing
|
|
||||||
// all of those upfront by adding appropriate tests:
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/35415
|
|
||||||
// [DONE] Lack of tests originally - added TestDeleteWithSuggestion.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/40664
|
|
||||||
// [DONE] Irrelevant for delete, as Delete doesn't write data (nor compare it).
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/47703
|
|
||||||
// [DONE] Irrelevant for delete, because Delete doesn't persist data.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/48394/
|
|
||||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/43152
|
|
||||||
// [DONE] Added TestDeleteWithSuggestionAndConflict
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/54780
|
|
||||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/58375
|
|
||||||
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/77619
|
|
||||||
// [DONE] Added TestValidateDeletionWithSuggestion for corresponding delete checks.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/78713
|
|
||||||
// [DONE] Bug was in getState function which is shared with the new code.
|
|
||||||
// - https://github.com/kubernetes/kubernetes/pull/78713
|
|
||||||
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
|
||||||
|
|
||||||
func TestDeleteWithSuggestion(t *testing.T) {
|
func TestDeleteWithSuggestion(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
storagetesting.RunTestDeleteWithSuggestion(ctx, t, store)
|
storagetesting.RunTestDeleteWithSuggestion(ctx, t, store)
|
||||||
@ -207,157 +149,22 @@ func TestGetListNonRecursive(t *testing.T) {
|
|||||||
storagetesting.RunTestGetListNonRecursive(ctx, t, store)
|
storagetesting.RunTestGetListNonRecursive(ctx, t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type storeWithPrefixTransformer struct {
|
||||||
|
*store
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetesting.PrefixTransformerModifier) func() {
|
||||||
|
originalTransformer := s.transformer.(*storagetesting.PrefixTransformer)
|
||||||
|
transformer := *originalTransformer
|
||||||
|
s.transformer = modifier(&transformer)
|
||||||
|
return func() {
|
||||||
|
s.transformer = originalTransformer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdate(t *testing.T) {
|
func TestGuaranteedUpdate(t *testing.T) {
|
||||||
ctx, store, etcdClient := testSetup(t)
|
ctx, store, etcdClient := testSetup(t)
|
||||||
key := "/testkey"
|
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec))
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
key string
|
|
||||||
ignoreNotFound bool
|
|
||||||
precondition *storage.Preconditions
|
|
||||||
expectNotFoundErr bool
|
|
||||||
expectInvalidObjErr bool
|
|
||||||
expectNoUpdate bool
|
|
||||||
transformStale bool
|
|
||||||
hasSelfLink bool
|
|
||||||
}{{
|
|
||||||
name: "non-existing key, ignoreNotFound=false",
|
|
||||||
key: "/non-existing",
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: true,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: false,
|
|
||||||
}, {
|
|
||||||
name: "non-existing key, ignoreNotFound=true",
|
|
||||||
key: "/non-existing",
|
|
||||||
ignoreNotFound: true,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: false,
|
|
||||||
}, {
|
|
||||||
name: "existing key",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: false,
|
|
||||||
}, {
|
|
||||||
name: "same data",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: true,
|
|
||||||
}, {
|
|
||||||
name: "same data, a selfLink",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: true,
|
|
||||||
hasSelfLink: true,
|
|
||||||
}, {
|
|
||||||
name: "same data, stale",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: nil,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: false,
|
|
||||||
transformStale: true,
|
|
||||||
}, {
|
|
||||||
name: "UID match",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: storage.NewUIDPreconditions("A"),
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
expectNoUpdate: true,
|
|
||||||
}, {
|
|
||||||
name: "UID mismatch",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
precondition: storage.NewUIDPreconditions("B"),
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectInvalidObjErr: true,
|
|
||||||
expectNoUpdate: true,
|
|
||||||
}}
|
|
||||||
|
|
||||||
for i, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
key, storeObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
name := fmt.Sprintf("foo-%d", i)
|
|
||||||
if tt.expectNoUpdate {
|
|
||||||
name = storeObj.Name
|
|
||||||
}
|
|
||||||
originalTransformer := store.transformer.(*prefixTransformer)
|
|
||||||
if tt.transformStale {
|
|
||||||
transformer := *originalTransformer
|
|
||||||
transformer.stale = true
|
|
||||||
store.transformer = &transformer
|
|
||||||
}
|
|
||||||
version := storeObj.ResourceVersion
|
|
||||||
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
if tt.expectNotFoundErr && tt.ignoreNotFound {
|
|
||||||
if pod := obj.(*example.Pod); pod.Name != "" {
|
|
||||||
t.Errorf("%s: expecting zero value, but get=%#v", tt.name, pod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pod := *storeObj
|
|
||||||
if tt.hasSelfLink {
|
|
||||||
pod.SelfLink = "testlink"
|
|
||||||
}
|
|
||||||
pod.Name = name
|
|
||||||
return &pod, nil
|
|
||||||
}), nil)
|
|
||||||
store.transformer = originalTransformer
|
|
||||||
|
|
||||||
if tt.expectNotFoundErr {
|
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("%s: expecting not found error, but get: %v", tt.name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tt.expectInvalidObjErr {
|
|
||||||
if err == nil || !storage.IsInvalidObj(err) {
|
|
||||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: GuaranteedUpdate failed: %v", tt.name, err)
|
|
||||||
}
|
|
||||||
if out.ObjectMeta.Name != name {
|
|
||||||
t.Errorf("%s: pod name want=%s, get=%s", tt.name, name, out.ObjectMeta.Name)
|
|
||||||
}
|
|
||||||
if out.SelfLink != "" {
|
|
||||||
t.Errorf("%s: selfLink should not be set", tt.name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
|
||||||
checkStorageInvariants(ctx, t, etcdClient, store.codec, key)
|
|
||||||
|
|
||||||
switch tt.expectNoUpdate {
|
|
||||||
case true:
|
|
||||||
if version != out.ResourceVersion {
|
|
||||||
t.Errorf("%s: expect no version change, before=%s, after=%s", tt.name, version, out.ResourceVersion)
|
|
||||||
}
|
|
||||||
case false:
|
|
||||||
if version == out.ResourceVersion {
|
|
||||||
t.Errorf("%s: expect version change, but get the same version=%s", tt.name, version)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||||
@ -367,65 +174,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
|||||||
|
|
||||||
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
storagetesting.RunTestGuaranteedUpdateChecksStoredData(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
key := "/somekey"
|
|
||||||
|
|
||||||
// serialize input into etcd with data that would be normalized by a write - in this case, leading
|
|
||||||
// and trailing whitespace
|
|
||||||
codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion)
|
|
||||||
data, err := runtime.Encode(codec, input)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// this update should write the canonical value to etcd because the new serialization differs
|
|
||||||
// from the stored serialization
|
|
||||||
input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10)
|
|
||||||
out := &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
return input, nil, nil
|
|
||||||
}, input)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) {
|
|
||||||
t.Errorf("guaranteed update should have updated the serialized data, got %#v", out)
|
|
||||||
}
|
|
||||||
|
|
||||||
lastVersion := out.ResourceVersion
|
|
||||||
|
|
||||||
// this update should not write to etcd because the input matches the stored data
|
|
||||||
input = out
|
|
||||||
out = &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
return input, nil, nil
|
|
||||||
}, input)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
if out.ResourceVersion != lastVersion {
|
|
||||||
t.Errorf("guaranteed update should have short-circuited write, got %#v", out)
|
|
||||||
}
|
|
||||||
|
|
||||||
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
|
|
||||||
|
|
||||||
// this update should write to etcd because the transformer reported stale
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
return input, nil, nil
|
|
||||||
}, input)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
if out.ResourceVersion == lastVersion {
|
|
||||||
t.Errorf("guaranteed update should have written to etcd when transformer reported stale, got %#v", out)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||||
@ -440,78 +189,7 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
|||||||
|
|
||||||
func TestTransformationFailure(t *testing.T) {
|
func TestTransformationFailure(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestTransformationFailure(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
preset := []struct {
|
|
||||||
key string
|
|
||||||
obj *example.Pod
|
|
||||||
storedObj *example.Pod
|
|
||||||
}{{
|
|
||||||
key: "/one-level/test",
|
|
||||||
obj: &example.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
|
|
||||||
Spec: storagetesting.DeepEqualSafePodSpec(),
|
|
||||||
},
|
|
||||||
}, {
|
|
||||||
key: "/two-level/1/test",
|
|
||||||
obj: &example.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
|
|
||||||
Spec: storagetesting.DeepEqualSafePodSpec(),
|
|
||||||
},
|
|
||||||
}}
|
|
||||||
for i, ps := range preset[:1] {
|
|
||||||
preset[i].storedObj = &example.Pod{}
|
|
||||||
err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Set failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a second resource with an invalid prefix
|
|
||||||
oldTransformer := store.transformer
|
|
||||||
store.transformer = &prefixTransformer{prefix: []byte("otherprefix!")}
|
|
||||||
for i, ps := range preset[1:] {
|
|
||||||
preset[1:][i].storedObj = &example.Pod{}
|
|
||||||
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Set failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
store.transformer = oldTransformer
|
|
||||||
|
|
||||||
// List should fail
|
|
||||||
var got example.PodList
|
|
||||||
storageOpts := storage.ListOptions{
|
|
||||||
Predicate: storage.Everything,
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
if err := store.GetList(ctx, "/", storageOpts, &got); !storage.IsInternalError(err) {
|
|
||||||
t.Errorf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get should fail
|
|
||||||
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
// GuaranteedUpdate without suggestion should return an error
|
|
||||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
|
||||||
return input, nil, nil
|
|
||||||
}, nil); !storage.IsInternalError(err) {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
// GuaranteedUpdate with suggestion should return an error if we don't change the object
|
|
||||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
|
||||||
return input, nil, nil
|
|
||||||
}, preset[1].obj); err == nil {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete fails with internal error.
|
|
||||||
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); !storage.IsInternalError(err) {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
@ -526,7 +204,7 @@ func TestListWithoutPaging(t *testing.T) {
|
|||||||
|
|
||||||
func TestListContinuation(t *testing.T) {
|
func TestListContinuation(t *testing.T) {
|
||||||
ctx, store, etcdClient := testSetup(t)
|
ctx, store, etcdClient := testSetup(t)
|
||||||
transformer := store.transformer.(*prefixTransformer)
|
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
etcdClient.KV = recorder
|
etcdClient.KV = recorder
|
||||||
|
|
||||||
@ -595,13 +273,13 @@ func TestListContinuation(t *testing.T) {
|
|||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if reads := transformer.GetReads(); reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 1 {
|
if recorder.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
transformer.ResetReads()
|
||||||
recorder.resetReads()
|
recorder.resetReads()
|
||||||
|
|
||||||
continueFromSecondItem := out.Continue
|
continueFromSecondItem := out.Continue
|
||||||
@ -622,13 +300,13 @@ func TestListContinuation(t *testing.T) {
|
|||||||
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
|
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
|
||||||
t.Logf("continue token was %d %s %v", rv, key, err)
|
t.Logf("continue token was %d %s %v", rv, key, err)
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 2 {
|
if reads := transformer.GetReads(); reads != 2 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 1 {
|
if recorder.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
transformer.ResetReads()
|
||||||
recorder.resetReads()
|
recorder.resetReads()
|
||||||
|
|
||||||
// limit, should get two more pages
|
// limit, should get two more pages
|
||||||
@ -645,13 +323,13 @@ func TestListContinuation(t *testing.T) {
|
|||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if reads := transformer.GetReads(); reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 1 {
|
if recorder.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
transformer.ResetReads()
|
||||||
recorder.resetReads()
|
recorder.resetReads()
|
||||||
|
|
||||||
continueFromThirdItem := out.Continue
|
continueFromThirdItem := out.Continue
|
||||||
@ -669,19 +347,17 @@ func TestListContinuation(t *testing.T) {
|
|||||||
t.Fatalf("Unexpected continuation token set")
|
t.Fatalf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if reads := transformer.GetReads(); reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 1 {
|
if recorder.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
|
||||||
recorder.resetReads()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListPaginationRareObject(t *testing.T) {
|
func TestListPaginationRareObject(t *testing.T) {
|
||||||
ctx, store, etcdClient := testSetup(t)
|
ctx, store, etcdClient := testSetup(t)
|
||||||
transformer := store.transformer.(*prefixTransformer)
|
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
etcdClient.KV = recorder
|
etcdClient.KV = recorder
|
||||||
|
|
||||||
@ -720,8 +396,8 @@ func TestListPaginationRareObject(t *testing.T) {
|
|||||||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
|
||||||
t.Fatalf("Unexpected first page: %#v", out.Items)
|
t.Fatalf("Unexpected first page: %#v", out.Items)
|
||||||
}
|
}
|
||||||
if transformer.reads != uint64(podCount) {
|
if reads := transformer.GetReads(); reads != uint64(podCount) {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
// We expect that kube-apiserver will be increasing page sizes
|
// We expect that kube-apiserver will be increasing page sizes
|
||||||
// if not full pages are received, so we should see significantly less
|
// if not full pages are received, so we should see significantly less
|
||||||
@ -733,8 +409,6 @@ func TestListPaginationRareObject(t *testing.T) {
|
|||||||
if recorder.reads != 10 {
|
if recorder.reads != 10 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
|
||||||
recorder.resetReads()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientRecorder struct {
|
type clientRecorder struct {
|
||||||
@ -753,7 +427,7 @@ func (r *clientRecorder) resetReads() {
|
|||||||
|
|
||||||
func TestListContinuationWithFilter(t *testing.T) {
|
func TestListContinuationWithFilter(t *testing.T) {
|
||||||
ctx, store, etcdClient := testSetup(t)
|
ctx, store, etcdClient := testSetup(t)
|
||||||
transformer := store.transformer.(*prefixTransformer)
|
transformer := store.transformer.(*storagetesting.PrefixTransformer)
|
||||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
etcdClient.KV = recorder
|
etcdClient.KV = recorder
|
||||||
|
|
||||||
@ -817,13 +491,13 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
t.Errorf("No continuation token set")
|
t.Errorf("No continuation token set")
|
||||||
}
|
}
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 3 {
|
if reads := transformer.GetReads(); reads != 3 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 2 {
|
if recorder.reads != 2 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
transformer.ResetReads()
|
||||||
recorder.resetReads()
|
recorder.resetReads()
|
||||||
|
|
||||||
// the rest of the test does not make sense if the previous call failed
|
// the rest of the test does not make sense if the previous call failed
|
||||||
@ -849,14 +523,12 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
t.Errorf("Unexpected continuation token set")
|
t.Errorf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if reads := transformer.GetReads(); reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", reads)
|
||||||
}
|
}
|
||||||
if recorder.reads != 1 {
|
if recorder.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", recorder.reads)
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
}
|
}
|
||||||
transformer.resetReads()
|
|
||||||
recorder.resetReads()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListInconsistentContinuation(t *testing.T) {
|
func TestListInconsistentContinuation(t *testing.T) {
|
||||||
@ -1026,8 +698,8 @@ func newTestLeaseManagerConfig() LeaseManagerConfig {
|
|||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestTransformer() *prefixTransformer {
|
func newTestTransformer() value.Transformer {
|
||||||
return &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
type setupOptions struct {
|
type setupOptions struct {
|
||||||
|
@ -54,7 +54,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
key, storedObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -164,7 +164,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
key, storedObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
utilpointer "k8s.io/utils/pointer"
|
||||||
@ -87,7 +88,7 @@ func RunTestCreateWithTTL(ctx context.Context, t *testing.T, store storage.Inter
|
|||||||
|
|
||||||
func RunTestCreateWithKeyExist(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestCreateWithKeyExist(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
key, _ := TestPropogateStore(ctx, t, store, obj)
|
key, _ := TestPropagateStore(ctx, t, store, obj)
|
||||||
out := &example.Pod{}
|
out := &example.Pod{}
|
||||||
err := store.Create(ctx, key, obj, out, 0)
|
err := store.Create(ctx, key, obj, out, 0)
|
||||||
if err == nil || !storage.IsExist(err) {
|
if err == nil || !storage.IsExist(err) {
|
||||||
@ -97,7 +98,7 @@ func RunTestCreateWithKeyExist(ctx context.Context, t *testing.T, store storage.
|
|||||||
|
|
||||||
func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
// create an object to test
|
// create an object to test
|
||||||
key, createdObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, createdObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
// update the object once to allow get by exact resource version to be tested
|
// update the object once to allow get by exact resource version to be tested
|
||||||
updateObj := createdObj.DeepCopy()
|
updateObj := createdObj.DeepCopy()
|
||||||
updateObj.Annotations = map[string]string{"test-annotation": "1"}
|
updateObj.Annotations = map[string]string{"test-annotation": "1"}
|
||||||
@ -128,43 +129,43 @@ func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
|||||||
expectRVTooLarge bool
|
expectRVTooLarge bool
|
||||||
expectedOut *example.Pod
|
expectedOut *example.Pod
|
||||||
rv string
|
rv string
|
||||||
}{{ // test get on existing item
|
}{{
|
||||||
name: "get existing",
|
name: "get existing",
|
||||||
key: key,
|
key: key,
|
||||||
ignoreNotFound: false,
|
ignoreNotFound: false,
|
||||||
expectNotFoundErr: false,
|
expectNotFoundErr: false,
|
||||||
expectedOut: storedObj,
|
expectedOut: storedObj,
|
||||||
}, { // test get on existing item with resource version set to 0
|
}, {
|
||||||
name: "resource version 0",
|
name: "resource version 0",
|
||||||
key: key,
|
key: key,
|
||||||
expectedOut: storedObj,
|
expectedOut: storedObj,
|
||||||
rv: "0",
|
rv: "0",
|
||||||
}, { // test get on existing item with resource version set to the resource version is was created on
|
}, {
|
||||||
name: "object created resource version",
|
name: "object created resource version",
|
||||||
key: key,
|
key: key,
|
||||||
expectedOut: storedObj,
|
expectedOut: storedObj,
|
||||||
rv: createdObj.ResourceVersion,
|
rv: createdObj.ResourceVersion,
|
||||||
}, { // test get on existing item with resource version set to current resource version of the object
|
}, {
|
||||||
name: "current object resource version, match=NotOlderThan",
|
name: "current object resource version, match=NotOlderThan",
|
||||||
key: key,
|
key: key,
|
||||||
expectedOut: storedObj,
|
expectedOut: storedObj,
|
||||||
rv: fmt.Sprintf("%d", currentRV),
|
rv: fmt.Sprintf("%d", currentRV),
|
||||||
}, { // test get on existing item with resource version set to latest pod resource version
|
}, {
|
||||||
name: "latest resource version",
|
name: "latest resource version",
|
||||||
key: key,
|
key: key,
|
||||||
expectedOut: storedObj,
|
expectedOut: storedObj,
|
||||||
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
|
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
|
||||||
}, { // test get on existing item with resource version set too high
|
}, {
|
||||||
name: "too high resource version",
|
name: "too high resource version",
|
||||||
key: key,
|
key: key,
|
||||||
expectRVTooLarge: true,
|
expectRVTooLarge: true,
|
||||||
rv: strconv.FormatInt(math.MaxInt64, 10),
|
rv: strconv.FormatInt(math.MaxInt64, 10),
|
||||||
}, { // test get on non-existing item with ignoreNotFound=false
|
}, {
|
||||||
name: "get non-existing",
|
name: "get non-existing",
|
||||||
key: "/non-existing",
|
key: "/non-existing",
|
||||||
ignoreNotFound: false,
|
ignoreNotFound: false,
|
||||||
expectNotFoundErr: true,
|
expectNotFoundErr: true,
|
||||||
}, { // test get on non-existing item with ignoreNotFound=true
|
}, {
|
||||||
name: "get non-existing, ignore not found",
|
name: "get non-existing, ignore not found",
|
||||||
key: "/non-existing",
|
key: "/non-existing",
|
||||||
ignoreNotFound: true,
|
ignoreNotFound: true,
|
||||||
@ -197,7 +198,7 @@ func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -235,7 +236,7 @@ func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -265,7 +266,7 @@ func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.I
|
|||||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
||||||
}
|
}
|
||||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
||||||
key, storedObj = TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
key, storedObj = TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -296,8 +297,7 @@ func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.I
|
|||||||
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
||||||
|
|
||||||
func RunTestDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
out := &example.Pod{}
|
||||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||||
@ -310,8 +310,7 @@ func RunTestDeleteWithSuggestion(ctx context.Context, t *testing.T, store storag
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestDeleteWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First update, so originalPod is outdated.
|
// First update, so originalPod is outdated.
|
||||||
updatedPod := &example.Pod{}
|
updatedPod := &example.Pod{}
|
||||||
@ -335,8 +334,7 @@ func RunTestDeleteWithSuggestionAndConflict(ctx context.Context, t *testing.T, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestDeleteWithSuggestionOfDeletedObject(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteWithSuggestionOfDeletedObject(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First delete, so originalPod is outdated.
|
// First delete, so originalPod is outdated.
|
||||||
deletedPod := &example.Pod{}
|
deletedPod := &example.Pod{}
|
||||||
@ -352,8 +350,7 @@ func RunTestDeleteWithSuggestionOfDeletedObject(ctx context.Context, t *testing.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestValidateDeletionWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestValidateDeletionWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// Check that validaing fresh object fails is called once and fails.
|
// Check that validaing fresh object fails is called once and fails.
|
||||||
validationCalls := 0
|
validationCalls := 0
|
||||||
@ -405,8 +402,7 @@ func RunTestValidateDeletionWithSuggestion(ctx context.Context, t *testing.T, st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First update, so originalPod is outdated.
|
// First update, so originalPod is outdated.
|
||||||
updatedPod := &example.Pod{}
|
updatedPod := &example.Pod{}
|
||||||
@ -1019,12 +1015,10 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
prevKey, prevStoredObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
prevKey, prevStoredObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
||||||
|
|
||||||
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
||||||
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
|
||||||
|
|
||||||
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -1149,8 +1143,170 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunTestGuaranteedUpdateWithTTL(ctx context.Context, t *testing.T, store storage.Interface) {
|
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||||
|
|
||||||
|
type InterfaceWithPrefixTransformer interface {
|
||||||
|
storage.Interface
|
||||||
|
|
||||||
|
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdate(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer, validation KeyValidation) {
|
||||||
|
key := "/testkey"
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
ignoreNotFound bool
|
||||||
|
precondition *storage.Preconditions
|
||||||
|
expectNotFoundErr bool
|
||||||
|
expectInvalidObjErr bool
|
||||||
|
expectNoUpdate bool
|
||||||
|
transformStale bool
|
||||||
|
hasSelfLink bool
|
||||||
|
}{{
|
||||||
|
name: "non-existing key, ignoreNotFound=false",
|
||||||
|
key: "/non-existing",
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: true,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: false,
|
||||||
|
}, {
|
||||||
|
name: "non-existing key, ignoreNotFound=true",
|
||||||
|
key: "/non-existing",
|
||||||
|
ignoreNotFound: true,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: false,
|
||||||
|
}, {
|
||||||
|
name: "existing key",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: false,
|
||||||
|
}, {
|
||||||
|
name: "same data",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: true,
|
||||||
|
}, {
|
||||||
|
name: "same data, a selfLink",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: true,
|
||||||
|
hasSelfLink: true,
|
||||||
|
}, {
|
||||||
|
name: "same data, stale",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: false,
|
||||||
|
transformStale: true,
|
||||||
|
}, {
|
||||||
|
name: "UID match",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: storage.NewUIDPreconditions("A"),
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: true,
|
||||||
|
}, {
|
||||||
|
name: "UID mismatch",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: storage.NewUIDPreconditions("B"),
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: true,
|
||||||
|
expectNoUpdate: true,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
key, storeObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
name := fmt.Sprintf("foo-%d", i)
|
||||||
|
if tt.expectNoUpdate {
|
||||||
|
name = storeObj.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
if tt.transformStale {
|
||||||
|
revertTransformer := store.UpdatePrefixTransformer(
|
||||||
|
func(transformer *PrefixTransformer) value.Transformer {
|
||||||
|
transformer.stale = true
|
||||||
|
return transformer
|
||||||
|
})
|
||||||
|
defer revertTransformer()
|
||||||
|
}
|
||||||
|
|
||||||
|
version := storeObj.ResourceVersion
|
||||||
|
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
if tt.expectNotFoundErr && tt.ignoreNotFound {
|
||||||
|
if pod := obj.(*example.Pod); pod.Name != "" {
|
||||||
|
t.Errorf("%s: expecting zero value, but get=%#v", tt.name, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pod := *storeObj
|
||||||
|
if tt.hasSelfLink {
|
||||||
|
pod.SelfLink = "testlink"
|
||||||
|
}
|
||||||
|
pod.Name = name
|
||||||
|
return &pod, nil
|
||||||
|
}), nil)
|
||||||
|
|
||||||
|
if tt.expectNotFoundErr {
|
||||||
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("%s: expecting not found error, but get: %v", tt.name, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tt.expectInvalidObjErr {
|
||||||
|
if err == nil || !storage.IsInvalidObj(err) {
|
||||||
|
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s: GuaranteedUpdate failed: %v", tt.name, err)
|
||||||
|
}
|
||||||
|
if out.ObjectMeta.Name != name {
|
||||||
|
t.Errorf("%s: pod name want=%s, get=%s", tt.name, name, out.ObjectMeta.Name)
|
||||||
|
}
|
||||||
|
if out.SelfLink != "" {
|
||||||
|
t.Errorf("%s: selfLink should not be set", tt.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
||||||
|
validation(ctx, t, key)
|
||||||
|
|
||||||
|
switch tt.expectNoUpdate {
|
||||||
|
case true:
|
||||||
|
if version != out.ResourceVersion {
|
||||||
|
t.Errorf("%s: expect no version change, before=%s, after=%s", tt.name, version, out.ResourceVersion)
|
||||||
|
}
|
||||||
|
case false:
|
||||||
|
if version == out.ResourceVersion {
|
||||||
|
t.Errorf("%s: expect version change, but get the same version=%s", tt.name, version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdateWithTTL(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
key := "/somekey"
|
key := "/somekey"
|
||||||
|
|
||||||
@ -1171,8 +1327,73 @@ func RunTestGuaranteedUpdateWithTTL(ctx context.Context, t *testing.T, store sto
|
|||||||
TestCheckEventType(t, watch.Deleted, w)
|
TestCheckEventType(t, watch.Deleted, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdateChecksStoredData(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||||
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
key := "/somekey"
|
||||||
|
|
||||||
|
// serialize input into etcd with data that would be normalized by a write -
|
||||||
|
// in this case, leading whitespace
|
||||||
|
revertTransformer := store.UpdatePrefixTransformer(
|
||||||
|
func(transformer *PrefixTransformer) value.Transformer {
|
||||||
|
transformer.prefix = []byte(string(transformer.prefix) + " ")
|
||||||
|
return transformer
|
||||||
|
})
|
||||||
|
_, initial := TestPropagateStore(ctx, t, store, input)
|
||||||
|
revertTransformer()
|
||||||
|
|
||||||
|
// this update should write the canonical value to etcd because the new serialization differs
|
||||||
|
// from the stored serialization
|
||||||
|
input.ResourceVersion = initial.ResourceVersion
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}, input)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Update failed: %v", err)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion == initial.ResourceVersion {
|
||||||
|
t.Errorf("guaranteed update should have updated the serialized data, got %#v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
lastVersion := out.ResourceVersion
|
||||||
|
|
||||||
|
// this update should not write to etcd because the input matches the stored data
|
||||||
|
input = out
|
||||||
|
out = &example.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}, input)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Update failed: %v", err)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion != lastVersion {
|
||||||
|
t.Errorf("guaranteed update should have short-circuited write, got %#v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
revertTransformer = store.UpdatePrefixTransformer(
|
||||||
|
func(transformer *PrefixTransformer) value.Transformer {
|
||||||
|
transformer.stale = true
|
||||||
|
return transformer
|
||||||
|
})
|
||||||
|
defer revertTransformer()
|
||||||
|
|
||||||
|
// this update should write to etcd because the transformer reported stale
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}, input)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Update failed: %v", err)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion == lastVersion {
|
||||||
|
t.Errorf("guaranteed update should have written to etcd when transformer reported stale, got %#v", out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, _ := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, _ := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
errChan := make(chan error, 1)
|
errChan := make(chan error, 1)
|
||||||
var firstToFinish sync.WaitGroup
|
var firstToFinish sync.WaitGroup
|
||||||
@ -1217,7 +1438,7 @@ func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, stor
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, originalPod := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
// First, update without a suggestion so originalPod is outdated
|
// First, update without a suggestion so originalPod is outdated
|
||||||
updatedPod := &example.Pod{}
|
updatedPod := &example.Pod{}
|
||||||
@ -1291,8 +1512,83 @@ func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *te
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestTransformationFailure(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||||
|
preset := []struct {
|
||||||
|
key string
|
||||||
|
obj *example.Pod
|
||||||
|
storedObj *example.Pod
|
||||||
|
}{{
|
||||||
|
key: "/one-level/test",
|
||||||
|
obj: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
|
||||||
|
Spec: DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
key: "/two-level/1/test",
|
||||||
|
obj: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
|
||||||
|
Spec: DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
for i, ps := range preset[:1] {
|
||||||
|
preset[i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a second resource with an invalid prefix
|
||||||
|
revertTransformer := store.UpdatePrefixTransformer(
|
||||||
|
func(transformer *PrefixTransformer) value.Transformer {
|
||||||
|
return NewPrefixTransformer([]byte("otherprefix!"), false)
|
||||||
|
})
|
||||||
|
for i, ps := range preset[1:] {
|
||||||
|
preset[1:][i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
revertTransformer()
|
||||||
|
|
||||||
|
// List should fail
|
||||||
|
var got example.PodList
|
||||||
|
storageOpts := storage.ListOptions{
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", storageOpts, &got); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get should fail
|
||||||
|
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
updateFunc := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}
|
||||||
|
// GuaranteedUpdate without suggestion should return an error
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, updateFunc, nil); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// GuaranteedUpdate with suggestion should return an error if we don't change the object
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, updateFunc, preset[1].obj); err == nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete fails with internal error.
|
||||||
|
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
resourceA := "/foo.bar.io/abc"
|
resourceA := "/foo.bar.io/abc"
|
||||||
|
|
||||||
// resourceA is intentionally a prefix of resourceB to ensure that the count
|
// resourceA is intentionally a prefix of resourceB to ensure that the count
|
||||||
@ -1304,7 +1600,7 @@ func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
|||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
||||||
|
|
||||||
key := fmt.Sprintf("%s/%d", resourceA, i)
|
key := fmt.Sprintf("%s/%d", resourceA, i)
|
||||||
TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
TestPropagateStoreWithKey(ctx, t, store, key, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceBCount := 4
|
resourceBCount := 4
|
||||||
@ -1312,7 +1608,7 @@ func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
|||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
||||||
|
|
||||||
key := fmt.Sprintf("%s/%d", resourceB, i)
|
key := fmt.Sprintf("%s/%d", resourceB, i)
|
||||||
TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
TestPropagateStoreWithKey(ctx, t, store, key, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceACountGot, err := store.Count(resourceA)
|
resourceACountGot, err := store.Count(resourceA)
|
||||||
|
@ -17,10 +17,12 @@ limitations under the License.
|
|||||||
package testing
|
package testing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -32,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CreateObj will create a single object using the storage interface.
|
// CreateObj will create a single object using the storage interface.
|
||||||
@ -79,16 +82,16 @@ func DeepEqualSafePodSpec() example.PodSpec {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestPropogateStore helps propagates store with objects, automates key generation, and returns
|
// TestPropagateStore helps propagates store with objects, automates key generation, and returns
|
||||||
// keys and stored objects.
|
// keys and stored objects.
|
||||||
func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
|
func TestPropagateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
|
||||||
// Setup store with a key and grab the output for returning.
|
// Setup store with a key and grab the output for returning.
|
||||||
key := "/testkey"
|
key := "/testkey"
|
||||||
return key, TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
return key, TestPropagateStoreWithKey(ctx, t, store, key, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
|
// TestPropagateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
|
||||||
func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod {
|
func TestPropagateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod {
|
||||||
// Setup store with the specified key and grab the output for returning.
|
// Setup store with the specified key and grab the output for returning.
|
||||||
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
|
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
|
||||||
if err != nil && !storage.IsNotFound(err) {
|
if err != nil && !storage.IsNotFound(err) {
|
||||||
@ -193,3 +196,46 @@ func ResourceVersionNotOlderThan(sentinel string) func(string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||||
|
type PrefixTransformer struct {
|
||||||
|
prefix []byte
|
||||||
|
stale bool
|
||||||
|
err error
|
||||||
|
reads uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPrefixTransformer(prefix []byte, stale bool) *PrefixTransformer {
|
||||||
|
return &PrefixTransformer{
|
||||||
|
prefix: prefix,
|
||||||
|
stale: stale,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PrefixTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||||
|
atomic.AddUint64(&p.reads, 1)
|
||||||
|
if dataCtx == nil {
|
||||||
|
panic("no context provided")
|
||||||
|
}
|
||||||
|
if !bytes.HasPrefix(data, p.prefix) {
|
||||||
|
return nil, false, fmt.Errorf("value does not have expected prefix %q: %s,", p.prefix, string(data))
|
||||||
|
}
|
||||||
|
return bytes.TrimPrefix(data, p.prefix), p.stale, p.err
|
||||||
|
}
|
||||||
|
func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||||
|
if dataCtx == nil {
|
||||||
|
panic("no context provided")
|
||||||
|
}
|
||||||
|
if len(data) > 0 {
|
||||||
|
return append(append([]byte{}, p.prefix...), data...), p.err
|
||||||
|
}
|
||||||
|
return data, p.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PrefixTransformer) GetReads() uint64 {
|
||||||
|
return atomic.LoadUint64(&p.reads)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PrefixTransformer) ResetReads() {
|
||||||
|
atomic.StoreUint64(&p.reads, 0)
|
||||||
|
}
|
||||||
|
@ -122,7 +122,7 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
@ -134,7 +134,7 @@ func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -176,7 +176,7 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s
|
|||||||
initSignal := utilflowcontrol.NewInitializationSignal()
|
initSignal := utilflowcontrol.NewInitializationSignal()
|
||||||
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
|
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
|
||||||
|
|
||||||
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user