Remove fakeClient from cacher_test.

This commit is contained in:
Wojciech Tyczynski 2015-11-02 14:51:56 +01:00
parent b79379516f
commit 6246201eec
2 changed files with 139 additions and 345 deletions

View File

@ -17,13 +17,11 @@ limitations under the License.
package storage_test package storage_test
import ( import (
"fmt"
"reflect" "reflect"
"strconv"
"testing" "testing"
"time" "time"
"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
@ -33,7 +31,6 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -42,11 +39,11 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func newTestCacher(client tools.EtcdClient) *storage.Cacher { func newTestCacher(s storage.Interface) *storage.Cacher {
prefix := "pods" prefix := "pods"
config := storage.CacherConfig{ config := storage.CacherConfig{
CacheCapacity: 10, CacheCapacity: 10,
Storage: etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()), Storage: s,
Versioner: etcdstorage.APIObjectVersioner{}, Versioner: etcdstorage.APIObjectVersioner{},
ListFromCache: true, ListFromCache: true,
Type: &api.Pod{}, Type: &api.Pod{},
@ -65,12 +62,29 @@ func makeTestPod(name string) *api.Pod {
} }
} }
func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
key := etcdtest.AddPrefix("pods/ns/" + obj.Name)
result := &api.Pod{}
if old == nil {
if err := s.Create(context.TODO(), key, obj, result, 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
} else {
// To force "update" behavior of Set() we need to set ResourceVersion of
// previous version of object.
obj.ResourceVersion = old.ResourceVersion
if err := s.Set(context.TODO(), key, obj, result, 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
obj.ResourceVersion = ""
}
return result
}
func TestList(t *testing.T) { func TestList(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
prefixedKey := etcdtest.AddPrefix("pods") defer server.Terminate(t)
fakeClient.ExpectNotFoundGet(prefixedKey) cacher := newTestCacher(etcdStorage)
cacher := newTestCacher(fakeClient)
fakeClient.WaitForWatchCompletion()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
podBar := makeTestPod("bar") podBar := makeTestPod("bar")
@ -79,353 +93,149 @@ func TestList(t *testing.T) {
podFooPrime := makeTestPod("foo") podFooPrime := makeTestPod("foo")
podFooPrime.Spec.NodeName = "fakeNode" podFooPrime.Spec.NodeName = "fakeNode"
testCases := []*etcd.Response{ fooCreated := updatePod(t, etcdStorage, podFoo, nil)
{ _ = updatePod(t, etcdStorage, podBar, nil)
Action: "create", _ = updatePod(t, etcdStorage, podBaz, nil)
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)),
CreatedIndex: 2,
ModifiedIndex: 2,
},
},
{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBaz)),
CreatedIndex: 3,
ModifiedIndex: 3,
},
},
{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooPrime)),
CreatedIndex: 1,
ModifiedIndex: 4,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
{
Action: "delete",
Node: &etcd.Node{
CreatedIndex: 1,
ModifiedIndex: 5,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
}
// Propagate some data to etcd. _ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
for _, test := range testCases {
fakeClient.WatchResponse <- test deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted); err != nil {
t.Errorf("Unexpected error: %v", err)
} }
result := &api.PodList{} result := &api.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", 5, storage.Everything, result); err != nil { // TODO: We need to pass ResourceVersion of barPod deletion operation.
t.Errorf("unexpected error: %v", err) // However, there is no easy way to get it, so it is hardcoded to 8.
if err := cacher.List(context.TODO(), "pods/ns", 8, storage.Everything, result); err != nil {
t.Errorf("Unexpected error: %v", err)
} }
if result.ListMeta.ResourceVersion != "5" { if result.ListMeta.ResourceVersion != "8" {
t.Errorf("incorrect resource version: %v", result.ListMeta.ResourceVersion) t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
} }
if len(result.Items) != 2 { if len(result.Items) != 2 {
t.Errorf("unexpected list result: %d", len(result.Items)) t.Errorf("Unexpected list result: %d", len(result.Items))
} }
keys := sets.String{} keys := sets.String{}
for _, item := range result.Items { for _, item := range result.Items {
keys.Insert(item.ObjectMeta.Name) keys.Insert(item.Name)
} }
if !keys.HasAll("foo", "baz") { if !keys.HasAll("foo", "baz") {
t.Errorf("unexpected list result: %#v", result) t.Errorf("Unexpected list result: %#v", result)
} }
for _, item := range result.Items { for _, item := range result.Items {
// unset fields that are set by the infrastructure // unset fields that are set by the infrastructure
item.ObjectMeta.ResourceVersion = "" item.ResourceVersion = ""
item.ObjectMeta.CreationTimestamp = unversioned.Time{} item.CreationTimestamp = unversioned.Time{}
var expected *api.Pod var expected *api.Pod
switch item.ObjectMeta.Name { switch item.Name {
case "foo": case "foo":
expected = podFooPrime expected = podFooPrime
case "baz": case "baz":
expected = podBaz expected = podBaz
default: default:
t.Errorf("unexpected item: %v", item) t.Errorf("Unexpected item: %v", item)
} }
if e, a := *expected, item; !reflect.DeepEqual(e, a) { if e, a := *expected, item; !reflect.DeepEqual(e, a) {
t.Errorf("expected: %#v, got: %#v", e, a) t.Errorf("Expected: %#v, got: %#v", e, a)
} }
} }
}
close(fakeClient.WatchResponse) func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
select {
case event := <-w.ResultChan():
if e, a := eventType, event.Type; e != a {
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
}
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("Timed out waiting for an event")
}
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
prefixedKey := etcdtest.AddPrefix("pods") defer server.Terminate(t)
fakeClient.ExpectNotFoundGet(prefixedKey) cacher := newTestCacher(etcdStorage)
cacher := newTestCacher(fakeClient)
fakeClient.WaitForWatchCompletion()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
podBar := makeTestPod("bar") podBar := makeTestPod("bar")
testCases := []struct { podFooPrime := makeTestPod("foo")
object *api.Pod podFooPrime.Spec.NodeName = "fakeNode"
etcdResponse *etcd.Response
event watch.EventType podFooBis := makeTestPod("foo")
filtered bool podFooBis.Spec.NodeName = "anotherFakeNode"
}{
{
object: podFoo,
etcdResponse: &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 2,
ModifiedIndex: 2,
},
},
event: watch.Added,
filtered: true,
},
{
object: podBar,
etcdResponse: &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)),
CreatedIndex: 3,
ModifiedIndex: 3,
},
},
event: watch.Added,
filtered: false,
},
{
object: podFoo,
etcdResponse: &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 2,
ModifiedIndex: 4,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 2,
ModifiedIndex: 2,
},
},
event: watch.Modified,
filtered: true,
},
}
// Set up Watch for object "podFoo". // Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
for _, test := range testCases { fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fakeClient.WatchResponse <- test.etcdResponse _ = updatePod(t, etcdStorage, podBar, nil)
if test.filtered { fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
event := <-watcher.ResultChan()
if e, a := test.event, event.Type; e != a { verifyWatchEvent(t, watcher, watch.Added, podFoo)
t.Errorf("%v %v", e, a) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
}
// unset fields that are set by the infrastructure
obj := event.Object.(*api.Pod)
obj.ObjectMeta.ResourceVersion = ""
obj.ObjectMeta.CreationTimestamp = unversioned.Time{}
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
t.Errorf("expected: %#v, got: %#v", e, a)
}
}
}
// Check whether we get too-old error. // Check whether we get too-old error.
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) _, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
if err == nil { if err == nil {
t.Errorf("exepcted 'error too old' error") t.Errorf("Expected 'error too old' error")
} }
// Now test watch with initial state. // Now test watch with initial state.
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
} }
for _, test := range testCases { initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), storage.Everything)
if test.filtered { if err != nil {
event := <-initialWatcher.ResultChan() t.Fatalf("Unexpected error: %v", err)
if e, a := test.event, event.Type; e != a {
t.Errorf("%v %v", e, a)
}
// unset fields that are set by the infrastructure
obj := event.Object.(*api.Pod)
obj.ObjectMeta.ResourceVersion = ""
obj.ObjectMeta.CreationTimestamp = unversioned.Time{}
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
t.Errorf("expected: %#v, got: %#v", e, a)
}
}
} }
verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now". // Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything) nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
}
select {
case event := <-nowWatcher.ResultChan():
if obj := event.Object.(*api.Pod); event.Type != watch.Added || obj.ResourceVersion != "4" {
t.Errorf("unexpected event: %v", event)
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("timed out waiting for an event")
}
// Emit a new event and check if it is observed by the watcher.
fakeClient.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 2,
ModifiedIndex: 5,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 2,
ModifiedIndex: 4,
},
}
event := <-nowWatcher.ResultChan()
obj := event.Object.(*api.Pod)
if event.Type != watch.Modified || obj.ResourceVersion != "5" {
t.Errorf("unexpected event: %v", event)
} }
close(fakeClient.WatchResponse) verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
} }
func TestFiltering(t *testing.T) { func TestFiltering(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
prefixedKey := etcdtest.AddPrefix("pods") defer server.Terminate(t)
fakeClient.ExpectNotFoundGet(prefixedKey) cacher := newTestCacher(etcdStorage)
cacher := newTestCacher(fakeClient)
fakeClient.WaitForWatchCompletion()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
podFoo.ObjectMeta.Labels = map[string]string{"filter": "foo"} podFoo.Labels = map[string]string{"filter": "foo"}
podFooFiltered := makeTestPod("foo") podFooFiltered := makeTestPod("foo")
podFooPrime := makeTestPod("foo")
podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode"
testCases := []struct { fooCreated := updatePod(t, etcdStorage, podFoo, nil)
object *api.Pod fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
etcdResponse *etcd.Response fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
filtered bool _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
event watch.EventType
}{ deleted := api.Pod{}
{ if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted); err != nil {
object: podFoo, t.Errorf("Unexpected error: %v", err)
etcdResponse: &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
filtered: true,
event: watch.Added,
},
{
object: podFooFiltered,
etcdResponse: &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooFiltered)),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
filtered: true,
// Deleted, because the new object doesn't match filter.
event: watch.Deleted,
},
{
object: podFoo,
etcdResponse: &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 3,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooFiltered)),
CreatedIndex: 1,
ModifiedIndex: 2,
},
},
filtered: true,
// Added, because the previous object didn't match filter.
event: watch.Added,
},
{
object: podFoo,
etcdResponse: &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 4,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 3,
},
},
filtered: true,
event: watch.Modified,
},
{
object: podFoo,
etcdResponse: &etcd.Response{
Action: "delete",
Node: &etcd.Node{
CreatedIndex: 1,
ModifiedIndex: 5,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 4,
},
},
filtered: true,
event: watch.Deleted,
},
} }
// Set up Watch for object "podFoo" with label filter set. // Set up Watch for object "podFoo" with label filter set.
@ -433,68 +243,43 @@ func TestFiltering(t *testing.T) {
filter := func(obj runtime.Object) bool { filter := func(obj runtime.Object) bool {
metadata, err := meta.Accessor(obj) metadata, err := meta.Accessor(obj)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
return false return false
} }
return selector.Matches(labels.Set(metadata.Labels())) return selector.Matches(labels.Set(metadata.Labels()))
} }
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, filter) initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
}
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), filter)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} }
for _, test := range testCases { verifyWatchEvent(t, watcher, watch.Added, podFoo)
fakeClient.WatchResponse <- test.etcdResponse verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
if test.filtered { verifyWatchEvent(t, watcher, watch.Added, podFoo)
event := <-watcher.ResultChan() verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
if e, a := test.event, event.Type; e != a { verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
t.Errorf("%v %v", e, a)
}
// unset fields that are set by the infrastructure
obj := event.Object.(*api.Pod)
obj.ObjectMeta.ResourceVersion = ""
obj.ObjectMeta.CreationTimestamp = unversioned.Time{}
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
t.Errorf("expected: %#v, got: %#v", e, a)
}
}
}
close(fakeClient.WatchResponse)
} }
/* TODO: So believe it or not... but this test is flakey with the go-etcd client library
* which I'm surprised by. Apprently you can close the client that is performing the watch
* and the watch *never returns.* I would like to still keep this test here and re-enable
* with the new 2.2+ client library.
func TestStorageError(t *testing.T) { func TestStorageError(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
prefixedKey := etcdtest.AddPrefix("pods") cacher := newTestCacher(etcdStorage)
fakeClient.ExpectNotFoundGet(prefixedKey)
cacher := newTestCacher(fakeClient)
fakeClient.WaitForWatchCompletion()
podFoo := makeTestPod("foo")
// Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
server.Terminate(t)
fakeClient.WatchResponse <- &etcd.Response{ got := <-watcher.ResultChan()
Action: "create", if got.Type != watch.Error {
Node: &etcd.Node{ t.Errorf("Unexpected non-error")
Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)),
CreatedIndex: 1,
ModifiedIndex: 1,
},
} }
_ = <-watcher.ResultChan() } */
// Injecting error is simulating error from etcd.
// This is almost the same what would happen e.g. in case of
// "error too old" when reconnecting to etcd watch.
fakeClient.WatchInjectError <- fmt.Errorf("fake error")
_, ok := <-watcher.ResultChan()
if ok {
t.Errorf("unexpected event")
}
}

View File

@ -26,6 +26,8 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
@ -155,3 +157,10 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
} }
return server return server
} }
// NewEtcdTestStorage creates a new storage.Interface and TestServer
func NewEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*EtcdTestServer, storage.Interface) {
server := NewEtcdTestClientServer(t)
storage := NewEtcdStorage(server.client, codec, prefix)
return server, storage
}