Expose an Encoding/Versioning interface for use with etcd

etcd_tools.go is not dependent on the specific implementation
(which is provided by pkg/api).  All EtcdHelpers are created
with an encoding object which handles Encode/Decode/DecodeInto.
Additional tests added to verify simple atomic flows.

Begins to break up api singleton pattern.
This commit is contained in:
Clayton Coleman
2014-08-03 20:23:56 -04:00
parent 2282f9ce3a
commit 4448be2d95
7 changed files with 216 additions and 79 deletions

View File

@@ -21,7 +21,6 @@ import (
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
@@ -42,6 +41,17 @@ var (
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
)
type Encoding interface {
Encode(obj interface{}) (data []byte, err error)
Decode(data []byte) (interface{}, error)
DecodeInto(data []byte, obj interface{}) error
}
type Versioning interface {
SetResourceVersion(obj interface{}, version uint64) error
ResourceVersion(obj interface{}) (uint64, error)
}
// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
@@ -65,7 +75,10 @@ type EtcdGetSet interface {
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct {
Client EtcdGetSet
Client EtcdGetSet
Encoding Encoding
// optional
Versioning Versioning
}
// Returns true iff err is an etcd not found error.
@@ -116,7 +129,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
v := pv.Elem()
for _, node := range nodes {
obj := reflect.New(v.Type().Elem())
err = api.DecodeInto([]byte(node.Value), obj.Interface())
err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface())
if err != nil {
return err
}
@@ -150,12 +163,10 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
body = response.Node.Value
err = api.DecodeInto([]byte(body), objPtr)
if jsonBase, err := api.FindJSONBase(objPtr); err == nil {
jsonBase.SetResourceVersion(response.Node.ModifiedIndex)
// Note that err shadows the err returned below, so we won't
// return an error just because we failed to find a JSONBase.
// This is intentional.
err = h.Encoding.DecodeInto([]byte(body), objPtr)
if h.Versioning != nil {
_ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, response.Node.ModifiedIndex, err
}
@@ -163,13 +174,15 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
// SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := api.Encode(obj)
data, err := h.Encoding.Encode(obj)
if err != nil {
return err
}
if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion)
return err // err is shadowed!
if h.Versioning != nil {
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
return err // err is shadowed!
}
}
// TODO: when client supports atomic creation, integrate this with the above.
@@ -186,7 +199,7 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)
//
// Example:
//
// h := &util.EtcdHelper{client}
// h := &util.EtcdHelper{client, encoding, versioning}
// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) {
// // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey".
@@ -225,7 +238,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
return h.SetObj(key, ret)
}
data, err := api.Encode(ret)
data, err := h.Encoding.Encode(ret)
if err != nil {
return err
}
@@ -250,7 +263,7 @@ func Everything(interface{}) bool {
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface.
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter)
w := newEtcdWatcher(true, filter, h.Encoding)
go w.etcdWatch(h.Client, key)
return w, nil
}
@@ -258,13 +271,15 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface,
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
w := newEtcdWatcher(false, nil)
w := newEtcdWatcher(false, nil, h.Encoding)
go w.etcdWatch(h.Client, key)
return w, nil
}
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Encoding
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
@@ -282,8 +297,9 @@ type etcdWatcher struct {
}
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
@@ -358,7 +374,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return
}
obj, err := api.Decode(data)
obj, err := w.encoding.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res)
// TODO: expose an error through watch.Interface?

View File

@@ -23,12 +23,13 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
type fakeEtcdGetSet struct {
type fakeClientGetSet struct {
get func(key string, sort, recursive bool) (*etcd.Response, error)
set func(key, value string, ttl uint64) (*etcd.Response, error)
}
@@ -38,9 +39,15 @@ type TestResource struct {
Value int `json:"value" yaml:"value,omitempty"`
}
var scheme *conversion.Scheme
var encoding = api.Encoding
var versioning = api.Versioning
func init() {
api.AddKnownTypes("", TestResource{})
api.AddKnownTypes("v1beta1", TestResource{})
scheme = conversion.NewScheme()
scheme.ExternalVersion = "v1beta1"
scheme.AddKnownTypes("", TestResource{})
scheme.AddKnownTypes("v1beta1", TestResource{})
}
func TestIsNotFoundErr(t *testing.T) {
@@ -80,7 +87,7 @@ func TestExtractList(t *testing.T) {
{JSONBase: api.JSONBase{ID: "baz"}},
}
var got []api.Pod
helper := EtcdHelper{fakeClient}
helper := EtcdHelper{fakeClient, encoding, versioning}
err := helper.ExtractList("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %#v", err)
@@ -94,7 +101,7 @@ func TestExtractObj(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
helper := EtcdHelper{fakeClient, encoding, versioning}
var got api.Pod
err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
@@ -127,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
},
},
}
helper := EtcdHelper{fakeClient}
helper := EtcdHelper{fakeClient, encoding, versioning}
try := func(key string) {
var got api.Pod
err := helper.ExtractObj(key, &got, false)
@@ -148,12 +155,60 @@ func TestExtractObjNotFoundErr(t *testing.T) {
func TestSetObj(t *testing.T) {
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient}
helper := EtcdHelper{fakeClient, encoding, versioning}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := api.Encode(obj)
data, err := encoding.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data["/some/key"].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
}
func TestSetObjWithVersion(t *testing.T) {
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}
fakeClient := MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(obj),
ModifiedIndex: 1,
},
},
}
helper := EtcdHelper{fakeClient, encoding, versioning}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
data, err := encoding.Encode(obj)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data["/some/key"].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
}
func TestSetObjWithoutVersioning(t *testing.T) {
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, encoding, nil}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := encoding.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@@ -167,7 +222,8 @@ func TestSetObj(t *testing.T) {
func TestAtomicUpdate(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient}
encoding := scheme
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
@@ -178,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := api.Encode(obj)
data, err := encoding.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@@ -204,7 +260,7 @@ func TestAtomicUpdate(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err = api.Encode(objUpdate)
data, err = encoding.Encode(objUpdate)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@@ -223,9 +279,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
}, encoding)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
podBytes, _ := encoding.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "set",
@@ -247,9 +303,9 @@ func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
}, encoding)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
podBytes, _ := encoding.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "delete",
@@ -271,7 +327,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
}, encoding)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -285,7 +341,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
}, encoding)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -298,7 +354,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
}, encoding)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -311,20 +367,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
}
func TestWatch(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd}
fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, encoding, versioning}
watching, err := h.Watch("/some/key")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeEtcd.WaitForWatchCompletion()
fakeClient.WaitForWatchCompletion()
// Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
fakeEtcd.WatchResponse <- &etcd.Response{
podBytes, _ := encoding.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
@@ -344,10 +400,10 @@ func TestWatch(t *testing.T) {
}
// Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
@@ -356,19 +412,19 @@ func TestWatch(t *testing.T) {
}
func TestWatchPurposefulShutdown(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd}
fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, encoding, versioning}
// Test purposeful shutdown
watching, err := h.Watch("/some/key")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeEtcd.WaitForWatchCompletion()
fakeClient.WaitForWatchCompletion()
watching.Stop()
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {