Merge pull request #806 from smarterclayton/follow_up_to_encoding_abstraction

Address remaining comments from #756
This commit is contained in:
Daniel Smith 2014-08-06 10:47:41 -07:00
commit 8c529187b5
6 changed files with 88 additions and 71 deletions

View File

@ -25,19 +25,23 @@ import (
"gopkg.in/v1/yaml" "gopkg.in/v1/yaml"
) )
type EncodingInterface interface { // codec defines methods for serializing and deserializing API
// objects
type codec interface {
Encode(obj interface{}) (data []byte, err error) Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error) Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error DecodeInto(data []byte, obj interface{}) error
} }
type VersioningInterface interface { // resourceVersioner provides methods for setting and retrieving
// the resource version from an API object
type resourceVersioner interface {
SetResourceVersion(obj interface{}, version uint64) error SetResourceVersion(obj interface{}, version uint64) error
ResourceVersion(obj interface{}) (uint64, error) ResourceVersion(obj interface{}) (uint64, error)
} }
var Encoding EncodingInterface var Codec codec
var Versioning VersioningInterface var ResourceVersioner resourceVersioner
var conversionScheme *conversion.Scheme var conversionScheme *conversion.Scheme
@ -101,8 +105,8 @@ func init() {
}, },
) )
Encoding = conversionScheme Codec = conversionScheme
Versioning = JSONBaseVersioning{} ResourceVersioner = NewJSONBaseResourceVersioner()
} }
// AddKnownTypes registers the types of the arguments to the marshaller of the package api. // AddKnownTypes registers the types of the arguments to the marshaller of the package api.

View File

