genericetcd.Etcd should test resourceVersion

Also fix that Update was returning AlreadyExists instead of
NotFound (when create is disabled) and that Update when CreateOnUpdate
was true was not populating the returned object.

Added more tests
This commit is contained in:
Clayton Coleman 2015-03-03 16:16:50 -05:00
parent 6f6218cc1e
commit 3d52aac13c
3 changed files with 268 additions and 14 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package etcd package etcd
import ( import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
@ -228,6 +230,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
// TODO: expose TTL
creating := false creating := false
out := e.NewFunc() out := e.NewFunc()
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) { err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) {
@ -237,7 +240,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
} }
if version == 0 { if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() { if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, kubeerr.NewAlreadyExists(e.EndpointName, name) return nil, kubeerr.NewNotFound(e.EndpointName, name)
} }
creating = true creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
@ -245,13 +248,22 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
} }
return obj, nil return obj, nil
} }
creating = false creating = false
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
if err != nil {
return nil, err
}
if newVersion != version {
// TODO: return the most recent version to a client?
return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, err return nil, err
} }
// TODO: expose TTL
return obj, nil return obj, nil
}) })
if err != nil { if err != nil {
if creating { if creating {
err = etcderr.InterpretCreateError(err, e.EndpointName, name) err = etcderr.InterpretCreateError(err, e.EndpointName, name)

View File

@ -32,18 +32,51 @@ import (
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
type testRESTStrategy struct {
runtime.ObjectTyper
api.NameGenerator
namespaceScoped bool
allowCreateOnUpdate bool
}
func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped }
func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate }
func (t *testRESTStrategy) ResetBeforeCreate(obj runtime.Object) {}
func (t *testRESTStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
return nil
}
func (t *testRESTStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList {
return nil
}
func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
return func(obj runtime.Object) bool {
actualPod := obj.(*api.Pod)
if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) {
t.Errorf("not a deep derivative %#v", actualPod)
return false
}
return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta)
}
}
func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
f := tools.NewFakeEtcdClient(t) f := tools.NewFakeEtcdClient(t)
f.TestIndex = true f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
return f, &Etcd{ return f, &Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} }, NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} }, NewListFunc: func() runtime.Object { return &api.PodList{} },
EndpointName: "pods", EndpointName: "pods",
CreateStrategy: strategy,
UpdateStrategy: strategy,
KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" }, KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" },
KeyFunc: func(ctx api.Context, id string) (string, error) { KeyFunc: func(ctx api.Context, id string) (string, error) {
return path.Join("/registry/pods", id), nil return path.Join("/registry/pods", id), nil
}, },
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
Helper: h, Helper: h,
} }
} }
@ -153,11 +186,90 @@ func TestEtcdList(t *testing.T) {
func TestEtcdCreate(t *testing.T) { func TestEtcdCreate(t *testing.T) {
podA := &api.Pod{ podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"}, Status: api.PodStatus{Host: "machine"},
} }
podB := &api.Pod{ podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine2"},
}
nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
path := "/registry/pods/foo"
table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toCreate runtime.Object
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: emptyNode,
toCreate: podA,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"preExisting": {
existing: nodeWithPodA,
expect: nodeWithPodA,
toCreate: podB,
errOK: errors.IsAlreadyExists,
},
}
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
obj, err := registry.Create(api.NewDefaultContext(), item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
actual := fakeClient.Data[path]
if item.objOK != nil {
if !item.objOK(obj) {
t.Errorf("%v: unexpected returned: %v", name, obj)
}
actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(actualObj) {
t.Errorf("%v: unexpected response: %v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}
// DEPRECATED
func TestEtcdCreateWithName(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine2"}, Status: api.PodStatus{Host: "machine2"},
} }
@ -184,12 +296,13 @@ func TestEtcdCreate(t *testing.T) {
existing tools.EtcdResponseWithError existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError expect tools.EtcdResponseWithError
toCreate runtime.Object toCreate runtime.Object
objOK func(obj runtime.Object) bool
errOK func(error) bool errOK func(error) bool
}{ }{
"normal": { "normal": {
existing: emptyNode, existing: emptyNode,
expect: nodeWithPodA,
toCreate: podA, toCreate: podA,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil }, errOK: func(err error) bool { return err == nil },
}, },
"preExisting": { "preExisting": {
@ -203,18 +316,146 @@ func TestEtcdCreate(t *testing.T) {
for name, item := range table { for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing fakeClient.Data[path] = item.existing
err := registry.CreateWithName(api.NewContext(), key, item.toCreate) err := registry.CreateWithName(api.NewDefaultContext(), key, item.toCreate)
if !item.errOK(err) { if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err) t.Errorf("%v: unexpected error: %v", name, err)
} }
if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) { actual := fakeClient.Data[path]
if item.objOK != nil {
obj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(obj) {
t.Errorf("%v: unexpected response: %v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
} }
} }
}
} }
func TestEtcdUpdate(t *testing.T) { func TestEtcdUpdate(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Status: api.PodStatus{Host: "machine2"},
}
nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
newerNodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 2,
CreatedIndex: 1,
},
},
E: nil,
}
nodeWithPodB := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podB),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}
emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
path := "/registry/pods/foo"
table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toUpdate runtime.Object
allowCreate bool
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: nodeWithPodA,
expect: nodeWithPodB,
toUpdate: podB,
errOK: func(err error) bool { return err == nil },
},
"notExisting": {
existing: emptyNode,
expect: emptyNode,
toUpdate: podA,
errOK: func(err error) bool { return errors.IsNotFound(err) },
},
"createIfNotFound": {
existing: emptyNode,
toUpdate: podA,
allowCreate: true,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"outOfDate": {
existing: newerNodeWithPodA,
expect: newerNodeWithPodA,
toUpdate: podB,
errOK: func(err error) bool { return errors.IsConflict(err) },
},
}
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate
fakeClient.Data[path] = item.existing
obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}
actual := fakeClient.Data[path]
if item.objOK != nil {
if !item.objOK(obj) {
t.Errorf("%v: unexpected returned: %#v", name, obj)
}
actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(actualObj) {
t.Errorf("%v: unexpected response: %#v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}
// DEPRECATED
func TestEtcdUpdateWithName(t *testing.T) {
podA := &api.Pod{ podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"}, Status: api.PodStatus{Host: "machine"},

View File

@ -398,10 +398,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
// First time this key has been used, try creating new value. // First time this key has been used, try creating new value.
if index == 0 { if index == 0 {
_, err = h.Client.Create(key, string(data), 0) response, err := h.Client.Create(key, string(data), 0)
if IsEtcdNodeExist(err) { if IsEtcdNodeExist(err) {
continue continue
} }
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err return err
} }