diff --git a/pkg/api/helper.go b/pkg/api/helper.go index 0d7d5b6fdeb..a1365c3d3d4 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -25,6 +25,20 @@ import ( "gopkg.in/v1/yaml" ) +type EncodingInterface interface { + Encode(obj interface{}) (data []byte, err error) + Decode(data []byte) (interface{}, error) + DecodeInto(data []byte, obj interface{}) error +} + +type VersioningInterface interface { + SetResourceVersion(obj interface{}, version uint64) error + ResourceVersion(obj interface{}) (uint64, error) +} + +var Encoding EncodingInterface +var Versioning VersioningInterface + var conversionScheme *conversion.Scheme func init() { @@ -86,6 +100,9 @@ func init() { return nil }, ) + + Encoding = conversionScheme + Versioning = JSONBaseVersioning{} } // AddKnownTypes registers the types of the arguments to the marshaller of the package api. diff --git a/pkg/api/jsonbase.go b/pkg/api/jsonbase.go index 884615b7731..4ce61709439 100644 --- a/pkg/api/jsonbase.go +++ b/pkg/api/jsonbase.go @@ -21,6 +21,26 @@ import ( "reflect" ) +// versionedJSONBase allows access to the version state of a JSONBase object +type JSONBaseVersioning struct{} + +func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) { + json, err := FindJSONBaseRO(obj) + if err != nil { + return 0, err + } + return json.ResourceVersion, nil +} + +func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) error { + json, err := FindJSONBase(obj) + if err != nil { + return err + } + json.SetResourceVersion(version) + return nil +} + // JSONBase lets you work with a JSONBase from any of the versioned or // internal APIObjects. type JSONBaseInterface interface { diff --git a/pkg/api/jsonbase_test.go b/pkg/api/jsonbase_test.go index 66259889d8a..1b0500f0693 100644 --- a/pkg/api/jsonbase_test.go +++ b/pkg/api/jsonbase_test.go @@ -57,3 +57,35 @@ func TestGenericJSONBase(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } } + +func TestVersioningOfAPI(t *testing.T) { + type T struct { + Object interface{} + Expected uint64 + } + testCases := map[string]T{ + "empty api object": {Service{}, 0}, + "api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, + "pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, + } + versioning := JSONBaseVersioning{} + for key, testCase := range testCases { + actual, err := versioning.ResourceVersion(testCase.Object) + if err != nil { + t.Errorf("%s: unexpected error %#v", key, err) + } + if actual != testCase.Expected { + t.Errorf("%s: expected %d, got %d", key, testCase.Expected, actual) + } + } + + failingCases := map[string]T{ + "not a valid object to try": {JSONBase{ResourceVersion: 1}, 1}, + } + for key, testCase := range failingCases { + _, err := versioning.ResourceVersion(testCase.Object) + if err == nil { + t.Errorf("%s: expected error, got nil", key) + } + } +} diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 37abec12921..143c53bb2b3 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -34,7 +34,7 @@ import ( // TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references // from this file completely. type ReplicationManager struct { - etcdClient tools.EtcdClient + etcdHelper tools.EtcdHelper kubeClient client.Interface podControl PodControlInterface syncTime <-chan time.Time @@ -84,7 +84,7 @@ func (r RealPodControl) deletePod(podID string) error { func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Interface) *ReplicationManager { rm := &ReplicationManager{ kubeClient: kubeClient, - etcdClient: etcdClient, + etcdHelper: tools.EtcdHelper{etcdClient, api.Encoding, api.Versioning}, podControl: RealPodControl{ kubeClient: kubeClient, }, @@ -102,8 +102,7 @@ func (rm *ReplicationManager) Run(period time.Duration) { // makeEtcdWatch starts watching via etcd. func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) { - helper := tools.EtcdHelper{rm.etcdClient} - return helper.WatchList("/registry/controllers", tools.Everything) + return rm.etcdHelper.WatchList("/registry/controllers", tools.Everything) } // makeAPIWatch starts watching via the apiserver. @@ -192,8 +191,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli func (rm *ReplicationManager) synchronize() { var controllerSpecs []api.ReplicationController - helper := tools.EtcdHelper{rm.etcdClient} - err := helper.ExtractList("/registry/controllers", &controllerSpecs) + err := rm.etcdHelper.ExtractList("/registry/controllers", &controllerSpecs) if err != nil { glog.Errorf("Synchronization error: %v (%#v)", err, err) return diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index f64c7eb544d..8dd6089a284 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -32,7 +32,8 @@ import ( // EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. type EtcdRegistry struct { - etcdClient tools.EtcdClient + client tools.EtcdClient + helper tools.EtcdHelper machines MinionRegistry manifestFactory ManifestFactory } @@ -43,8 +44,9 @@ type EtcdRegistry struct { // 'scheduler' is the scheduling algorithm to use. func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { registry := &EtcdRegistry{ - etcdClient: client, - machines: machines, + client: client, + helper: tools.EtcdHelper{client, api.Encoding, api.Versioning}, + machines: machines, } registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: registry, @@ -56,10 +58,6 @@ func makePodKey(machine, podID string) string { return "/registry/hosts/" + machine + "/pods/" + podID } -func (registry *EtcdRegistry) helper() *tools.EtcdHelper { - return &tools.EtcdHelper{registry.etcdClient} -} - // ListPods obtains a list of pods that match selector. func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { pods := []api.Pod{} @@ -69,7 +67,7 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err } for _, machine := range machines { var machinePods []api.Pod - err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) + err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) if err != nil { return pods, err } @@ -105,7 +103,7 @@ func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { podKey := makePodKey(machine, pod.ID) - err := registry.helper().SetObj(podKey, pod) + err := registry.helper.SetObj(podKey, pod) manifest, err := registry.manifestFactory.MakeManifest(machine, pod) if err != nil { @@ -113,14 +111,14 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { } contKey := makeContainerKey(machine) - err = registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { manifests := *in.(*api.ContainerManifestList) manifests.Items = append(manifests.Items, manifest) return manifests, nil }) if err != nil { // Don't strand stuff. - _, err2 := registry.etcdClient.Delete(podKey, false) + _, err2 := registry.client.Delete(podKey, false) if err2 != nil { glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) } @@ -145,7 +143,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // First delete the pod, so a scheduler doesn't notice it getting removed from the // machine and attempt to put it somewhere. podKey := makePodKey(machine, podID) - _, err := registry.etcdClient.Delete(podKey, true) + _, err := registry.client.Delete(podKey, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } @@ -155,7 +153,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { manifests := in.(*api.ContainerManifestList) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) found := false @@ -179,7 +177,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - err = registry.helper().ExtractObj(key, &pod, false) + err = registry.helper.ExtractObj(key, &pod, false) if err != nil { return } @@ -204,14 +202,14 @@ func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) { // ListControllers obtains a list of ReplicationControllers. func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { var controllers []api.ReplicationController - err := registry.helper().ExtractList("/registry/controllers", &controllers) + err := registry.helper.ExtractList("/registry/controllers", &controllers) return controllers, err } // WatchControllers begins watching for new, changed, or deleted controllers. // TODO: Add id/selector parameters? func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) { - return registry.helper().WatchList("/registry/controllers", tools.Everything) + return registry.helper.WatchList("/registry/controllers", tools.Everything) } func makeControllerKey(id string) string { @@ -222,7 +220,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err := registry.helper().ExtractObj(key, &controller, false) + err := registry.helper.ExtractObj(key, &controller, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -240,13 +238,13 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl // UpdateController replaces an existing ReplicationController. func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { - return registry.helper().SetObj(makeControllerKey(controller.ID), controller) + return registry.helper.SetObj(makeControllerKey(controller.ID), controller) } // DeleteController deletes a ReplicationController specified by its ID. func (registry *EtcdRegistry) DeleteController(controllerID string) error { key := makeControllerKey(controllerID) - _, err := registry.etcdClient.Delete(key, false) + _, err := registry.client.Delete(key, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -260,20 +258,20 @@ func makeServiceKey(name string) string { // ListServices obtains a list of Services. func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { var list api.ServiceList - err := registry.helper().ExtractList("/registry/services/specs", &list.Items) + err := registry.helper.ExtractList("/registry/services/specs", &list.Items) return list, err } // CreateService creates a new Service. func (registry *EtcdRegistry) CreateService(svc api.Service) error { - return registry.helper().SetObj(makeServiceKey(svc.ID), svc) + return registry.helper.SetObj(makeServiceKey(svc.ID), svc) } // GetService obtains a Service specified by its name. func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err := registry.helper().ExtractObj(key, &svc, false) + err := registry.helper.ExtractObj(key, &svc, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("service", name) } @@ -290,7 +288,7 @@ func makeServiceEndpointsKey(name string) string { // DeleteService deletes a Service specified by its name. func (registry *EtcdRegistry) DeleteService(name string) error { key := makeServiceKey(name) - _, err := registry.etcdClient.Delete(key, true) + _, err := registry.client.Delete(key, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("service", name) } @@ -298,7 +296,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error { return err } key = makeServiceEndpointsKey(name) - _, err = registry.etcdClient.Delete(key, true) + _, err = registry.client.Delete(key, true) if !tools.IsEtcdNotFound(err) { return err } @@ -313,5 +311,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error { // UpdateEndpoints update Endpoints of a Service. func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - return registry.helper().SetObj("/registry/services/endpoints/"+e.ID, e) + return registry.helper.SetObj("/registry/services/endpoints/"+e.ID, e) } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 4753c9127bd..b186cdec5d2 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -21,7 +21,6 @@ import ( "reflect" "sync" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -42,6 +41,17 @@ var ( EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} ) +type Encoding interface { + Encode(obj interface{}) (data []byte, err error) + Decode(data []byte) (interface{}, error) + DecodeInto(data []byte, obj interface{}) error +} + +type Versioning interface { + SetResourceVersion(obj interface{}, version uint64) error + ResourceVersion(obj interface{}) (uint64, error) +} + // EtcdClient is an injectable interface for testing. type EtcdClient interface { AddChild(key, data string, ttl uint64) (*etcd.Response, error) @@ -65,7 +75,10 @@ type EtcdGetSet interface { // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { - Client EtcdGetSet + Client EtcdGetSet + Encoding Encoding + // optional + Versioning Versioning } // Returns true iff err is an etcd not found error. @@ -116,7 +129,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { v := pv.Elem() for _, node := range nodes { obj := reflect.New(v.Type().Elem()) - err = api.DecodeInto([]byte(node.Value), obj.Interface()) + err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface()) if err != nil { return err } @@ -150,12 +163,10 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) } body = response.Node.Value - err = api.DecodeInto([]byte(body), objPtr) - if jsonBase, err := api.FindJSONBase(objPtr); err == nil { - jsonBase.SetResourceVersion(response.Node.ModifiedIndex) - // Note that err shadows the err returned below, so we won't - // return an error just because we failed to find a JSONBase. - // This is intentional. + err = h.Encoding.DecodeInto([]byte(body), objPtr) + if h.Versioning != nil { + _ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex) + // being unable to set the version does not prevent the object from being extracted } return body, response.Node.ModifiedIndex, err } @@ -163,13 +174,15 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot // SetObj marshals obj via json, and stores under key. Will do an // atomic update if obj's ResourceVersion field is set. func (h *EtcdHelper) SetObj(key string, obj interface{}) error { - data, err := api.Encode(obj) + data, err := h.Encoding.Encode(obj) if err != nil { return err } - if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 { - _, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion) - return err // err is shadowed! + if h.Versioning != nil { + if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { + _, err = h.Client.CompareAndSwap(key, string(data), 0, "", version) + return err // err is shadowed! + } } // TODO: when client supports atomic creation, integrate this with the above. @@ -186,7 +199,7 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) // // Example: // -// h := &util.EtcdHelper{client} +// h := &util.EtcdHelper{client, encoding, versioning} // err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) { // // Before this function is called, currentObj has been reset to etcd's current // // contents for "myKey". @@ -225,7 +238,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return h.SetObj(key, ret) } - data, err := api.Encode(ret) + data, err := h.Encoding.Encode(ret) if err != nil { return err } @@ -250,7 +263,7 @@ func Everything(interface{}) bool { // API objects, and any items passing 'filter' are sent down the returned // watch.Interface. func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, filter) + w := newEtcdWatcher(true, filter, h.Encoding) go w.etcdWatch(h.Client, key) return w, nil } @@ -258,13 +271,15 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { - w := newEtcdWatcher(false, nil) + w := newEtcdWatcher(false, nil, h.Encoding) go w.etcdWatch(h.Client, key) return w, nil } // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { + encoding Encoding + list bool // If we're doing a recursive watch, should be true. filter FilterFunc @@ -282,8 +297,9 @@ type etcdWatcher struct { } // Returns a new etcdWatcher; if list is true, watch sub-nodes. -func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher { +func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher { w := &etcdWatcher{ + encoding: encoding, list: list, filter: filter, etcdIncoming: make(chan *etcd.Response), @@ -358,7 +374,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { return } - obj, err := api.Decode(data) + obj, err := w.encoding.Decode(data) if err != nil { glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res) // TODO: expose an error through watch.Interface? diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 45babe45536..e70c1df6b9b 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -23,12 +23,13 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" ) -type fakeEtcdGetSet struct { +type fakeClientGetSet struct { get func(key string, sort, recursive bool) (*etcd.Response, error) set func(key, value string, ttl uint64) (*etcd.Response, error) } @@ -38,9 +39,15 @@ type TestResource struct { Value int `json:"value" yaml:"value,omitempty"` } +var scheme *conversion.Scheme +var encoding = api.Encoding +var versioning = api.Versioning + func init() { - api.AddKnownTypes("", TestResource{}) - api.AddKnownTypes("v1beta1", TestResource{}) + scheme = conversion.NewScheme() + scheme.ExternalVersion = "v1beta1" + scheme.AddKnownTypes("", TestResource{}) + scheme.AddKnownTypes("v1beta1", TestResource{}) } func TestIsNotFoundErr(t *testing.T) { @@ -80,7 +87,7 @@ func TestExtractList(t *testing.T) { {JSONBase: api.JSONBase{ID: "baz"}}, } var got []api.Pod - helper := EtcdHelper{fakeClient} + helper := EtcdHelper{fakeClient, encoding, versioning} err := helper.ExtractList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -94,7 +101,7 @@ func TestExtractObj(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} fakeClient.Set("/some/key", util.MakeJSONString(expect), 0) - helper := EtcdHelper{fakeClient} + helper := EtcdHelper{fakeClient, encoding, versioning} var got api.Pod err := helper.ExtractObj("/some/key", &got, false) if err != nil { @@ -127,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { }, }, } - helper := EtcdHelper{fakeClient} + helper := EtcdHelper{fakeClient, encoding, versioning} try := func(key string) { var got api.Pod err := helper.ExtractObj(key, &got, false) @@ -148,12 +155,60 @@ func TestExtractObjNotFoundErr(t *testing.T) { func TestSetObj(t *testing.T) { obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} fakeClient := MakeFakeEtcdClient(t) - helper := EtcdHelper{fakeClient} + helper := EtcdHelper{fakeClient, encoding, versioning} err := helper.SetObj("/some/key", obj) if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err := api.Encode(obj) + data, err := encoding.Encode(obj) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + expect := string(data) + got := fakeClient.Data["/some/key"].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } +} + +func TestSetObjWithVersion(t *testing.T) { + obj := api.Pod{JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}} + fakeClient := MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: api.EncodeOrDie(obj), + ModifiedIndex: 1, + }, + }, + } + + helper := EtcdHelper{fakeClient, encoding, versioning} + err := helper.SetObj("/some/key", obj) + if err != nil { + t.Fatalf("Unexpected error %#v", err) + } + data, err := encoding.Encode(obj) + if err != nil { + t.Fatalf("Unexpected error %#v", err) + } + expect := string(data) + got := fakeClient.Data["/some/key"].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } +} + +func TestSetObjWithoutVersioning(t *testing.T) { + obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + fakeClient := MakeFakeEtcdClient(t) + helper := EtcdHelper{fakeClient, encoding, nil} + err := helper.SetObj("/some/key", obj) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + data, err := encoding.Encode(obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -167,7 +222,8 @@ func TestSetObj(t *testing.T) { func TestAtomicUpdate(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient} + encoding := scheme + helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -178,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err := api.Encode(obj) + data, err := encoding.Encode(obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -204,7 +260,7 @@ func TestAtomicUpdate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err = api.Encode(objUpdate) + data, err = encoding.Encode(objUpdate) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -223,9 +279,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }) + }, encoding) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := api.Encode(pod) + podBytes, _ := encoding.Encode(pod) go w.sendResult(&etcd.Response{ Action: "set", @@ -247,9 +303,9 @@ func TestWatchInterpretation_Delete(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }) + }, encoding) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := api.Encode(pod) + podBytes, _ := encoding.Encode(pod) go w.sendResult(&etcd.Response{ Action: "delete", @@ -271,7 +327,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }) + }, encoding) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -285,7 +341,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }) + }, encoding) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -298,7 +354,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }) + }, encoding) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -311,20 +367,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { } func TestWatch(t *testing.T) { - fakeEtcd := MakeFakeEtcdClient(t) - h := EtcdHelper{fakeEtcd} + fakeClient := MakeFakeEtcdClient(t) + h := EtcdHelper{fakeClient, encoding, versioning} watching, err := h.Watch("/some/key") if err != nil { t.Fatalf("Unexpected error: %v", err) } - fakeEtcd.WaitForWatchCompletion() + fakeClient.WaitForWatchCompletion() // Test normal case pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := api.Encode(pod) - fakeEtcd.WatchResponse <- &etcd.Response{ + podBytes, _ := encoding.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ Action: "set", Node: &etcd.Node{ Value: string(podBytes), @@ -344,10 +400,10 @@ func TestWatch(t *testing.T) { } // Test error case - fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") + fakeClient.WatchInjectError <- fmt.Errorf("Injected error") // Did everything shut down? - if _, open := <-fakeEtcd.WatchResponse; open { + if _, open := <-fakeClient.WatchResponse; open { t.Errorf("An injected error did not cause a graceful shutdown") } if _, open := <-watching.ResultChan(); open { @@ -356,19 +412,19 @@ func TestWatch(t *testing.T) { } func TestWatchPurposefulShutdown(t *testing.T) { - fakeEtcd := MakeFakeEtcdClient(t) - h := EtcdHelper{fakeEtcd} + fakeClient := MakeFakeEtcdClient(t) + h := EtcdHelper{fakeClient, encoding, versioning} // Test purposeful shutdown watching, err := h.Watch("/some/key") if err != nil { t.Fatalf("Unexpected error: %v", err) } - fakeEtcd.WaitForWatchCompletion() + fakeClient.WaitForWatchCompletion() watching.Stop() // Did everything shut down? - if _, open := <-fakeEtcd.WatchResponse; open { + if _, open := <-fakeClient.WatchResponse; open { t.Errorf("A stop did not cause a graceful shutdown") } if _, open := <-watching.ResultChan(); open {