@ -21,10 +21,15 @@ import (
"reflect" "reflect"
) )
// versionedJSONBase allows access to the version state of a JSONBase object // NewJSONBaseVersioner returns a resourceVersioner that can set or retrieve
type JSONBaseVersioning struct{} // ResourceVersion on objects derived from JSONBase.
func NewJSONBaseResourceVersioner() resourceVersioner {
return &jsonBaseResourceVersioner{}
}
func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) { type jsonBaseResourceVersioner struct{}
func (v jsonBaseResourceVersioner) ResourceVersion(obj interface{}) (uint64, error) {
json, err := FindJSONBaseRO(obj) json, err := FindJSONBaseRO(obj)
if err != nil { if err != nil {
return 0, err return 0, err
@ -32,7 +37,7 @@ func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) {
return json.ResourceVersion, nil return json.ResourceVersion, nil
} }
func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) error { func (v jsonBaseResourceVersioner) SetResourceVersion(obj interface{}, version uint64) error {
json, err := FindJSONBase(obj) json, err := FindJSONBase(obj)
if err != nil { if err != nil {
return err return err

View File

@ -67,7 +67,7 @@ func TestGenericJSONBase(t *testing.T) {
} }
} }
func TestVersioningOfAPI(t *testing.T) { func TestResourceVersionerOfAPI(t *testing.T) {
type T struct { type T struct {
Object interface{} Object interface{}
Expected uint64 Expected uint64
@ -77,7 +77,7 @@ func TestVersioningOfAPI(t *testing.T) {
"api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, "api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
"pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, "pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
} }
versioning := JSONBaseVersioning{} versioning := NewJSONBaseResourceVersioner()
for key, testCase := range testCases { for key, testCase := range testCases {
actual, err := versioning.ResourceVersion(testCase.Object) actual, err := versioning.ResourceVersion(testCase.Object)
if err != nil { if err != nil {

View File

@ -32,7 +32,6 @@ import (
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. // EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
type EtcdRegistry struct { type EtcdRegistry struct {
client tools.EtcdClient
helper tools.EtcdHelper helper tools.EtcdHelper
machines MinionRegistry machines MinionRegistry
manifestFactory ManifestFactory manifestFactory ManifestFactory
@ -44,8 +43,7 @@ type EtcdRegistry struct {
// 'scheduler' is the scheduling algorithm to use. // 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{ registry := &EtcdRegistry{
client: client, helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
helper: tools.EtcdHelper{client, api.Encoding, api.Versioning},
machines: machines, machines: machines,
} }
registry.manifestFactory = &BasicManifestFactory{ registry.manifestFactory = &BasicManifestFactory{
@ -118,7 +116,7 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
}) })
if err != nil { if err != nil {
// Don't strand stuff. // Don't strand stuff.
_, err2 := registry.client.Delete(podKey, false) err2 := registry.helper.Delete(podKey, false)
if err2 != nil { if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
} }
@ -143,7 +141,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
// First delete the pod, so a scheduler doesn't notice it getting removed from the // First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere. // machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID) podKey := makePodKey(machine, podID)
_, err := registry.client.Delete(podKey, true) err := registry.helper.Delete(podKey, true)
if tools.IsEtcdNotFound(err) { if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID) return apiserver.NewNotFoundErr("pod", podID)
} }
@ -247,7 +245,7 @@ func (registry *EtcdRegistry) UpdateController(controller api.ReplicationControl
// DeleteController deletes a ReplicationController specified by its ID. // DeleteController deletes a ReplicationController specified by its ID.
func (registry *EtcdRegistry) DeleteController(controllerID string) error { func (registry *EtcdRegistry) DeleteController(controllerID string) error {
key := makeControllerKey(controllerID) key := makeControllerKey(controllerID)
_, err := registry.client.Delete(key, false) err := registry.helper.Delete(key, false)
if tools.IsEtcdNotFound(err) { if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("replicationController", controllerID) return apiserver.NewNotFoundErr("replicationController", controllerID)
} }
@ -295,7 +293,7 @@ func makeServiceEndpointsKey(name string) string {
// DeleteService deletes a Service specified by its name. // DeleteService deletes a Service specified by its name.
func (registry *EtcdRegistry) DeleteService(name string) error { func (registry *EtcdRegistry) DeleteService(name string) error {
key := makeServiceKey(name) key := makeServiceKey(name)
_, err := registry.client.Delete(key, true) err := registry.helper.Delete(key, true)
if tools.IsEtcdNotFound(err) { if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("service", name) return apiserver.NewNotFoundErr("service", name)
} }
@ -303,7 +301,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
return err return err
} }
key = makeServiceEndpointsKey(name) key = makeServiceEndpointsKey(name)
_, err = registry.client.Delete(key, true) err = registry.helper.Delete(key, true)
if !tools.IsEtcdNotFound(err) { if !tools.IsEtcdNotFound(err) {
return err return err
} }

View File

@ -42,13 +42,15 @@ var (
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
) )
type Encoding interface { // Codec provides methods for transforming Etcd values into objects and back
type Codec interface {
Encode(obj interface{}) (data []byte, err error) Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error) Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error DecodeInto(data []byte, obj interface{}) error
} }
type Versioning interface { // ResourceVersioner provides methods for managing object modification tracking
type ResourceVersioner interface {
SetResourceVersion(obj interface{}, version uint64) error SetResourceVersion(obj interface{}, version uint64) error
ResourceVersion(obj interface{}) (uint64, error) ResourceVersion(obj interface{}) (uint64, error)
} }
@ -71,16 +73,17 @@ type EtcdGetSet interface {
Get(key string, sort, recursive bool) (*etcd.Response, error) Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error) Create(key, value string, ttl uint64) (*etcd.Response, error)
Delete(key string, recursive bool) (*etcd.Response, error)
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
} }
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct { type EtcdHelper struct {
Client EtcdGetSet Client EtcdGetSet
Encoding Encoding Codec Codec
// optional // optional, no atomic operations can be performed without this interface
Versioning Versioning ResourceVersioner ResourceVersioner
} }
// IsEtcdNotFound returns true iff err is an etcd not found error. // IsEtcdNotFound returns true iff err is an etcd not found error.
@ -136,7 +139,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
v := pv.Elem() v := pv.Elem()
for _, node := range nodes { for _, node := range nodes {
obj := reflect.New(v.Type().Elem()) obj := reflect.New(v.Type().Elem())
err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface()) err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface())
if err != nil { if err != nil {
return err return err
} }
@ -145,7 +148,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
return nil return nil
} }
// Unmarshals json found at key into objPtr. On a not found error, will either return // ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error. // empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
@ -170,21 +173,22 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
} }
body = response.Node.Value body = response.Node.Value
err = h.Encoding.DecodeInto([]byte(body), objPtr) err = h.Codec.DecodeInto([]byte(body), objPtr)
if h.Versioning != nil { if h.ResourceVersioner != nil {
_ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex) _ = h.ResourceVersioner.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
} }
return body, response.Node.ModifiedIndex, err return body, response.Node.ModifiedIndex, err
} }
// Create adds a new object at a key unless it already exists
func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
data, err := h.Encoding.Encode(obj) data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
} }
if h.Versioning != nil { if h.ResourceVersioner != nil {
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created") return errors.New("resourceVersion may not be set on objects to be created")
} }
} }
@ -193,15 +197,21 @@ func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
return err return err
} }
// Delete removes the specified key
func (h *EtcdHelper) Delete(key string, recursive bool) error {
_, err := h.Client.Delete(key, recursive)
return err
}
// SetObj marshals obj via json, and stores under key. Will do an // SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set. // atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error { func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := h.Encoding.Encode(obj) data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
} }
if h.Versioning != nil { if h.ResourceVersioner != nil {
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version) _, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
return err // err is shadowed! return err // err is shadowed!
} }
@ -253,7 +263,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
return err return err
} }
data, err := h.Encoding.Encode(ret) data, err := h.Codec.Encode(ret)
if err != nil { if err != nil {
return err return err
} }
@ -288,7 +298,7 @@ func Everything(interface{}) bool {
// API objects, and any items passing 'filter' are sent down the returned // API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. // watch.Interface.
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Encoding) w := newEtcdWatcher(true, filter, h.Codec)
go w.etcdWatch(h.Client, key) go w.etcdWatch(h.Client, key)
return w, nil return w, nil
} }
@ -296,14 +306,14 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface,
// Watch begins watching the specified key. Events are decoded into // Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. // API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Encoding) w := newEtcdWatcher(false, nil, h.Codec)
go w.etcdWatch(h.Client, key) go w.etcdWatch(h.Client, key)
return w, nil return w, nil
} }
// etcdWatcher converts a native etcd watch to a watch.Interface. // etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct { type etcdWatcher struct {
encoding Encoding encoding Codec
list bool // If we're doing a recursive watch, should be true. list bool // If we're doing a recursive watch, should be true.
filter FilterFunc filter FilterFunc
@ -322,7 +332,7 @@ type etcdWatcher struct {
} }
// Returns a new etcdWatcher; if list is true, watch sub-nodes. // Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher { func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
encoding: encoding, encoding: encoding,
list: list, list: list,

