Make generic etcd more powerful and return operations from etcd

When we complete an operation, etcd usually provides a response
object.  Return that object up instead of querying etcd twice.
This commit is contained in:
Clayton Coleman
2015-02-11 18:35:05 -05:00
parent 78385b1230
commit 23d199ded9
8 changed files with 350 additions and 57 deletions

View File

@@ -236,7 +236,19 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignore
if err != nil && !IsEtcdNotFound(err) {
return "", 0, err
}
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
return h.extractObj(response, err, objPtr, ignoreNotFound, false)
}
func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, modifiedIndex uint64, err error) {
var node *etcd.Node
if response != nil {
if prevNode {
node = response.PrevNode
} else {
node = response.Node
}
}
if inErr != nil || node == nil || len(node.Value) == 0 {
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
@@ -244,18 +256,18 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignore
}
v.Set(reflect.Zero(v.Type()))
return "", 0, nil
} else if err != nil {
return "", 0, err
} else if inErr != nil {
return "", 0, inErr
}
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
return "", 0, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body = response.Node.Value
body = node.Value
err = h.Codec.DecodeInto([]byte(body), objPtr)
if h.ResourceVersioner != nil {
_ = h.ResourceVersioner.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
_ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, response.Node.ModifiedIndex, err
return body, node.ModifiedIndex, err
}
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
@@ -275,12 +287,50 @@ func (h *EtcdHelper) CreateObj(key string, obj runtime.Object, ttl uint64) error
return err
}
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
// and 0 means forever. If no error is returned, out will be set to the read value from etcd.
func (h *EtcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error {
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
if h.ResourceVersioner != nil {
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
response, err := h.Client.Create(key, string(data), ttl)
if err != nil {
return err
}
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
return err
}
// Delete removes the specified key.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
_, err := h.Client.Delete(key, recursive)
return err
}
// DeleteObj removes the specified key and returns the value that existed at that spot.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
response, err := h.Client.Delete(key, false)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object, ttl uint64) error {
@@ -359,10 +409,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
return nil
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
response, err := h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
if IsEtcdTestFailed(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err
}
}

View File

@@ -520,13 +520,13 @@ func TestAtomicUpdateNoChange(t *testing.T) {
// Update an existing node with the same data
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
fakeClient.Err = errors.New("should not be called")
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
fakeClient.Err = errors.New("should not be called")
callbackCalled = true
return objUpdate, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")

View File

@@ -34,6 +34,7 @@ type EtcdResponseWithError struct {
// TestLogger is a type passed to Test functions to support formatted test logs.
type TestLogger interface {
Fatalf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Logf(format string, args ...interface{})
}
@@ -85,6 +86,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
return ret
}
func (f *FakeEtcdClient) SetError(err error) {
f.Err = err
}
func (f *FakeEtcdClient) GetCluster() []string {
return f.Machines
}
@@ -93,6 +98,13 @@ func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
f.expectNotFoundGetSet[key] = struct{}{}
}
func (f *FakeEtcdClient) NewError(code int) *etcd.EtcdError {
return &etcd.EtcdError{
ErrorCode: code,
Index: f.ChangeIndex,
}
}
func (f *FakeEtcdClient) generateIndex() uint64 {
if !f.TestIndex {
return 0
@@ -121,6 +133,10 @@ func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response,
}
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
if f.Err != nil {
return nil, f.Err
}
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
@@ -128,9 +144,9 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
result := f.Data[key]
if result.R == nil {
if _, ok := f.expectNotFoundGetSet[key]; !ok {
f.t.Errorf("Unexpected get for %s", key)
f.t.Fatalf("data for %s was not defined prior to invoking Get", key)
}
return &etcd.Response{}, EtcdErrorNotFound
return &etcd.Response{}, f.NewError(EtcdErrorCodeNotFound)
}
f.t.Logf("returning %v: %#v %#v", key, result.R, result.E)
@@ -166,7 +182,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
if f.nodeExists(key) {
prevResult := f.Data[key]
createdIndex := prevResult.R.Node.CreatedIndex
f.t.Logf("updating %v, index %v -> %v", key, createdIndex, i)
f.t.Logf("updating %v, index %v -> %v (ttl: %d)", key, createdIndex, i, ttl)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@@ -181,7 +197,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
return result.R, nil
}
f.t.Logf("creating %v, index %v", key, i)
f.t.Logf("creating %v, index %v (ttl: %d)", key, i, ttl)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@@ -262,15 +278,27 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
f.Mutex.Lock()
defer f.Mutex.Unlock()
existing := f.Data[key]
index := f.generateIndex()
f.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: EtcdErrorCodeNotFound,
Index: index,
},
E: EtcdErrorNotFound,
}
res := &etcd.Response{
Action: "delete",
Node: nil,
PrevNode: nil,
EtcdIndex: index,
}
if existing.R != nil && existing.R.Node != nil {
res.PrevNode = existing.R.Node
}
f.DeletedKeys = append(f.DeletedKeys, key)
return &etcd.Response{}, nil
return res, nil
}
func (f *FakeEtcdClient) WaitForWatchCompletion() {