diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index abfbd3d82d8..97d41136670 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -17,13 +17,11 @@ limitations under the License. package storage_test import ( - "fmt" "reflect" + "strconv" "testing" "time" - "github.com/coreos/go-etcd/etcd" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/testapi" @@ -33,7 +31,6 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" - "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" @@ -42,11 +39,11 @@ import ( "golang.org/x/net/context" ) -func newTestCacher(client tools.EtcdClient) *storage.Cacher { +func newTestCacher(s storage.Interface) *storage.Cacher { prefix := "pods" config := storage.CacherConfig{ CacheCapacity: 10, - Storage: etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()), + Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, ListFromCache: true, Type: &api.Pod{}, @@ -65,12 +62,29 @@ func makeTestPod(name string) *api.Pod { } } +func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { + key := etcdtest.AddPrefix("pods/ns/" + obj.Name) + result := &api.Pod{} + if old == nil { + if err := s.Create(context.TODO(), key, obj, result, 0); err != nil { + t.Errorf("unexpected error: %v", err) + } + } else { + // To force "update" behavior of Set() we need to set ResourceVersion of + // previous version of object. + obj.ResourceVersion = old.ResourceVersion + if err := s.Set(context.TODO(), key, obj, result, 0); err != nil { + t.Errorf("unexpected error: %v", err) + } + obj.ResourceVersion = "" + } + return result +} + func TestList(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - prefixedKey := etcdtest.AddPrefix("pods") - fakeClient.ExpectNotFoundGet(prefixedKey) - cacher := newTestCacher(fakeClient) - fakeClient.WaitForWatchCompletion() + server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) podFoo := makeTestPod("foo") podBar := makeTestPod("bar") @@ -79,353 +93,149 @@ func TestList(t *testing.T) { podFooPrime := makeTestPod("foo") podFooPrime.Spec.NodeName = "fakeNode" - testCases := []*etcd.Response{ - { - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - }, - { - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)), - CreatedIndex: 2, - ModifiedIndex: 2, - }, - }, - { - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBaz)), - CreatedIndex: 3, - ModifiedIndex: 3, - }, - }, - { - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooPrime)), - CreatedIndex: 1, - ModifiedIndex: 4, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - }, - { - Action: "delete", - Node: &etcd.Node{ - CreatedIndex: 1, - ModifiedIndex: 5, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - }, - } + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + _ = updatePod(t, etcdStorage, podBar, nil) + _ = updatePod(t, etcdStorage, podBaz, nil) - // Propagate some data to etcd. - for _, test := range testCases { - fakeClient.WatchResponse <- test + _ = updatePod(t, etcdStorage, podFooPrime, fooCreated) + + deleted := api.Pod{} + if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted); err != nil { + t.Errorf("Unexpected error: %v", err) } result := &api.PodList{} - if err := cacher.List(context.TODO(), "pods/ns", 5, storage.Everything, result); err != nil { - t.Errorf("unexpected error: %v", err) + // TODO: We need to pass ResourceVersion of barPod deletion operation. + // However, there is no easy way to get it, so it is hardcoded to 8. + if err := cacher.List(context.TODO(), "pods/ns", 8, storage.Everything, result); err != nil { + t.Errorf("Unexpected error: %v", err) } - if result.ListMeta.ResourceVersion != "5" { - t.Errorf("incorrect resource version: %v", result.ListMeta.ResourceVersion) + if result.ListMeta.ResourceVersion != "8" { + t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion) } if len(result.Items) != 2 { - t.Errorf("unexpected list result: %d", len(result.Items)) + t.Errorf("Unexpected list result: %d", len(result.Items)) } keys := sets.String{} for _, item := range result.Items { - keys.Insert(item.ObjectMeta.Name) + keys.Insert(item.Name) } if !keys.HasAll("foo", "baz") { - t.Errorf("unexpected list result: %#v", result) + t.Errorf("Unexpected list result: %#v", result) } for _, item := range result.Items { // unset fields that are set by the infrastructure - item.ObjectMeta.ResourceVersion = "" - item.ObjectMeta.CreationTimestamp = unversioned.Time{} + item.ResourceVersion = "" + item.CreationTimestamp = unversioned.Time{} var expected *api.Pod - switch item.ObjectMeta.Name { + switch item.Name { case "foo": expected = podFooPrime case "baz": expected = podBaz default: - t.Errorf("unexpected item: %v", item) + t.Errorf("Unexpected item: %v", item) } if e, a := *expected, item; !reflect.DeepEqual(e, a) { - t.Errorf("expected: %#v, got: %#v", e, a) + t.Errorf("Expected: %#v, got: %#v", e, a) } } +} - close(fakeClient.WatchResponse) +func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { + select { + case event := <-w.ResultChan(): + if e, a := eventType, event.Type; e != a { + t.Errorf("Expected: %s, got: %s", eventType, event.Type) + } + if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) + } + case <-time.After(util.ForeverTestTimeout): + t.Errorf("Timed out waiting for an event") + } } func TestWatch(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - prefixedKey := etcdtest.AddPrefix("pods") - fakeClient.ExpectNotFoundGet(prefixedKey) - cacher := newTestCacher(fakeClient) - fakeClient.WaitForWatchCompletion() + server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) podFoo := makeTestPod("foo") podBar := makeTestPod("bar") - testCases := []struct { - object *api.Pod - etcdResponse *etcd.Response - event watch.EventType - filtered bool - }{ - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 2, - ModifiedIndex: 2, - }, - }, - event: watch.Added, - filtered: true, - }, - { - object: podBar, - etcdResponse: &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)), - CreatedIndex: 3, - ModifiedIndex: 3, - }, - }, - event: watch.Added, - filtered: false, - }, - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 2, - ModifiedIndex: 4, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 2, - ModifiedIndex: 2, - }, - }, - event: watch.Modified, - filtered: true, - }, - } + podFooPrime := makeTestPod("foo") + podFooPrime.Spec.NodeName = "fakeNode" + + podFooBis := makeTestPod("foo") + podFooBis.Spec.NodeName = "anotherFakeNode" // Set up Watch for object "podFoo". - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } - for _, test := range testCases { - fakeClient.WatchResponse <- test.etcdResponse - if test.filtered { - event := <-watcher.ResultChan() - if e, a := test.event, event.Type; e != a { - t.Errorf("%v %v", e, a) - } - // unset fields that are set by the infrastructure - obj := event.Object.(*api.Pod) - obj.ObjectMeta.ResourceVersion = "" - obj.ObjectMeta.CreationTimestamp = unversioned.Time{} - if e, a := test.object, obj; !reflect.DeepEqual(e, a) { - t.Errorf("expected: %#v, got: %#v", e, a) - } - } - } + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + _ = updatePod(t, etcdStorage, podBar, nil) + fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated) + + verifyWatchEvent(t, watcher, watch.Added, podFoo) + verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) // Check whether we get too-old error. _, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) if err == nil { - t.Errorf("exepcted 'error too old' error") + t.Errorf("Expected 'error too old' error") } // Now test watch with initial state. - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) + initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) } - for _, test := range testCases { - if test.filtered { - event := <-initialWatcher.ResultChan() - if e, a := test.event, event.Type; e != a { - t.Errorf("%v %v", e, a) - } - // unset fields that are set by the infrastructure - obj := event.Object.(*api.Pod) - obj.ObjectMeta.ResourceVersion = "" - obj.ObjectMeta.CreationTimestamp = unversioned.Time{} - if e, a := test.object, obj; !reflect.DeepEqual(e, a) { - t.Errorf("expected: %#v, got: %#v", e, a) - } - } + initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } + verifyWatchEvent(t, initialWatcher, watch.Added, podFoo) + verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) + // Now test watch from "now". nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything) if err != nil { - t.Fatalf("unexpected error: %v", err) - } - select { - case event := <-nowWatcher.ResultChan(): - if obj := event.Object.(*api.Pod); event.Type != watch.Added || obj.ResourceVersion != "4" { - t.Errorf("unexpected event: %v", event) - } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("timed out waiting for an event") - } - // Emit a new event and check if it is observed by the watcher. - fakeClient.WatchResponse <- &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 2, - ModifiedIndex: 5, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 2, - ModifiedIndex: 4, - }, - } - event := <-nowWatcher.ResultChan() - obj := event.Object.(*api.Pod) - if event.Type != watch.Modified || obj.ResourceVersion != "5" { - t.Errorf("unexpected event: %v", event) + t.Fatalf("Unexpected error: %v", err) } - close(fakeClient.WatchResponse) + verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime) + + _ = updatePod(t, etcdStorage, podFooBis, fooUpdated) + + verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) } func TestFiltering(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - prefixedKey := etcdtest.AddPrefix("pods") - fakeClient.ExpectNotFoundGet(prefixedKey) - cacher := newTestCacher(fakeClient) - fakeClient.WaitForWatchCompletion() + server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) podFoo := makeTestPod("foo") - podFoo.ObjectMeta.Labels = map[string]string{"filter": "foo"} + podFoo.Labels = map[string]string{"filter": "foo"} podFooFiltered := makeTestPod("foo") + podFooPrime := makeTestPod("foo") + podFooPrime.Labels = map[string]string{"filter": "foo"} + podFooPrime.Spec.NodeName = "fakeNode" - testCases := []struct { - object *api.Pod - etcdResponse *etcd.Response - filtered bool - event watch.EventType - }{ - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - }, - filtered: true, - event: watch.Added, - }, - { - object: podFooFiltered, - etcdResponse: &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooFiltered)), - CreatedIndex: 1, - ModifiedIndex: 2, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - }, - filtered: true, - // Deleted, because the new object doesn't match filter. - event: watch.Deleted, - }, - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 3, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFooFiltered)), - CreatedIndex: 1, - ModifiedIndex: 2, - }, - }, - filtered: true, - // Added, because the previous object didn't match filter. - event: watch.Added, - }, - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 4, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 3, - }, - }, - filtered: true, - event: watch.Modified, - }, - { - object: podFoo, - etcdResponse: &etcd.Response{ - Action: "delete", - Node: &etcd.Node{ - CreatedIndex: 1, - ModifiedIndex: 5, - }, - PrevNode: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 4, - }, - }, - filtered: true, - event: watch.Deleted, - }, + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated) + fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) + _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered) + + deleted := api.Pod{} + if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted); err != nil { + t.Errorf("Unexpected error: %v", err) } // Set up Watch for object "podFoo" with label filter set. @@ -433,68 +243,43 @@ func TestFiltering(t *testing.T) { filter := func(obj runtime.Object) bool { metadata, err := meta.Accessor(obj) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("Unexpected error: %v", err) return false } return selector.Matches(labels.Set(metadata.Labels())) } - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, filter) + initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) + } + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), filter) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - for _, test := range testCases { - fakeClient.WatchResponse <- test.etcdResponse - if test.filtered { - event := <-watcher.ResultChan() - if e, a := test.event, event.Type; e != a { - t.Errorf("%v %v", e, a) - } - // unset fields that are set by the infrastructure - obj := event.Object.(*api.Pod) - obj.ObjectMeta.ResourceVersion = "" - obj.ObjectMeta.CreationTimestamp = unversioned.Time{} - if e, a := test.object, obj; !reflect.DeepEqual(e, a) { - t.Errorf("expected: %#v, got: %#v", e, a) - } - } - } - - close(fakeClient.WatchResponse) + verifyWatchEvent(t, watcher, watch.Added, podFoo) + verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) + verifyWatchEvent(t, watcher, watch.Added, podFoo) + verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) + verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) } +/* TODO: So believe it or not... but this test is flakey with the go-etcd client library + * which I'm surprised by. Apprently you can close the client that is performing the watch + * and the watch *never returns.* I would like to still keep this test here and re-enable + * with the new 2.2+ client library. func TestStorageError(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - prefixedKey := etcdtest.AddPrefix("pods") - fakeClient.ExpectNotFoundGet(prefixedKey) - cacher := newTestCacher(fakeClient) - fakeClient.WaitForWatchCompletion() + server, etcdStorage := etcdstorage.NewEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + cacher := newTestCacher(etcdStorage) - podFoo := makeTestPod("foo") - - // Set up Watch for object "podFoo". watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } + server.Terminate(t) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, - }, + got := <-watcher.ResultChan() + if got.Type != watch.Error { + t.Errorf("Unexpected non-error") } - _ = <-watcher.ResultChan() - - // Injecting error is simulating error from etcd. - // This is almost the same what would happen e.g. in case of - // "error too old" when reconnecting to etcd watch. - fakeClient.WatchInjectError <- fmt.Errorf("fake error") - - _, ok := <-watcher.ResultChan() - if ok { - t.Errorf("unexpected event") - } -} +} */ diff --git a/pkg/storage/etcd/etcd_test_util.go b/pkg/storage/etcd/etcd_test_util.go index b94eb6cba01..72aec2d871c 100644 --- a/pkg/storage/etcd/etcd_test_util.go +++ b/pkg/storage/etcd/etcd_test_util.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/tools" "github.com/coreos/etcd/etcdserver" @@ -155,3 +157,10 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer { } return server } + +// NewEtcdTestStorage creates a new storage.Interface and TestServer +func NewEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*EtcdTestServer, storage.Interface) { + server := NewEtcdTestClientServer(t) + storage := NewEtcdStorage(server.client, codec, prefix) + return server, storage +}