Merge pull request #16182 from timothysc/etcd-remove-fakeclient-part2

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-28 02:02:01 -07:00
commit ffe92cc4d4

View File

@ -17,19 +17,15 @@ limitations under the License.
package etcd package etcd
import ( import (
"fmt" rt "runtime"
"testing" "testing"
"time"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -218,62 +214,45 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
} }
} }
/* TODO: So believe it or not... but this test is flakey with the go-etcd client library
* which I'm surprised by. Apprently you can close the client that is performing the watch
* and the watch *never returns.* I would like to still keep this test here and re-enable
* with the new 2.2+ client library.
func TestWatchEtcdError(t *testing.T) { func TestWatchEtcdError(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
fakeClient := tools.NewFakeEtcdClient(t) server := NewEtcdTestClientServer(t)
fakeClient.ExpectNotFoundGet("/some/key") h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything) watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
defer watching.Stop() server.Terminate(t)
got := <-watching.ResultChan() got := <-watching.ResultChan()
if got.Type != watch.Error { if got.Type != watch.Error {
t.Fatalf("Unexpected non-error") t.Fatalf("Unexpected non-error")
} }
status, ok := got.Object.(*unversioned.Status) watching.Stop()
if !ok { } */
t.Fatalf("Unexpected non-error object type")
}
if status.Message != "immediate error" {
t.Errorf("Unexpected wrong error")
}
if status.Status != unversioned.StatusFailure {
t.Errorf("Unexpected wrong error status")
}
}
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
fakeClient := tools.NewFakeEtcdClient(t) server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key" key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key) h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
fakeClient.ExpectNotFoundGet(prefixedKey)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
fakeClient.WaitForWatchCompletion()
// when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
}
// Test normal case // Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podBytes, _ := codec.Encode(pod) returnObj := &api.Pod{}
fakeClient.WatchResponse <- &etcd.Response{ err = h.Set(context.TODO(), key, pod, returnObj, 0)
Action: "set", if err != nil {
Node: &etcd.Node{ t.Fatalf("Unexpected error: %v", err)
Value: string(podBytes),
},
} }
event := <-watching.ResultChan() event := <-watching.ResultChan()
@ -284,24 +263,8 @@ func TestWatch(t *testing.T) {
t.Errorf("Expected %v, got %v", e, a) t.Errorf("Expected %v, got %v", e, a)
} }
// Test error case watching.Stop()
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
if errEvent, ok := <-watching.ResultChan(); !ok {
t.Errorf("no error result?")
} else {
if e, a := watch.Error, errEvent.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := "Injected error", errEvent.Object.(*unversioned.Status).Message; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open { if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown") t.Errorf("An injected error did not cause a graceful shutdown")
} }
@ -320,405 +283,171 @@ func makeSubsets(ip string, port int) []api.EndpointSubset {
func TestWatchEtcdState(t *testing.T) { func TestWatchEtcdState(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
baseKey := "/somekey/foo" key := etcdtest.AddPrefix("/somekey/foo")
prefixedKey := etcdtest.AddPrefix(baseKey) server := NewEtcdTestClientServer(t)
type T struct { defer server.Terminate(t)
Type watch.EventType
Endpoints []api.EndpointSubset h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
} watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
testCases := map[string]struct { if err != nil {
Initial map[string]tools.EtcdResponseWithError t.Fatalf("Unexpected error: %v", err)
Responses []*etcd.Response
From uint64
Expected []*T
}{
"from not found": {
Initial: map[string]tools.EtcdResponseWithError{},
Responses: []*etcd.Response{
{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
},
},
},
From: 1,
Expected: []*T{
{watch.Added, nil},
},
},
"from version 1": {
Responses: []*etcd.Response{
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: makeSubsets("127.0.0.1", 9000),
})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
From: 1,
Expected: []*T{
{watch.Modified, makeSubsets("127.0.0.1", 9000)},
},
},
"from initial state": {
Initial: map[string]tools.EtcdResponseWithError{
prefixedKey: {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
EtcdIndex: 1,
},
},
},
Responses: []*etcd.Response{
nil,
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: makeSubsets("127.0.0.1", 9000),
})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, makeSubsets("127.0.0.1", 9000)},
},
},
} }
for k, testCase := range testCases { endpoint := &api.Endpoints{
fakeClient := tools.NewFakeEtcdClient(t) ObjectMeta: api.ObjectMeta{Name: "foo"},
for key, value := range testCase.Initial { Subsets: emptySubsets(),
fakeClient.Data[key] = value
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), baseKey, testCase.From, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
t.Logf("Testing %v", k)
for i := range testCase.Responses {
if testCase.Responses[i] != nil {
fakeClient.WatchResponse <- testCase.Responses[i]
}
event := <-watching.ResultChan()
if e, a := testCase.Expected[i].Type, event.Type; e != a {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Subsets; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
}
watching.Stop()
} }
err = h.Set(context.TODO(), key, endpoint, endpoint, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
subset := makeSubsets("127.0.0.1", 9000)
endpoint.Subsets = subset
// CAS the previous value
err = h.Set(context.TODO(), key, endpoint, endpoint, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}
watching.Stop()
} }
func TestWatchFromZeroIndex(t *testing.T) { func TestWatchFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
testCases := map[string]struct { key := etcdtest.AddPrefix("/somekey/foo")
Response tools.EtcdResponseWithError server := NewEtcdTestClientServer(t)
ExpectedVersion string defer server.Terminate(t)
ExpectedType watch.EventType
}{ h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
"get value created": {
tools.EtcdResponseWithError{ // set before the watch and verify events
R: &etcd.Response{ err := h.Set(context.TODO(), key, pod, pod, 0)
Node: &etcd.Node{ if err != nil {
Value: runtime.EncodeOrDie(codec, pod), t.Fatalf("Unexpected error: %v", err)
CreatedIndex: 1,
ModifiedIndex: 1,
},
Action: "get",
EtcdIndex: 2,
},
},
"1",
watch.Added,
},
"get value modified": {
tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
"2",
watch.Modified,
},
} }
for k, testCase := range testCases { // check for concatenation on watch event with CAS
fakeClient := tools.NewFakeEtcdClient(t) pod.Name = "bar"
key := "/some/key" err = h.Set(context.TODO(), key, pod, pod, 0)
prefixedKey := etcdtest.AddPrefix(key) if err != nil {
fakeClient.Data[prefixedKey] = testCase.Response t.Fatalf("Unexpected error: %v", err)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
if e, a := testCase.ExpectedType, event.Type; e != a {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("%s: expected a pod, got %#v", k, event.Object)
}
if actualPod.ResourceVersion != testCase.ExpectedVersion {
t.Errorf("%s: expected pod with resource version %v, Got %#v", k, testCase.ExpectedVersion, actualPod)
}
pod.ResourceVersion = testCase.ExpectedVersion
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
watching.Stop()
} }
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// marked as modified b/c of concatenation
event := <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
err = h.Set(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event = <-watching.ResultChan()
if event.Type != watch.Modified {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}
watching.Stop()
} }
func TestWatchListFromZeroIndex(t *testing.T) { func TestWatchListFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} key := etcdtest.AddPrefix("/some/key")
key := "/some/key" server := NewEtcdTestClientServer(t)
prefixedKey := etcdtest.AddPrefix(key) defer server.Terminate(t)
fakeClient := tools.NewFakeEtcdClient(t) h := newEtcdHelper(server.client, codec, key)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 2,
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
},
},
Action: "get",
EtcdIndex: 3,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
// the existing node is detected and the index set // creates key/foo which should trigger the WatchList for "key"
event, open := <-watching.ResultChan() pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if !open { err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
t.Fatalf("unexpected channel close") if err != nil {
} t.Fatalf("Unexpected error: %v", err)
for i := 0; i < 2; i++ { }
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a) event, _ := <-watching.ResultChan()
} if event.Type != watch.Added {
actualPod, ok := event.Object.(*api.Pod) t.Errorf("Unexpected event %#v", event)
if !ok { }
t.Fatalf("expected a pod, got %#v", event.Object)
} if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
if actualPod.ResourceVersion != "1" { t.Errorf("%s: expected %v, got %v", e, a)
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = "1"
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
} }
fakeClient.WaitForWatchCompletion()
watching.Stop() watching.Stop()
} }
func TestWatchListIgnoresRootKey(t *testing.T) { func TestWatchListIgnoresRootKey(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/some/key" key := etcdtest.AddPrefix("/some/key")
prefixedKey := etcdtest.AddPrefix(key) server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.client, codec, key)
fakeClient := tools.NewFakeEtcdClient(t) watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(context.TODO(), key, 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// This is the root directory of the watch, which happens to have a value encoded
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: prefixedKey,
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
// Delete of the parent directory of a key is an event that a list watch would receive,
// but will have no value so the decode will fail.
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: prefixedKey,
Value: "",
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
close(fakeClient.WatchStop)
// the existing node is detected and the index set
_, open := <-watching.ResultChan()
if open {
t.Fatalf("unexpected channel open")
}
watching.Stop()
}
func TestWatchFromNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 100,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
}
watching.Stop()
}
func TestWatchFromOtherError(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 101,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
errEvent := <-watching.ResultChan() // creates key/foo which should trigger the WatchList for "key"
if e, a := watch.Error, errEvent.Type; e != a { err = h.Create(context.TODO(), key, pod, pod, 0)
t.Errorf("Expected %v, got %v", e, a) if err != nil {
} t.Fatalf("Unexpected error: %v", err)
if e, a := "101: () [2]", errEvent.Object.(*unversioned.Status).Message; e != a {
t.Errorf("Expected %v, got %v", e, a)
} }
// force context switch to ensure watches would catch and notify.
rt.Gosched()
select { select {
case _, ok := <-watching.ResultChan(): case event, _ := <-watching.ResultChan():
if ok { t.Fatalf("Unexpected event: %#v", event)
t.Fatalf("expected result channel to be closed") default:
} // fall through, expected behavior
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("watch should have closed channel: %#v", watching)
} }
if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 { watching.Stop()
t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
}
} }
func TestWatchPurposefulShutdown(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) server := NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := "/some/key" key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key) h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix())
fakeClient.ExpectNotFoundGet(prefixedKey)
// Test purposeful shutdown // Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
@ -726,14 +455,10 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
fakeClient.WaitForWatchCompletion()
watching.Stop() watching.Stop()
rt.Gosched()
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open { if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown") t.Errorf("Channel should be closed")
} }
} }