View File

@ -40,8 +40,8 @@ type TestResource struct {
} }
var scheme *conversion.Scheme var scheme *conversion.Scheme
var encoding = api.Encoding var codec = api.Codec
var versioning = api.Versioning var versioner = api.ResourceVersioner
func init() { func init() {
scheme = conversion.NewScheme() scheme = conversion.NewScheme()
@ -87,7 +87,7 @@ func TestExtractList(t *testing.T) {
{JSONBase: api.JSONBase{ID: "baz"}}, {JSONBase: api.JSONBase{ID: "baz"}},
} }
var got []api.Pod var got []api.Pod
helper := EtcdHelper{fakeClient, encoding, versioning} helper := EtcdHelper{fakeClient, codec, versioner}
err := helper.ExtractList("/some/key", &got) err := helper.ExtractList("/some/key", &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -101,7 +101,7 @@ func TestExtractObj(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0) fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient, encoding, versioning} helper := EtcdHelper{fakeClient, codec, versioner}
var got api.Pod var got api.Pod
err := helper.ExtractObj("/some/key", &got, false) err := helper.ExtractObj("/some/key", &got, false)
if err != nil { if err != nil {
@ -134,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
}, },
}, },
} }
helper := EtcdHelper{fakeClient, encoding, versioning} helper := EtcdHelper{fakeClient, codec, versioner}
try := func(key string) { try := func(key string) {
var got api.Pod var got api.Pod
err := helper.ExtractObj(key, &got, false) err := helper.ExtractObj(key, &got, false)
@ -155,12 +155,12 @@ func TestExtractObjNotFoundErr(t *testing.T) {
func TestSetObj(t *testing.T) { func TestSetObj(t *testing.T) {
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, encoding, versioning} helper := EtcdHelper{fakeClient, codec, versioner}
err := helper.SetObj("/some/key", obj) err := helper.SetObj("/some/key", obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
data, err := encoding.Encode(obj) data, err := codec.Encode(obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -184,12 +184,12 @@ func TestSetObjWithVersion(t *testing.T) {
}, },
} }
helper := EtcdHelper{fakeClient, encoding, versioning} helper := EtcdHelper{fakeClient, codec, versioner}
err := helper.SetObj("/some/key", obj) err := helper.SetObj("/some/key", obj)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
} }
data, err := encoding.Encode(obj) data, err := codec.Encode(obj)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
} }
@ -200,15 +200,15 @@ func TestSetObjWithVersion(t *testing.T) {
} }
} }
func TestSetObjWithoutVersioning(t *testing.T) { func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, encoding, nil} helper := EtcdHelper{fakeClient, codec, nil}
err := helper.SetObj("/some/key", obj) err := helper.SetObj("/some/key", obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
data, err := encoding.Encode(obj) data, err := codec.Encode(obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -222,8 +222,8 @@ func TestSetObjWithoutVersioning(t *testing.T) {
func TestAtomicUpdate(t *testing.T) { func TestAtomicUpdate(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
encoding := scheme codec := scheme
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()}
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
@ -234,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
data, err := encoding.Encode(obj) data, err := codec.Encode(obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -259,7 +259,7 @@ func TestAtomicUpdate(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
data, err = encoding.Encode(objUpdate) data, err = codec.Encode(objUpdate)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -277,8 +277,8 @@ func TestAtomicUpdate(t *testing.T) {
func TestAtomicUpdate_CreateCollision(t *testing.T) { func TestAtomicUpdate_CreateCollision(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
encoding := scheme codec := scheme
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()}
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
@ -317,7 +317,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
// Check that stored TestResource has received all updates. // Check that stored TestResource has received all updates.
body := fakeClient.Data["/some/key"].R.Node.Value body := fakeClient.Data["/some/key"].R.Node.Value
stored := &TestResource{} stored := &TestResource{}
if err := encoding.DecodeInto([]byte(body), stored); err != nil { if err := codec.DecodeInto([]byte(body), stored); err != nil {
t.Errorf("Error decoding stored value: %v", body) t.Errorf("Error decoding stored value: %v", body)
} }
if stored.Value != concurrency { if stored.Value != concurrency {
@ -329,9 +329,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := encoding.Encode(pod) podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{ go w.sendResult(&etcd.Response{
Action: "set", Action: "set",
@ -353,9 +353,9 @@ func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := encoding.Encode(pod) podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{ go w.sendResult(&etcd.Response{
Action: "delete", Action: "delete",
@ -377,7 +377,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool { w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -391,7 +391,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool { w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -404,7 +404,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool { w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -418,7 +418,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, encoding, versioning} h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key") watching, err := h.Watch("/some/key")
if err != nil { if err != nil {
@ -429,7 +429,7 @@ func TestWatch(t *testing.T) {
// Test normal case // Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := encoding.Encode(pod) podBytes, _ := codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{ fakeClient.WatchResponse <- &etcd.Response{
Action: "set", Action: "set",
Node: &etcd.Node{ Node: &etcd.Node{
@ -459,7 +459,7 @@ func TestWatch(t *testing.T) {
func TestWatchPurposefulShutdown(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, encoding, versioning} h := EtcdHelper{fakeClient, codec, versioner}
// Test purposeful shutdown // Test purposeful shutdown
watching, err := h.Watch("/some/key") watching, err := h.Watch("/some/key")