pkg/storage/etcd3: cut off pkg/api dependency

This commit is contained in:
Dr. Stefan Schimanski 2017-02-01 16:27:40 +01:00
parent 2f9fa55c6f
commit 0e2b2048b2
2 changed files with 94 additions and 78 deletions

View File

@ -22,27 +22,38 @@ import (
"sync" "sync"
"testing" "testing"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/apimachinery/pkg/watch"
) )
var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
}
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
etcdClient := cluster.RandClient() etcdClient := cluster.RandClient()
key := "/testkey" key := "/testkey"
out := &api.Pod{} out := &example.Pod{}
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
// verify that kv pair is empty before set // verify that kv pair is empty before set
getResp, err := etcdClient.KV.Get(ctx, key) getResp, err := etcdClient.KV.Get(ctx, key)
@ -79,10 +90,10 @@ func TestCreateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey" key := "/somekey"
out := &api.Pod{} out := &example.Pod{}
if err := store.Create(ctx, key, input, out, 1); err != nil { if err := store.Create(ctx, key, input, out, 1); err != nil {
t.Fatalf("Create failed: %v", err) t.Fatalf("Create failed: %v", err)
} }
@ -97,9 +108,9 @@ func TestCreateWithTTL(t *testing.T) {
func TestCreateWithKeyExist(t *testing.T) { func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key, _ := testPropogateStore(ctx, t, store, obj) key, _ := testPropogateStore(ctx, t, store, obj)
out := &api.Pod{} out := &example.Pod{}
err := store.Create(ctx, key, obj, out, 0) err := store.Create(ctx, key, obj, out, 0)
if err == nil || !storage.IsNodeExist(err) { if err == nil || !storage.IsNodeExist(err) {
t.Errorf("expecting key exists error, but get: %s", err) t.Errorf("expecting key exists error, but get: %s", err)
@ -109,13 +120,13 @@ func TestCreateWithKeyExist(t *testing.T) {
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
ignoreNotFound bool ignoreNotFound bool
expectNotFoundErr bool expectNotFoundErr bool
expectedOut *api.Pod expectedOut *example.Pod
}{{ // test get on existing item }{{ // test get on existing item
key: key, key: key,
ignoreNotFound: false, ignoreNotFound: false,
@ -129,11 +140,11 @@ func TestGet(t *testing.T) {
key: "/non-existing", key: "/non-existing",
ignoreNotFound: true, ignoreNotFound: true,
expectNotFoundErr: false, expectNotFoundErr: false,
expectedOut: &api.Pod{}, expectedOut: &example.Pod{},
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound)
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {
@ -153,11 +164,11 @@ func TestGet(t *testing.T) {
func TestUnconditionalDelete(t *testing.T) { func TestUnconditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
expectedObj *api.Pod expectedObj *example.Pod
expectNotFoundErr bool expectNotFoundErr bool
}{{ // test unconditional delete on existing key }{{ // test unconditional delete on existing key
key: key, key: key,
@ -170,7 +181,7 @@ func TestUnconditionalDelete(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} // reset out := &example.Pod{} // reset
err := store.Delete(ctx, tt.key, out, nil) err := store.Delete(ctx, tt.key, out, nil)
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {
@ -190,7 +201,7 @@ func TestUnconditionalDelete(t *testing.T) {
func TestConditionalDelete(t *testing.T) { func TestConditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct { tests := []struct {
precondition *storage.Preconditions precondition *storage.Preconditions
@ -204,7 +215,7 @@ func TestConditionalDelete(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
err := store.Delete(ctx, key, out, tt.precondition) err := store.Delete(ctx, key, out, tt.precondition)
if tt.expectInvalidObjErr { if tt.expectInvalidObjErr {
if err == nil || !storage.IsInvalidObj(err) { if err == nil || !storage.IsInvalidObj(err) {
@ -218,23 +229,23 @@ func TestConditionalDelete(t *testing.T) {
if !reflect.DeepEqual(storedObj, out) { if !reflect.DeepEqual(storedObj, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out) t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out)
} }
key, storedObj = testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
} }
} }
func TestGetToList(t *testing.T) { func TestGetToList(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct { tests := []struct {
key string key string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*api.Pod expectedOut []*example.Pod
}{{ // test GetToList on existing key }{{ // test GetToList on existing key
key: key, key: key,
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{storedObj}, expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on non-existing key }, { // test GetToList on non-existing key
key: "/non-existing", key: "/non-existing",
pred: storage.Everything, pred: storage.Everything,
@ -245,7 +256,7 @@ func TestGetToList(t *testing.T) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -253,7 +264,7 @@ func TestGetToList(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &example.PodList{}
err := store.GetToList(ctx, tt.key, "", tt.pred, out) err := store.GetToList(ctx, tt.key, "", tt.pred, out)
if err != nil { if err != nil {
t.Fatalf("GetToList failed: %v", err) t.Fatalf("GetToList failed: %v", err)
@ -274,7 +285,7 @@ func TestGetToList(t *testing.T) {
func TestGuaranteedUpdate(t *testing.T) { func TestGuaranteedUpdate(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storeObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct { tests := []struct {
key string key string
@ -328,7 +339,7 @@ func TestGuaranteedUpdate(t *testing.T) {
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.Pod{} out := &example.Pod{}
name := fmt.Sprintf("foo-%d", i) name := fmt.Sprintf("foo-%d", i)
if tt.expectNoUpdate { if tt.expectNoUpdate {
name = storeObj.Name name = storeObj.Name
@ -337,7 +348,7 @@ func TestGuaranteedUpdate(t *testing.T) {
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if tt.expectNotFoundErr && tt.ignoreNotFound { if tt.expectNotFoundErr && tt.ignoreNotFound {
if pod := obj.(*api.Pod); pod.Name != "" { if pod := obj.(*example.Pod); pod.Name != "" {
t.Errorf("#%d: expecting zero value, but get=%#v", i, pod) t.Errorf("#%d: expecting zero value, but get=%#v", i, pod)
} }
} }
@ -382,10 +393,10 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey" key := "/somekey"
out := &api.Pod{} out := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, out, true, nil, err := store.GuaranteedUpdate(ctx, key, out, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1) ttl := uint64(1)
@ -405,7 +416,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
func TestGuaranteedUpdateWithConflict(t *testing.T) { func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, _ := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
errChan := make(chan error, 1) errChan := make(chan error, 1)
var firstToFinish sync.WaitGroup var firstToFinish sync.WaitGroup
@ -414,9 +425,9 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
secondToEnter.Add(1) secondToEnter.Add(1)
go func() { go func() {
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
pod.Name = "foo-1" pod.Name = "foo-1"
secondToEnter.Wait() secondToEnter.Wait()
return pod, nil return pod, nil
@ -426,14 +437,14 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}() }()
updateCount := 0 updateCount := 0
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if updateCount == 0 { if updateCount == 0 {
secondToEnter.Done() secondToEnter.Done()
firstToFinish.Wait() firstToFinish.Wait()
} }
updateCount++ updateCount++
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
pod.Name = "foo-2" pod.Name = "foo-2"
return pod, nil return pod, nil
})) }))
@ -450,9 +461,10 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -468,21 +480,21 @@ func TestList(t *testing.T) {
// - test // - test
preset := []struct { preset := []struct {
key string key string
obj *api.Pod obj *example.Pod
storedObj *api.Pod storedObj *example.Pod
}{{ }{{
key: "/one-level/test", key: "/one-level/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, { }, {
key: "/two-level/1/test", key: "/two-level/1/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, { }, {
key: "/two-level/2/test", key: "/two-level/2/test",
obj: &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
}} }}
for i, ps := range preset { for i, ps := range preset {
preset[i].storedObj = &api.Pod{} preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil { if err != nil {
t.Fatalf("Set failed: %v", err) t.Fatalf("Set failed: %v", err)
@ -492,11 +504,11 @@ func TestList(t *testing.T) {
tests := []struct { tests := []struct {
prefix string prefix string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*api.Pod expectedOut []*example.Pod
}{{ // test List on existing key }{{ // test List on existing key
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
}, { // test List on non-existing key }, { // test List on non-existing key
prefix: "/non-existing/", prefix: "/non-existing/",
pred: storage.Everything, pred: storage.Everything,
@ -507,7 +519,7 @@ func TestList(t *testing.T) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -515,11 +527,11 @@ func TestList(t *testing.T) {
}, { // test List with multiple levels of directories and expect flattened result }, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &example.PodList{}
err := store.List(ctx, tt.prefix, "0", tt.pred, out) err := store.List(ctx, tt.prefix, "0", tt.pred, out)
if err != nil { if err != nil {
t.Fatalf("List failed: %v", err) t.Fatalf("List failed: %v", err)
@ -538,18 +550,19 @@ func TestList(t *testing.T) {
} }
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
return ctx, store, cluster return ctx, store, cluster
} }
// testPropogateStore helps propogates store with objects, automates key generation, and returns // testPropogateStore helps propogates store with objects, automates key generation, and returns
// keys and stored objects. // keys and stored objects.
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *api.Pod) (string, *api.Pod) { func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
// Setup store with a key and grab the output for returning. // Setup store with a key and grab the output for returning.
key := "/testkey" key := "/testkey"
setOutput := &api.Pod{} setOutput := &example.Pod{}
err := store.Create(ctx, key, obj, setOutput, 0) err := store.Create(ctx, key, obj, setOutput, 0)
if err != nil { if err != nil {
t.Fatalf("Set failed: %v", err) t.Fatalf("Set failed: %v", err)

View File

@ -28,6 +28,8 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"golang.org/x/net/context" "golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -35,9 +37,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
) )
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
@ -55,8 +57,8 @@ func TestWatchList(t *testing.T) {
func testWatch(t *testing.T, recursive bool) { func testWatch(t *testing.T, recursive bool) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
podFoo := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
tests := []struct { tests := []struct {
key string key string
@ -73,7 +75,7 @@ func testWatch(t *testing.T, recursive bool) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name=bar"), Field: fields.ParseSelectorOrDie("metadata.name=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -88,7 +90,7 @@ func testWatch(t *testing.T, recursive bool) {
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=bar"), Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil return nil, fields.Set{"metadata.name": pod.Name}, nil
}, },
}, },
@ -98,9 +100,9 @@ func testWatch(t *testing.T, recursive bool) {
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
var prevObj *api.Pod var prevObj *example.Pod
for _, watchTest := range tt.watchTests { for _, watchTest := range tt.watchTests {
out := &api.Pod{} out := &example.Pod{}
key := tt.key key := tt.key
if recursive { if recursive {
key = key + "/item" key = key + "/item"
@ -130,12 +132,12 @@ func testWatch(t *testing.T, recursive bool) {
func TestDeleteTriggerWatch(t *testing.T) { func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { if err := store.Delete(ctx, key, &example.Pod{}, nil); err != nil {
t.Fatalf("Delete failed: %v", err) t.Fatalf("Delete failed: %v", err)
} }
testCheckEventType(t, watch.Deleted, w) testCheckEventType(t, watch.Deleted, w)
@ -147,7 +149,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) { func TestWatchFromZero(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, "0", storage.Everything) w, err := store.Watch(ctx, key, "0", storage.Everything)
if err != nil { if err != nil {
@ -157,10 +159,10 @@ func TestWatchFromZero(t *testing.T) {
w.Stop() w.Stop()
// Update // Update
out := &api.Pod{} out := &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
})) }))
if err != nil { if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err) t.Fatalf("GuaranteedUpdate failed: %v", err)
@ -175,10 +177,10 @@ func TestWatchFromZero(t *testing.T) {
w.Stop() w.Stop()
// Update again // Update again
out = &api.Pod{} out = &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
})) }))
if err != nil { if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err) t.Fatalf("GuaranteedUpdate failed: %v", err)
@ -207,33 +209,34 @@ func TestWatchFromZero(t *testing.T) {
func TestWatchFromNoneZero(t *testing.T) { func TestWatchFromNoneZero(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
out := &api.Pod{} out := &example.Pod{}
store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err
})) }))
testCheckResult(t, 0, watch.Modified, w, out) testCheckResult(t, 0, watch.Modified, w, out)
} }
func TestWatchError(t *testing.T) { func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), false, &testCodec{testapi.Default.Codec()}, "") invalidStore := newStore(cluster.RandClient(), false, codec, "")
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
validStore := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") validStore := newStore(cluster.RandClient(), false, codec, "")
validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate( validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
})) }))
testCheckEventType(t, watch.Error, w) testCheckEventType(t, watch.Error, w)
} }
@ -286,7 +289,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
if err != nil { if err != nil {
@ -294,12 +297,12 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
} }
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil { if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}); err != nil {
t.Fatalf("Delete failed: %v", err) t.Fatalf("Delete failed: %v", err)
} }
e := <-w.ResultChan() e := <-w.ResultChan()
watchedDeleteObj := e.Object.(*api.Pod) watchedDeleteObj := e.Object.(*example.Pod)
var wres clientv3.WatchResponse var wres clientv3.WatchResponse
wres = <-etcdW wres = <-etcdW
@ -314,7 +317,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
} }
type testWatchStruct struct { type testWatchStruct struct {
obj *api.Pod obj *example.Pod
expectEvent bool expectEvent bool
watchType watch.EventType watchType watch.EventType
} }
@ -338,7 +341,7 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I
} }
} }
func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) { func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
select { select {
case res := <-w.ResultChan(): case res := <-w.ResultChan():
if res.Type != expectEventType { if res.Type != expectEventType {
@ -359,8 +362,8 @@ func testCheckStop(t *testing.T, i int, w watch.Interface) {
if ok { if ok {
var obj string var obj string
switch e.Object.(type) { switch e.Object.(type) {
case *api.Pod: case *example.Pod:
obj = e.Object.(*api.Pod).Name obj = e.Object.(*example.Pod).Name
case *metav1.Status: case *metav1.Status:
obj = e.Object.(*metav1.Status).Message obj = e.Object.(*metav1.Status).Message
} }