mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #109833 from stevekuznetsov/skuznets/move-simple-tests
storage/etcd3: factor tests to accept `storage.Interface`
This commit is contained in:
commit
f7457747f6
@ -21,7 +21,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
@ -41,14 +40,12 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
@ -167,209 +164,27 @@ func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clien
|
|||||||
|
|
||||||
func TestCreateWithTTL(t *testing.T) {
|
func TestCreateWithTTL(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestCreateWithTTL(ctx, t, store)
|
||||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
|
||||||
key := "/somekey"
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Create(ctx, key, input, out, 1); err != nil {
|
|
||||||
t.Fatalf("Create failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
testCheckEventType(t, watch.Deleted, w)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateWithKeyExist(t *testing.T) {
|
func TestCreateWithKeyExist(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
storagetesting.RunTestCreateWithKeyExist(ctx, t, store)
|
||||||
key, _ := testPropogateStore(ctx, t, store, obj)
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := store.Create(ctx, key, obj, out, 0)
|
|
||||||
if err == nil || !storage.IsExist(err) {
|
|
||||||
t.Errorf("expecting key exists error, but get: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGet(t *testing.T) {
|
func TestGet(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
// create an object to test
|
storagetesting.RunTestGet(ctx, t, store)
|
||||||
key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
|
||||||
// update the object once to allow get by exact resource version to be tested
|
|
||||||
updateObj := createdObj.DeepCopy()
|
|
||||||
updateObj.Annotations = map[string]string{"test-annotation": "1"}
|
|
||||||
storedObj := &example.Pod{}
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, storedObj, true, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
ttl := uint64(1)
|
|
||||||
return updateObj, &ttl, nil
|
|
||||||
}, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Update failed: %v", err)
|
|
||||||
}
|
|
||||||
// create an additional object to increment the resource version for pods above the resource version of the foo object
|
|
||||||
lastUpdatedObj := &example.Pod{}
|
|
||||||
if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil {
|
|
||||||
t.Fatalf("Set failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
|
||||||
lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion)
|
|
||||||
|
|
||||||
// TODO(jpbetz): Add exact test cases
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
key string
|
|
||||||
ignoreNotFound bool
|
|
||||||
expectNotFoundErr bool
|
|
||||||
expectRVTooLarge bool
|
|
||||||
expectedOut *example.Pod
|
|
||||||
rv string
|
|
||||||
}{{ // test get on existing item
|
|
||||||
name: "get existing",
|
|
||||||
key: key,
|
|
||||||
ignoreNotFound: false,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectedOut: storedObj,
|
|
||||||
}, { // test get on existing item with resource version set to 0
|
|
||||||
name: "resource version 0",
|
|
||||||
key: key,
|
|
||||||
expectedOut: storedObj,
|
|
||||||
rv: "0",
|
|
||||||
}, { // test get on existing item with resource version set to the resource version is was created on
|
|
||||||
name: "object created resource version",
|
|
||||||
key: key,
|
|
||||||
expectedOut: storedObj,
|
|
||||||
rv: createdObj.ResourceVersion,
|
|
||||||
}, { // test get on existing item with resource version set to current resource version of the object
|
|
||||||
name: "current object resource version, match=NotOlderThan",
|
|
||||||
key: key,
|
|
||||||
expectedOut: storedObj,
|
|
||||||
rv: fmt.Sprintf("%d", currentRV),
|
|
||||||
}, { // test get on existing item with resource version set to latest pod resource version
|
|
||||||
name: "latest resource version",
|
|
||||||
key: key,
|
|
||||||
expectedOut: storedObj,
|
|
||||||
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
|
|
||||||
}, { // test get on existing item with resource version set too high
|
|
||||||
name: "too high resource version",
|
|
||||||
key: key,
|
|
||||||
expectRVTooLarge: true,
|
|
||||||
rv: strconv.FormatInt(math.MaxInt64, 10),
|
|
||||||
}, { // test get on non-existing item with ignoreNotFound=false
|
|
||||||
name: "get non-existing",
|
|
||||||
key: "/non-existing",
|
|
||||||
ignoreNotFound: false,
|
|
||||||
expectNotFoundErr: true,
|
|
||||||
}, { // test get on non-existing item with ignoreNotFound=true
|
|
||||||
name: "get non-existing, ignore not found",
|
|
||||||
key: "/non-existing",
|
|
||||||
ignoreNotFound: true,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
expectedOut: &example.Pod{},
|
|
||||||
}}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := store.Get(ctx, tt.key, storage.GetOptions{IgnoreNotFound: tt.ignoreNotFound, ResourceVersion: tt.rv}, out)
|
|
||||||
if tt.expectNotFoundErr {
|
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("expecting not found error, but get: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tt.expectRVTooLarge {
|
|
||||||
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
|
||||||
t.Errorf("expecting resource version too high error, but get: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Get failed: %v", err)
|
|
||||||
}
|
|
||||||
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), tt.expectedOut, out)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUnconditionalDelete(t *testing.T) {
|
func TestUnconditionalDelete(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
storagetesting.RunTestUnconditionalDelete(ctx, t, store)
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
key string
|
|
||||||
expectedObj *example.Pod
|
|
||||||
expectNotFoundErr bool
|
|
||||||
}{{
|
|
||||||
name: "existing key",
|
|
||||||
key: key,
|
|
||||||
expectedObj: storedObj,
|
|
||||||
expectNotFoundErr: false,
|
|
||||||
}, {
|
|
||||||
name: "non-existing key",
|
|
||||||
key: "/non-existing",
|
|
||||||
expectedObj: nil,
|
|
||||||
expectNotFoundErr: true,
|
|
||||||
}}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
out := &example.Pod{} // reset
|
|
||||||
err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil)
|
|
||||||
if tt.expectNotFoundErr {
|
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("%s: expecting not found error, but get: %s", tt.name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
|
||||||
}
|
|
||||||
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod:", tt.name), tt.expectedObj, out)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConditionalDelete(t *testing.T) {
|
func TestConditionalDelete(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
storagetesting.RunTestConditionalDelete(ctx, t, store)
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
precondition *storage.Preconditions
|
|
||||||
expectInvalidObjErr bool
|
|
||||||
}{{
|
|
||||||
name: "UID match",
|
|
||||||
precondition: storage.NewUIDPreconditions("A"),
|
|
||||||
expectInvalidObjErr: false,
|
|
||||||
}, {
|
|
||||||
name: "UID mismatch",
|
|
||||||
precondition: storage.NewUIDPreconditions("B"),
|
|
||||||
expectInvalidObjErr: true,
|
|
||||||
}}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil)
|
|
||||||
if tt.expectInvalidObjErr {
|
|
||||||
if err == nil || !storage.IsInvalidObj(err) {
|
|
||||||
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
|
||||||
}
|
|
||||||
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
|
||||||
key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following set of Delete tests are testing the logic of adding `suggestion`
|
// The following set of Delete tests are testing the logic of adding `suggestion`
|
||||||
@ -399,275 +214,32 @@ func TestConditionalDelete(t *testing.T) {
|
|||||||
|
|
||||||
func TestDeleteWithSuggestion(t *testing.T) {
|
func TestDeleteWithSuggestion(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestDeleteWithSuggestion(ctx, t, store)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error on reading object: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestDeleteWithSuggestionAndConflict(ctx, t, store)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First update, so originalPod is outdated.
|
|
||||||
updatedPod := &example.Pod{}
|
|
||||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
|
||||||
return pod, nil
|
|
||||||
}), nil); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during updated: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error on reading object: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestDeleteWithSuggestionOfDeletedObject(ctx, t, store)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First delete, so originalPod is outdated.
|
|
||||||
deletedPod := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, deletedPod, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now try deleting with stale object.
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error during deletion: %v, expected not-found", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValidateDeletionWithSuggestion(t *testing.T) {
|
func TestValidateDeletionWithSuggestion(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestValidateDeletionWithSuggestion(ctx, t, store)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// Check that validaing fresh object fails is called once and fails.
|
|
||||||
validationCalls := 0
|
|
||||||
validationError := fmt.Errorf("validation error")
|
|
||||||
validateNothing := func(_ context.Context, _ runtime.Object) error {
|
|
||||||
validationCalls++
|
|
||||||
return validationError
|
|
||||||
}
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, out, nil, validateNothing, originalPod); err != validationError {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
if validationCalls != 1 {
|
|
||||||
t.Errorf("validate function should have been called once, called %d", validationCalls)
|
|
||||||
}
|
|
||||||
|
|
||||||
// First update, so originalPod is outdated.
|
|
||||||
updatedPod := &example.Pod{}
|
|
||||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
|
||||||
return pod, nil
|
|
||||||
}), nil); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during updated: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
calls := 0
|
|
||||||
validateFresh := func(_ context.Context, obj runtime.Object) error {
|
|
||||||
calls++
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
if pod.ObjectMeta.Labels == nil || pod.ObjectMeta.Labels["foo"] != "bar" {
|
|
||||||
return fmt.Errorf("stale object")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := store.Delete(ctx, key, out, nil, validateFresh, originalPod); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if calls != 2 {
|
|
||||||
t.Errorf("validate function should have been called twice, called %d", calls)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error on reading object: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestPreconditionalDeleteWithSuggestion(ctx, t, store)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
|
||||||
|
|
||||||
// First update, so originalPod is outdated.
|
|
||||||
updatedPod := &example.Pod{}
|
|
||||||
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.ObjectMeta.UID = "myUID"
|
|
||||||
return pod, nil
|
|
||||||
}), nil); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during updated: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
prec := storage.NewUIDPreconditions("myUID")
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
if err := store.Delete(ctx, key, out, prec, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
|
||||||
t.Errorf("Unexpected failure during deletion: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error on reading object: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetListNonRecursive(t *testing.T) {
|
func TestGetListNonRecursive(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
storagetesting.RunTestGetListNonRecursive(ctx, t, store)
|
||||||
|
|
||||||
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
|
||||||
|
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
|
||||||
|
|
||||||
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
key string
|
|
||||||
pred storage.SelectionPredicate
|
|
||||||
expectedOut []*example.Pod
|
|
||||||
rv string
|
|
||||||
rvMatch metav1.ResourceVersionMatch
|
|
||||||
expectRVTooLarge bool
|
|
||||||
}{{
|
|
||||||
name: "existing key",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=0",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: "0",
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=0, resourceVersionMatch=notOlderThan",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: "0",
|
|
||||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=current",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: fmt.Sprintf("%d", currentRV),
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=current, resourceVersionMatch=notOlderThan",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: fmt.Sprintf("%d", currentRV),
|
|
||||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=previous, resourceVersionMatch=notOlderThan",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: fmt.Sprintf("%d", prevRV),
|
|
||||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=current, resourceVersionMatch=exact",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: fmt.Sprintf("%d", currentRV),
|
|
||||||
rvMatch: metav1.ResourceVersionMatchExact,
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=current, resourceVersionMatch=exact",
|
|
||||||
key: prevKey,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{prevStoredObj},
|
|
||||||
rv: fmt.Sprintf("%d", prevRV),
|
|
||||||
rvMatch: metav1.ResourceVersionMatchExact,
|
|
||||||
}, {
|
|
||||||
name: "existing key, resourceVersion=too high",
|
|
||||||
key: key,
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: []*example.Pod{storedObj},
|
|
||||||
rv: strconv.FormatInt(math.MaxInt64, 10),
|
|
||||||
expectRVTooLarge: true,
|
|
||||||
}, {
|
|
||||||
name: "non-existing key",
|
|
||||||
key: "/non-existing",
|
|
||||||
pred: storage.Everything,
|
|
||||||
expectedOut: nil,
|
|
||||||
}, {
|
|
||||||
name: "with matching pod name",
|
|
||||||
key: "/non-existing",
|
|
||||||
pred: storage.SelectionPredicate{
|
|
||||||
Label: labels.Everything(),
|
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectedOut: nil,
|
|
||||||
}}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
out := &example.PodList{}
|
|
||||||
storageOpts := storage.ListOptions{
|
|
||||||
ResourceVersion: tt.rv,
|
|
||||||
ResourceVersionMatch: tt.rvMatch,
|
|
||||||
Predicate: tt.pred,
|
|
||||||
Recursive: false,
|
|
||||||
}
|
|
||||||
err := store.GetList(ctx, tt.key, storageOpts, out)
|
|
||||||
|
|
||||||
if tt.expectRVTooLarge {
|
|
||||||
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
|
||||||
t.Errorf("%s: expecting resource version too high error, but get: %s", tt.name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GetList failed: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.ResourceVersion) == 0 {
|
|
||||||
t.Errorf("%s: unset resourceVersion", tt.name)
|
|
||||||
}
|
|
||||||
if len(out.Items) != len(tt.expectedOut) {
|
|
||||||
t.Errorf("%s: length of list want=%d, get=%d", tt.name, len(tt.expectedOut), len(out.Items))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for j, wantPod := range tt.expectedOut {
|
|
||||||
getPod := &out.Items[j]
|
|
||||||
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdate(t *testing.T) {
|
func TestGuaranteedUpdate(t *testing.T) {
|
||||||
@ -754,7 +326,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
key, storeObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
|
|
||||||
out := &example.Pod{}
|
out := &example.Pod{}
|
||||||
name := fmt.Sprintf("foo-%d", i)
|
name := fmt.Sprintf("foo-%d", i)
|
||||||
@ -825,30 +397,11 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
|
|
||||||
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestGuaranteedUpdateWithTTL(ctx, t, store)
|
||||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
|
||||||
key := "/somekey"
|
|
||||||
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
|
|
||||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
ttl := uint64(1)
|
|
||||||
return input, &ttl, nil
|
|
||||||
}, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Create failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
testCheckEventType(t, watch.Deleted, w)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
|
||||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
key := "/somekey"
|
key := "/somekey"
|
||||||
|
|
||||||
@ -912,124 +465,12 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
|||||||
|
|
||||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
storagetesting.RunTestGuaranteedUpdateWithConflict(ctx, t, store)
|
||||||
|
|
||||||
errChan := make(chan error, 1)
|
|
||||||
var firstToFinish sync.WaitGroup
|
|
||||||
var secondToEnter sync.WaitGroup
|
|
||||||
firstToFinish.Add(1)
|
|
||||||
secondToEnter.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.Name = "foo-1"
|
|
||||||
secondToEnter.Wait()
|
|
||||||
return pod, nil
|
|
||||||
}), nil)
|
|
||||||
firstToFinish.Done()
|
|
||||||
errChan <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
updateCount := 0
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
if updateCount == 0 {
|
|
||||||
secondToEnter.Done()
|
|
||||||
firstToFinish.Wait()
|
|
||||||
}
|
|
||||||
updateCount++
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.Name = "foo-2"
|
|
||||||
return pod, nil
|
|
||||||
}), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Second GuaranteedUpdate error %#v", err)
|
|
||||||
}
|
|
||||||
if err := <-errChan; err != nil {
|
|
||||||
t.Fatalf("First GuaranteedUpdate error %#v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if updateCount != 2 {
|
|
||||||
t.Errorf("Should have conflict and called update func twice")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
storagetesting.RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx, t, store)
|
||||||
|
|
||||||
// First, update without a suggestion so originalPod is outdated
|
|
||||||
updatedPod := &example.Pod{}
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
pod.Name = "foo-2"
|
|
||||||
return pod, nil
|
|
||||||
}),
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second, update using the outdated originalPod as the suggestion. Return a conflict error when
|
|
||||||
// passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup
|
|
||||||
// with the value of updatedPod.
|
|
||||||
sawConflict := false
|
|
||||||
updatedPod2 := &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
if pod.Name != "foo-2" {
|
|
||||||
if sawConflict {
|
|
||||||
t.Fatalf("unexpected second conflict")
|
|
||||||
}
|
|
||||||
sawConflict = true
|
|
||||||
// simulated stale object - return a conflict
|
|
||||||
return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo"))
|
|
||||||
}
|
|
||||||
pod.Name = "foo-3"
|
|
||||||
return pod, nil
|
|
||||||
}),
|
|
||||||
originalPod,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
if updatedPod2.Name != "foo-3" {
|
|
||||||
t.Errorf("unexpected pod name: %q", updatedPod2.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Third, update using a current version as the suggestion.
|
|
||||||
// Return an error and make sure that SimpleUpdate is NOT called a second time,
|
|
||||||
// since the live lookup shows the suggestion was already up to date.
|
|
||||||
attempts := 0
|
|
||||||
updatedPod3 := &example.Pod{}
|
|
||||||
err = store.GuaranteedUpdate(ctx, key, updatedPod3, false, nil,
|
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
if pod.Name != updatedPod2.Name || pod.ResourceVersion != updatedPod2.ResourceVersion {
|
|
||||||
t.Errorf(
|
|
||||||
"unexpected live object (name=%s, rv=%s), expected name=%s, rv=%s",
|
|
||||||
pod.Name,
|
|
||||||
pod.ResourceVersion,
|
|
||||||
updatedPod2.Name,
|
|
||||||
updatedPod2.ResourceVersion,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
attempts++
|
|
||||||
return nil, fmt.Errorf("validation or admission error")
|
|
||||||
}),
|
|
||||||
updatedPod2,
|
|
||||||
)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("expected error, got none")
|
|
||||||
}
|
|
||||||
if attempts != 1 {
|
|
||||||
t.Errorf("expected 1 attempt, got %d", attempts)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTransformationFailure(t *testing.T) {
|
func TestTransformationFailure(t *testing.T) {
|
||||||
@ -1619,7 +1060,7 @@ func TestList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for j, wantPod := range tt.expectedOut {
|
for j, wantPod := range tt.expectedOut {
|
||||||
getPod := &out.Items[j]
|
getPod := &out.Items[j]
|
||||||
expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
|
storagetesting.ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1695,7 +1136,7 @@ func TestListContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) == 0 {
|
if len(out.Continue) == 0 {
|
||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if transformer.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -1722,7 +1163,7 @@ func TestListContinuation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
key, rv, err := decodeContinue(continueFromSecondItem, "/")
|
key, rv, err := decodeContinue(continueFromSecondItem, "/")
|
||||||
t.Logf("continue token was %d %s %v", rv, key, err)
|
t.Logf("continue token was %d %s %v", rv, key, err)
|
||||||
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 2 {
|
if transformer.reads != 2 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -1745,7 +1186,7 @@ func TestListContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) == 0 {
|
if len(out.Continue) == 0 {
|
||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if transformer.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -1769,7 +1210,7 @@ func TestListContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) != 0 {
|
if len(out.Continue) != 0 {
|
||||||
t.Fatalf("Unexpected continuation token set")
|
t.Fatalf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if transformer.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -1917,7 +1358,7 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
if len(out.Continue) == 0 {
|
if len(out.Continue) == 0 {
|
||||||
t.Errorf("No continuation token set")
|
t.Errorf("No continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
|
||||||
if transformer.reads != 3 {
|
if transformer.reads != 3 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -1949,7 +1390,7 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
if len(out.Continue) != 0 {
|
if len(out.Continue) != 0 {
|
||||||
t.Errorf("Unexpected continuation token set")
|
t.Errorf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
|
||||||
if transformer.reads != 1 {
|
if transformer.reads != 1 {
|
||||||
t.Errorf("unexpected reads: %d", transformer.reads)
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
}
|
}
|
||||||
@ -2027,7 +1468,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) == 0 {
|
if len(out.Continue) == 0 {
|
||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
|
||||||
|
|
||||||
continueFromSecondItem := out.Continue
|
continueFromSecondItem := out.Continue
|
||||||
|
|
||||||
@ -2094,7 +1535,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
validateResourceVersion := resourceVersionNotOlderThan(lastRVString)
|
validateResourceVersion := resourceVersionNotOlderThan(lastRVString)
|
||||||
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||||
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -2112,7 +1553,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) != 0 {
|
if len(out.Continue) != 0 {
|
||||||
t.Fatalf("Unexpected continuation token set")
|
t.Fatalf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||||
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
||||||
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
||||||
}
|
}
|
||||||
@ -2226,32 +1667,6 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *cli
|
|||||||
return ctx, store, client
|
return ctx, store, client
|
||||||
}
|
}
|
||||||
|
|
||||||
// testPropogateStore helps propagates store with objects, automates key generation, and returns
|
|
||||||
// keys and stored objects.
|
|
||||||
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
|
|
||||||
// Setup store with a key and grab the output for returning.
|
|
||||||
key := "/testkey"
|
|
||||||
return key, testPropogateStoreWithKey(ctx, t, store, key, obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
// testPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
|
|
||||||
func testPropogateStoreWithKey(ctx context.Context, t *testing.T, store *store, key string, obj *example.Pod) *example.Pod {
|
|
||||||
// Setup store with the specified key and grab the output for returning.
|
|
||||||
v, err := conversion.EnforcePtr(obj)
|
|
||||||
if err != nil {
|
|
||||||
panic("unable to convert output object to pointer")
|
|
||||||
}
|
|
||||||
err = store.conditionalDelete(ctx, key, &example.Pod{}, v, nil, storage.ValidateAllObjectFunc, nil)
|
|
||||||
if err != nil && !storage.IsNotFound(err) {
|
|
||||||
t.Fatalf("Cleanup failed: %v", err)
|
|
||||||
}
|
|
||||||
setOutput := &example.Pod{}
|
|
||||||
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
|
|
||||||
t.Fatalf("Set failed: %v", err)
|
|
||||||
}
|
|
||||||
return setOutput
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPrefix(t *testing.T) {
|
func TestPrefix(t *testing.T) {
|
||||||
testcases := map[string]string{
|
testcases := map[string]string{
|
||||||
"custom/prefix": "/custom/prefix",
|
"custom/prefix": "/custom/prefix",
|
||||||
@ -2465,7 +1880,7 @@ func TestConsistentList(t *testing.T) {
|
|||||||
t.Fatalf("failed to list objects: %v", err)
|
t.Fatalf("failed to list objects: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expectNoDiff(t, "incorrect lists", result1, result2)
|
storagetesting.ExpectNoDiff(t, "incorrect lists", result1, result2)
|
||||||
|
|
||||||
// Now also verify the ResourceVersionMatchNotOlderThan.
|
// Now also verify the ResourceVersionMatchNotOlderThan.
|
||||||
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||||
@ -2483,44 +1898,12 @@ func TestConsistentList(t *testing.T) {
|
|||||||
t.Fatalf("failed to list objects: %v", err)
|
t.Fatalf("failed to list objects: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expectNoDiff(t, "incorrect lists", result3, result4)
|
storagetesting.ExpectNoDiff(t, "incorrect lists", result3, result4)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCount(t *testing.T) {
|
func TestCount(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestCount(ctx, t, store)
|
||||||
resourceA := "/foo.bar.io/abc"
|
|
||||||
|
|
||||||
// resourceA is intentionally a prefix of resourceB to ensure that the count
|
|
||||||
// for resourceA does not include any objects from resourceB.
|
|
||||||
resourceB := fmt.Sprintf("%sdef", resourceA)
|
|
||||||
|
|
||||||
resourceACountExpected := 5
|
|
||||||
for i := 1; i <= resourceACountExpected; i++ {
|
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
|
||||||
|
|
||||||
key := fmt.Sprintf("%s/%d", resourceA, i)
|
|
||||||
testPropogateStoreWithKey(ctx, t, store, key, obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
resourceBCount := 4
|
|
||||||
for i := 1; i <= resourceBCount; i++ {
|
|
||||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
|
||||||
|
|
||||||
key := fmt.Sprintf("%s/%d", resourceB, i)
|
|
||||||
testPropogateStoreWithKey(ctx, t, store, key, obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
resourceACountGot, err := store.Count(resourceA)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("store.Count failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// count for resourceA should not include the objects for resourceB
|
|
||||||
// even though resourceA is a prefix of resourceB.
|
|
||||||
if int64(resourceACountExpected) != resourceACountGot {
|
|
||||||
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLeaseMaxObjectCount(t *testing.T) {
|
func TestLeaseMaxObjectCount(t *testing.T) {
|
||||||
@ -2562,14 +1945,3 @@ func TestLeaseMaxObjectCount(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectNoDiff(t *testing.T, msg string, expected, got interface{}) {
|
|
||||||
t.Helper()
|
|
||||||
if !reflect.DeepEqual(expected, got) {
|
|
||||||
if diff := cmp.Diff(expected, got); diff != "" {
|
|
||||||
t.Errorf("%s: %s", msg, diff)
|
|
||||||
} else {
|
|
||||||
t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -25,11 +25,10 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -38,110 +37,16 @@ import (
|
|||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
testWatch(t, false)
|
|
||||||
testWatch(t, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// It tests that
|
|
||||||
// - first occurrence of objects should notify Add event
|
|
||||||
// - update should trigger Modified event
|
|
||||||
// - update that gets filtered should trigger Deleted event
|
|
||||||
func testWatch(t *testing.T, recursive bool) {
|
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
storagetesting.RunTestWatch(ctx, t, store)
|
||||||
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
key string
|
|
||||||
pred storage.SelectionPredicate
|
|
||||||
watchTests []*testWatchStruct
|
|
||||||
}{{
|
|
||||||
name: "create a key",
|
|
||||||
key: "/somekey-1",
|
|
||||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}},
|
|
||||||
pred: storage.Everything,
|
|
||||||
}, {
|
|
||||||
name: "key updated to match predicate",
|
|
||||||
key: "/somekey-3",
|
|
||||||
watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}},
|
|
||||||
pred: storage.SelectionPredicate{
|
|
||||||
Label: labels.Everything(),
|
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, {
|
|
||||||
name: "update",
|
|
||||||
key: "/somekey-4",
|
|
||||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}},
|
|
||||||
pred: storage.Everything,
|
|
||||||
}, {
|
|
||||||
name: "delete because of being filtered",
|
|
||||||
key: "/somekey-5",
|
|
||||||
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}},
|
|
||||||
pred: storage.SelectionPredicate{
|
|
||||||
Label: labels.Everything(),
|
|
||||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
|
||||||
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
|
||||||
pod := obj.(*example.Pod)
|
|
||||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
w, err := store.Watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
var prevObj *example.Pod
|
|
||||||
for _, watchTest := range tt.watchTests {
|
|
||||||
out := &example.Pod{}
|
|
||||||
key := tt.key
|
|
||||||
if recursive {
|
|
||||||
key = key + "/item"
|
|
||||||
}
|
|
||||||
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
|
||||||
return watchTest.obj, nil
|
|
||||||
}), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
|
||||||
}
|
|
||||||
if watchTest.expectEvent {
|
|
||||||
expectObj := out
|
|
||||||
if watchTest.watchType == watch.Deleted {
|
|
||||||
expectObj = prevObj
|
|
||||||
expectObj.ResourceVersion = out.ResourceVersion
|
|
||||||
}
|
|
||||||
testCheckResult(t, watchTest.watchType, w, expectObj)
|
|
||||||
}
|
|
||||||
prevObj = out
|
|
||||||
}
|
|
||||||
w.Stop()
|
|
||||||
testCheckStop(t, w)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteTriggerWatch(t *testing.T) {
|
func TestDeleteTriggerWatch(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); err != nil {
|
|
||||||
t.Fatalf("Delete failed: %v", err)
|
|
||||||
}
|
|
||||||
testCheckEventType(t, watch.Deleted, w)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchFromZero tests that
|
// TestWatchFromZero tests that
|
||||||
@ -149,13 +54,13 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
key, storedObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, watch.Added, w, storedObj)
|
storagetesting.TestCheckResult(t, watch.Added, w, storedObj)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
|
|
||||||
// Update
|
// Update
|
||||||
@ -173,7 +78,7 @@ func TestWatchFromZero(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, watch.Added, w, out)
|
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
|
|
||||||
// Update again
|
// Update again
|
||||||
@ -201,25 +106,14 @@ func TestWatchFromZero(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, watch.Added, w, out)
|
storagetesting.TestCheckResult(t, watch.Added, w, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchFromNoneZero tests that
|
// TestWatchFromNoneZero tests that
|
||||||
// - watch from non-0 should just watch changes after given version
|
// - watch from non-0 should just watch changes after given version
|
||||||
func TestWatchFromNoneZero(t *testing.T) {
|
func TestWatchFromNoneZero(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
storagetesting.RunTestWatchFromNoneZero(ctx, t, store)
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
out := &example.Pod{}
|
|
||||||
store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err
|
|
||||||
}), nil)
|
|
||||||
testCheckResult(t, watch.Modified, w, out)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchError(t *testing.T) {
|
func TestWatchError(t *testing.T) {
|
||||||
@ -238,7 +132,7 @@ func TestWatchError(t *testing.T) {
|
|||||||
}), nil); err != nil {
|
}), nil); err != nil {
|
||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckEventType(t, watch.Error, w)
|
storagetesting.TestCheckEventType(t, watch.Error, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchContextCancel(t *testing.T) {
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
@ -286,7 +180,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||||
ctx, store, client := testSetup(t)
|
ctx, store, client := testSetup(t)
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
key, storedObj := storagetesting.TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -341,19 +235,8 @@ func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchInitializationSignal(t *testing.T) {
|
func TestWatchInitializationSignal(t *testing.T) {
|
||||||
_, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
|
||||||
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
initSignal := utilflowcontrol.NewInitializationSignal()
|
|
||||||
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
|
|
||||||
|
|
||||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
|
||||||
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
initSignal.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProgressNotify(t *testing.T) {
|
func TestProgressNotify(t *testing.T) {
|
||||||
@ -381,7 +264,7 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
|
|
||||||
// when we send a bookmark event, the client expects the event to contain an
|
// when we send a bookmark event, the client expects the event to contain an
|
||||||
// object of the correct type, but with no fields set other than the resourceVersion
|
// object of the correct type, but with no fields set other than the resourceVersion
|
||||||
testCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error {
|
storagetesting.TestCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error {
|
||||||
// first, check that we have the correct resource version
|
// first, check that we have the correct resource version
|
||||||
obj, ok := object.(metav1.Object)
|
obj, ok := object.(metav1.Object)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -397,17 +280,11 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
return fmt.Errorf("got %T, not *example.Pod", object)
|
return fmt.Errorf("got %T, not *example.Pod", object)
|
||||||
}
|
}
|
||||||
pod.ResourceVersion = ""
|
pod.ResourceVersion = ""
|
||||||
expectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod)
|
storagetesting.ExpectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
|
||||||
obj *example.Pod
|
|
||||||
expectEvent bool
|
|
||||||
watchType watch.EventType
|
|
||||||
}
|
|
||||||
|
|
||||||
type testCodec struct {
|
type testCodec struct {
|
||||||
runtime.Codec
|
runtime.Codec
|
||||||
}
|
}
|
||||||
@ -416,17 +293,6 @@ func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into
|
|||||||
return nil, nil, errTestingDecode
|
return nil, nil, errTestingDecode
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
|
|
||||||
select {
|
|
||||||
case res := <-w.ResultChan():
|
|
||||||
if res.Type != expectEventType {
|
|
||||||
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
|
||||||
}
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions
|
// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions
|
||||||
// referring to points in logical time before the sentinel generate an error. All logical times as
|
// referring to points in logical time before the sentinel generate an error. All logical times as
|
||||||
// new as the sentinel or newer generate no error.
|
// new as the sentinel or newer generate no error.
|
||||||
@ -447,43 +313,3 @@ func resourceVersionNotOlderThan(sentinel string) func(string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
|
||||||
testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
|
|
||||||
expectNoDiff(t, "incorrect object", expectObj, object)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) {
|
|
||||||
select {
|
|
||||||
case res := <-w.ResultChan():
|
|
||||||
if res.Type != expectEventType {
|
|
||||||
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := check(res.Object); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testCheckStop(t *testing.T, w watch.Interface) {
|
|
||||||
select {
|
|
||||||
case e, ok := <-w.ResultChan():
|
|
||||||
if ok {
|
|
||||||
var obj string
|
|
||||||
switch e.Object.(type) {
|
|
||||||
case *example.Pod:
|
|
||||||
obj = e.Object.(*example.Pod).Name
|
|
||||||
case *metav1.Status:
|
|
||||||
obj = e.Object.(*metav1.Status).Message
|
|
||||||
}
|
|
||||||
t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj)
|
|
||||||
}
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Errorf("time out after waiting 1s on ResultChan")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
742
staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go
Normal file
742
staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go
Normal file
@ -0,0 +1,742 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RunTestCreateWithTTL(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
key := "/somekey"
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Create(ctx, key, input, out, 1); err != nil {
|
||||||
|
t.Fatalf("Create failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckEventType(t, watch.Deleted, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestCreateWithKeyExist(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
key, _ := TestPropogateStore(ctx, t, store, obj)
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.Create(ctx, key, obj, out, 0)
|
||||||
|
if err == nil || !storage.IsExist(err) {
|
||||||
|
t.Errorf("expecting key exists error, but get: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
// create an object to test
|
||||||
|
key, createdObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
// update the object once to allow get by exact resource version to be tested
|
||||||
|
updateObj := createdObj.DeepCopy()
|
||||||
|
updateObj.Annotations = map[string]string{"test-annotation": "1"}
|
||||||
|
storedObj := &example.Pod{}
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, storedObj, true, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
ttl := uint64(1)
|
||||||
|
return updateObj, &ttl, nil
|
||||||
|
}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Update failed: %v", err)
|
||||||
|
}
|
||||||
|
// create an additional object to increment the resource version for pods above the resource version of the foo object
|
||||||
|
lastUpdatedObj := &example.Pod{}
|
||||||
|
if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
||||||
|
lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion)
|
||||||
|
|
||||||
|
// TODO(jpbetz): Add exact test cases
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
ignoreNotFound bool
|
||||||
|
expectNotFoundErr bool
|
||||||
|
expectRVTooLarge bool
|
||||||
|
expectedOut *example.Pod
|
||||||
|
rv string
|
||||||
|
}{{ // test get on existing item
|
||||||
|
name: "get existing",
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectedOut: storedObj,
|
||||||
|
}, { // test get on existing item with resource version set to 0
|
||||||
|
name: "resource version 0",
|
||||||
|
key: key,
|
||||||
|
expectedOut: storedObj,
|
||||||
|
rv: "0",
|
||||||
|
}, { // test get on existing item with resource version set to the resource version is was created on
|
||||||
|
name: "object created resource version",
|
||||||
|
key: key,
|
||||||
|
expectedOut: storedObj,
|
||||||
|
rv: createdObj.ResourceVersion,
|
||||||
|
}, { // test get on existing item with resource version set to current resource version of the object
|
||||||
|
name: "current object resource version, match=NotOlderThan",
|
||||||
|
key: key,
|
||||||
|
expectedOut: storedObj,
|
||||||
|
rv: fmt.Sprintf("%d", currentRV),
|
||||||
|
}, { // test get on existing item with resource version set to latest pod resource version
|
||||||
|
name: "latest resource version",
|
||||||
|
key: key,
|
||||||
|
expectedOut: storedObj,
|
||||||
|
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
|
||||||
|
}, { // test get on existing item with resource version set too high
|
||||||
|
name: "too high resource version",
|
||||||
|
key: key,
|
||||||
|
expectRVTooLarge: true,
|
||||||
|
rv: strconv.FormatInt(math.MaxInt64, 10),
|
||||||
|
}, { // test get on non-existing item with ignoreNotFound=false
|
||||||
|
name: "get non-existing",
|
||||||
|
key: "/non-existing",
|
||||||
|
ignoreNotFound: false,
|
||||||
|
expectNotFoundErr: true,
|
||||||
|
}, { // test get on non-existing item with ignoreNotFound=true
|
||||||
|
name: "get non-existing, ignore not found",
|
||||||
|
key: "/non-existing",
|
||||||
|
ignoreNotFound: true,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectedOut: &example.Pod{},
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.Get(ctx, tt.key, storage.GetOptions{IgnoreNotFound: tt.ignoreNotFound, ResourceVersion: tt.rv}, out)
|
||||||
|
if tt.expectNotFoundErr {
|
||||||
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("expecting not found error, but get: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tt.expectRVTooLarge {
|
||||||
|
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
||||||
|
t.Errorf("expecting resource version too high error, but get: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Get failed: %v", err)
|
||||||
|
}
|
||||||
|
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), tt.expectedOut, out)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
expectedObj *example.Pod
|
||||||
|
expectNotFoundErr bool
|
||||||
|
}{{
|
||||||
|
name: "existing key",
|
||||||
|
key: key,
|
||||||
|
expectedObj: storedObj,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
}, {
|
||||||
|
name: "non-existing key",
|
||||||
|
key: "/non-existing",
|
||||||
|
expectedObj: nil,
|
||||||
|
expectNotFoundErr: true,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
out := &example.Pod{} // reset
|
||||||
|
err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil)
|
||||||
|
if tt.expectNotFoundErr {
|
||||||
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("%s: expecting not found error, but get: %s", tt.name, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
||||||
|
}
|
||||||
|
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod:", tt.name), tt.expectedObj, out)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
precondition *storage.Preconditions
|
||||||
|
expectInvalidObjErr bool
|
||||||
|
}{{
|
||||||
|
name: "UID match",
|
||||||
|
precondition: storage.NewUIDPreconditions("A"),
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
}, {
|
||||||
|
name: "UID mismatch",
|
||||||
|
precondition: storage.NewUIDPreconditions("B"),
|
||||||
|
expectInvalidObjErr: true,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil)
|
||||||
|
if tt.expectInvalidObjErr {
|
||||||
|
if err == nil || !storage.IsInvalidObj(err) {
|
||||||
|
t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s: Delete failed: %v", tt.name, err)
|
||||||
|
}
|
||||||
|
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out)
|
||||||
|
key, storedObj = TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following set of Delete tests are testing the logic of adding `suggestion`
|
||||||
|
// as a parameter with probably value of the current state.
|
||||||
|
// Introducing it for GuaranteedUpdate cause a number of issues, so we're addressing
|
||||||
|
// all of those upfront by adding appropriate tests:
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/35415
|
||||||
|
// [DONE] Lack of tests originally - added TestDeleteWithSuggestion.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/40664
|
||||||
|
// [DONE] Irrelevant for delete, as Delete doesn't write data (nor compare it).
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/47703
|
||||||
|
// [DONE] Irrelevant for delete, because Delete doesn't persist data.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/48394/
|
||||||
|
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/43152
|
||||||
|
// [DONE] Added TestDeleteWithSuggestionAndConflict
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/54780
|
||||||
|
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/58375
|
||||||
|
// [DONE] Irrelevant for delete, because Delete doesn't compare data.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/77619
|
||||||
|
// [DONE] Added TestValidateDeletionWithSuggestion for corresponding delete checks.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/78713
|
||||||
|
// [DONE] Bug was in getState function which is shared with the new code.
|
||||||
|
// - https://github.com/kubernetes/kubernetes/pull/78713
|
||||||
|
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
||||||
|
|
||||||
|
func RunTestDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error on reading object: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestDeleteWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
|
|
||||||
|
// First update, so originalPod is outdated.
|
||||||
|
updatedPod := &example.Pod{}
|
||||||
|
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
||||||
|
return pod, nil
|
||||||
|
}), nil); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during updated: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error on reading object: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestDeleteWithSuggestionOfDeletedObject(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
|
|
||||||
|
// First delete, so originalPod is outdated.
|
||||||
|
deletedPod := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, deletedPod, nil, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now try deleting with stale object.
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error during deletion: %v, expected not-found", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestValidateDeletionWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
|
|
||||||
|
// Check that validaing fresh object fails is called once and fails.
|
||||||
|
validationCalls := 0
|
||||||
|
validationError := fmt.Errorf("validation error")
|
||||||
|
validateNothing := func(_ context.Context, _ runtime.Object) error {
|
||||||
|
validationCalls++
|
||||||
|
return validationError
|
||||||
|
}
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, out, nil, validateNothing, originalPod); err != validationError {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
if validationCalls != 1 {
|
||||||
|
t.Errorf("validate function should have been called once, called %d", validationCalls)
|
||||||
|
}
|
||||||
|
|
||||||
|
// First update, so originalPod is outdated.
|
||||||
|
updatedPod := &example.Pod{}
|
||||||
|
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.ObjectMeta.Labels = map[string]string{"foo": "bar"}
|
||||||
|
return pod, nil
|
||||||
|
}), nil); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during updated: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
calls := 0
|
||||||
|
validateFresh := func(_ context.Context, obj runtime.Object) error {
|
||||||
|
calls++
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
if pod.ObjectMeta.Labels == nil || pod.ObjectMeta.Labels["foo"] != "bar" {
|
||||||
|
return fmt.Errorf("stale object")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Delete(ctx, key, out, nil, validateFresh, originalPod); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if calls != 2 {
|
||||||
|
t.Errorf("validate function should have been called twice, called %d", calls)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error on reading object: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||||
|
|
||||||
|
// First update, so originalPod is outdated.
|
||||||
|
updatedPod := &example.Pod{}
|
||||||
|
if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.ObjectMeta.UID = "myUID"
|
||||||
|
return pod, nil
|
||||||
|
}), nil); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during updated: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prec := storage.NewUIDPreconditions("myUID")
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
if err := store.Delete(ctx, key, out, prec, storage.ValidateAllObjectFunc, originalPod); err != nil {
|
||||||
|
t.Errorf("Unexpected failure during deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error on reading object: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
prevKey, prevStoredObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
||||||
|
|
||||||
|
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
||||||
|
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
pred storage.SelectionPredicate
|
||||||
|
expectedOut []*example.Pod
|
||||||
|
rv string
|
||||||
|
rvMatch metav1.ResourceVersionMatch
|
||||||
|
expectRVTooLarge bool
|
||||||
|
}{{
|
||||||
|
name: "existing key",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=0",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: "0",
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=0, resourceVersionMatch=notOlderThan",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: "0",
|
||||||
|
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=current",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: fmt.Sprintf("%d", currentRV),
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=current, resourceVersionMatch=notOlderThan",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: fmt.Sprintf("%d", currentRV),
|
||||||
|
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=previous, resourceVersionMatch=notOlderThan",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: fmt.Sprintf("%d", prevRV),
|
||||||
|
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=current, resourceVersionMatch=exact",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: fmt.Sprintf("%d", currentRV),
|
||||||
|
rvMatch: metav1.ResourceVersionMatchExact,
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=current, resourceVersionMatch=exact",
|
||||||
|
key: prevKey,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{prevStoredObj},
|
||||||
|
rv: fmt.Sprintf("%d", prevRV),
|
||||||
|
rvMatch: metav1.ResourceVersionMatchExact,
|
||||||
|
}, {
|
||||||
|
name: "existing key, resourceVersion=too high",
|
||||||
|
key: key,
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: []*example.Pod{storedObj},
|
||||||
|
rv: strconv.FormatInt(math.MaxInt64, 10),
|
||||||
|
expectRVTooLarge: true,
|
||||||
|
}, {
|
||||||
|
name: "non-existing key",
|
||||||
|
key: "/non-existing",
|
||||||
|
pred: storage.Everything,
|
||||||
|
expectedOut: nil,
|
||||||
|
}, {
|
||||||
|
name: "with matching pod name",
|
||||||
|
key: "/non-existing",
|
||||||
|
pred: storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedOut: nil,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
out := &example.PodList{}
|
||||||
|
storageOpts := storage.ListOptions{
|
||||||
|
ResourceVersion: tt.rv,
|
||||||
|
ResourceVersionMatch: tt.rvMatch,
|
||||||
|
Predicate: tt.pred,
|
||||||
|
Recursive: false,
|
||||||
|
}
|
||||||
|
err := store.GetList(ctx, tt.key, storageOpts, out)
|
||||||
|
|
||||||
|
if tt.expectRVTooLarge {
|
||||||
|
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
||||||
|
t.Errorf("%s: expecting resource version too high error, but get: %s", tt.name, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetList failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.ResourceVersion) == 0 {
|
||||||
|
t.Errorf("%s: unset resourceVersion", tt.name)
|
||||||
|
}
|
||||||
|
if len(out.Items) != len(tt.expectedOut) {
|
||||||
|
t.Errorf("%s: length of list want=%d, get=%d", tt.name, len(tt.expectedOut), len(out.Items))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for j, wantPod := range tt.expectedOut {
|
||||||
|
getPod := &out.Items[j]
|
||||||
|
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdateWithTTL(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
key := "/somekey"
|
||||||
|
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
ttl := uint64(1)
|
||||||
|
return input, &ttl, nil
|
||||||
|
}, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Create failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckEventType(t, watch.Deleted, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, _ := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
var firstToFinish sync.WaitGroup
|
||||||
|
var secondToEnter sync.WaitGroup
|
||||||
|
firstToFinish.Add(1)
|
||||||
|
secondToEnter.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.Name = "foo-1"
|
||||||
|
secondToEnter.Wait()
|
||||||
|
return pod, nil
|
||||||
|
}), nil)
|
||||||
|
firstToFinish.Done()
|
||||||
|
errChan <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
updateCount := 0
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
if updateCount == 0 {
|
||||||
|
secondToEnter.Done()
|
||||||
|
firstToFinish.Wait()
|
||||||
|
}
|
||||||
|
updateCount++
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.Name = "foo-2"
|
||||||
|
return pod, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Second GuaranteedUpdate error %#v", err)
|
||||||
|
}
|
||||||
|
if err := <-errChan; err != nil {
|
||||||
|
t.Fatalf("First GuaranteedUpdate error %#v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if updateCount != 2 {
|
||||||
|
t.Errorf("Should have conflict and called update func twice")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestGuaranteedUpdateWithSuggestionAndConflict(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, originalPod := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
// First, update without a suggestion so originalPod is outdated
|
||||||
|
updatedPod := &example.Pod{}
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
pod.Name = "foo-2"
|
||||||
|
return pod, nil
|
||||||
|
}),
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second, update using the outdated originalPod as the suggestion. Return a conflict error when
|
||||||
|
// passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup
|
||||||
|
// with the value of updatedPod.
|
||||||
|
sawConflict := false
|
||||||
|
updatedPod2 := &example.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
if pod.Name != "foo-2" {
|
||||||
|
if sawConflict {
|
||||||
|
t.Fatalf("unexpected second conflict")
|
||||||
|
}
|
||||||
|
sawConflict = true
|
||||||
|
// simulated stale object - return a conflict
|
||||||
|
return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo"))
|
||||||
|
}
|
||||||
|
pod.Name = "foo-3"
|
||||||
|
return pod, nil
|
||||||
|
}),
|
||||||
|
originalPod,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if updatedPod2.Name != "foo-3" {
|
||||||
|
t.Errorf("unexpected pod name: %q", updatedPod2.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Third, update using a current version as the suggestion.
|
||||||
|
// Return an error and make sure that SimpleUpdate is NOT called a second time,
|
||||||
|
// since the live lookup shows the suggestion was already up to date.
|
||||||
|
attempts := 0
|
||||||
|
updatedPod3 := &example.Pod{}
|
||||||
|
err = store.GuaranteedUpdate(ctx, key, updatedPod3, false, nil,
|
||||||
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
if pod.Name != updatedPod2.Name || pod.ResourceVersion != updatedPod2.ResourceVersion {
|
||||||
|
t.Errorf(
|
||||||
|
"unexpected live object (name=%s, rv=%s), expected name=%s, rv=%s",
|
||||||
|
pod.Name,
|
||||||
|
pod.ResourceVersion,
|
||||||
|
updatedPod2.Name,
|
||||||
|
updatedPod2.ResourceVersion,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
attempts++
|
||||||
|
return nil, fmt.Errorf("validation or admission error")
|
||||||
|
}),
|
||||||
|
updatedPod2,
|
||||||
|
)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error, got none")
|
||||||
|
}
|
||||||
|
if attempts != 1 {
|
||||||
|
t.Errorf("expected 1 attempt, got %d", attempts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPropogateStore helps propagates store with objects, automates key generation, and returns
|
||||||
|
// keys and stored objects.
|
||||||
|
func TestPropogateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
|
||||||
|
// Setup store with a key and grab the output for returning.
|
||||||
|
key := "/testkey"
|
||||||
|
return key, TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPropogateStoreWithKey helps propagate store with objects, the given object will be stored at the specified key.
|
||||||
|
func TestPropogateStoreWithKey(ctx context.Context, t *testing.T, store storage.Interface, key string, obj *example.Pod) *example.Pod {
|
||||||
|
// Setup store with the specified key and grab the output for returning.
|
||||||
|
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
|
||||||
|
if err != nil && !storage.IsNotFound(err) {
|
||||||
|
t.Fatalf("Cleanup failed: %v", err)
|
||||||
|
}
|
||||||
|
setOutput := &example.Pod{}
|
||||||
|
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
return setOutput
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
|
||||||
|
resourceA := "/foo.bar.io/abc"
|
||||||
|
|
||||||
|
// resourceA is intentionally a prefix of resourceB to ensure that the count
|
||||||
|
// for resourceA does not include any objects from resourceB.
|
||||||
|
resourceB := fmt.Sprintf("%sdef", resourceA)
|
||||||
|
|
||||||
|
resourceACountExpected := 5
|
||||||
|
for i := 1; i <= resourceACountExpected; i++ {
|
||||||
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s/%d", resourceA, i)
|
||||||
|
TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceBCount := 4
|
||||||
|
for i := 1; i <= resourceBCount; i++ {
|
||||||
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s/%d", resourceB, i)
|
||||||
|
TestPropogateStoreWithKey(ctx, t, store, key, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceACountGot, err := store.Count(resourceA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("store.Count failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// count for resourceA should not include the objects for resourceB
|
||||||
|
// even though resourceA is a prefix of resourceB.
|
||||||
|
if int64(resourceACountExpected) != resourceACountGot {
|
||||||
|
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExpectNoDiff(t *testing.T, msg string, expected, got interface{}) {
|
||||||
|
t.Helper()
|
||||||
|
if !reflect.DeepEqual(expected, got) {
|
||||||
|
if diff := cmp.Diff(expected, got); diff != "" {
|
||||||
|
t.Errorf("%s: %s", msg, diff)
|
||||||
|
} else {
|
||||||
|
t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,220 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RunTestWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
testWatch(ctx, t, store, false)
|
||||||
|
testWatch(ctx, t, store, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It tests that
|
||||||
|
// - first occurrence of objects should notify Add event
|
||||||
|
// - update should trigger Modified event
|
||||||
|
// - update that gets filtered should trigger Deleted event
|
||||||
|
func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recursive bool) {
|
||||||
|
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||||
|
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
key string
|
||||||
|
pred storage.SelectionPredicate
|
||||||
|
watchTests []*testWatchStruct
|
||||||
|
}{{
|
||||||
|
name: "create a key",
|
||||||
|
key: "/somekey-1",
|
||||||
|
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}},
|
||||||
|
pred: storage.Everything,
|
||||||
|
}, {
|
||||||
|
name: "key updated to match predicate",
|
||||||
|
key: "/somekey-3",
|
||||||
|
watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}},
|
||||||
|
pred: storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.ParseSelectorOrDie("metadata.name=bar"),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
name: "update",
|
||||||
|
key: "/somekey-4",
|
||||||
|
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}},
|
||||||
|
pred: storage.Everything,
|
||||||
|
}, {
|
||||||
|
name: "delete because of being filtered",
|
||||||
|
key: "/somekey-5",
|
||||||
|
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}},
|
||||||
|
pred: storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
w, err := store.Watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
var prevObj *example.Pod
|
||||||
|
for _, watchTest := range tt.watchTests {
|
||||||
|
out := &example.Pod{}
|
||||||
|
key := tt.key
|
||||||
|
if recursive {
|
||||||
|
key = key + "/item"
|
||||||
|
}
|
||||||
|
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return watchTest.obj, nil
|
||||||
|
}), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
if watchTest.expectEvent {
|
||||||
|
expectObj := out
|
||||||
|
if watchTest.watchType == watch.Deleted {
|
||||||
|
expectObj = prevObj
|
||||||
|
expectObj.ResourceVersion = out.ResourceVersion
|
||||||
|
}
|
||||||
|
TestCheckResult(t, watchTest.watchType, w, expectObj)
|
||||||
|
}
|
||||||
|
prevObj = out
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
TestCheckStop(t, w)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||||
|
t.Fatalf("Delete failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckEventType(t, watch.Deleted, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
out := &example.Pod{}
|
||||||
|
store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err
|
||||||
|
}), nil)
|
||||||
|
TestCheckResult(t, watch.Modified, w, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
ctx, _ = context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
initSignal := utilflowcontrol.NewInitializationSignal()
|
||||||
|
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
|
||||||
|
|
||||||
|
key, storedObj := TestPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||||
|
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
initSignal.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
type testWatchStruct struct {
|
||||||
|
obj *example.Pod
|
||||||
|
expectEvent bool
|
||||||
|
watchType watch.EventType
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
|
||||||
|
select {
|
||||||
|
case res := <-w.ResultChan():
|
||||||
|
if res.Type != expectEventType {
|
||||||
|
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
||||||
|
TestCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
|
||||||
|
ExpectNoDiff(t, "incorrect object", expectObj, object)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) {
|
||||||
|
select {
|
||||||
|
case res := <-w.ResultChan():
|
||||||
|
if res.Type != expectEventType {
|
||||||
|
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := check(res.Object); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckStop(t *testing.T, w watch.Interface) {
|
||||||
|
select {
|
||||||
|
case e, ok := <-w.ResultChan():
|
||||||
|
if ok {
|
||||||
|
var obj string
|
||||||
|
switch e.Object.(type) {
|
||||||
|
case *example.Pod:
|
||||||
|
obj = e.Object.(*example.Pod).Name
|
||||||
|
case *metav1.Status:
|
||||||
|
obj = e.Object.(*metav1.Status).Message
|
||||||
|
}
|
||||||
|
t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj)
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("time out after waiting 1s on ResultChan")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user