Merge pull request #21310 from wojtek-t/require_versioner

Require versioner in etcdHelper to be non-null.
This commit is contained in:
Fabio Yeon
2016-02-26 15:44:59 -08:00
3 changed files with 46 additions and 78 deletions

View File

@@ -97,7 +97,9 @@ type etcdHelper struct {
client etcd.KeysAPI
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
@@ -159,10 +161,8 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
if err != nil {
return err
}
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
trace.Step("Version checked")
@@ -193,21 +193,16 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
}
version := uint64(0)
if h.versioner != nil {
var err error
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
return errors.New("couldn't get resourceVersion from object")
}
if version != 0 {
// We cannot store object with resourceVersion in etcd, we need to clear it here.
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
var err error
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
return errors.New("couldn't get resourceVersion from object")
}
if version != 0 {
// We cannot store object with resourceVersion in etcd, we need to clear it here.
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
}
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
// in the object and this will be incorrect ResourceVersion. We should fix it by
// requiring "versioner != nil" at the constructor level for 1.3 milestone.
var response *etcd.Response
data, err := runtime.Encode(h.codec, obj)
@@ -217,19 +212,17 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
key = h.prefixEtcdKey(key)
create := true
if h.versioner != nil {
if version != 0 {
create = false
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevIndex: version,
}
response, err = h.client.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
if version != 0 {
create = false
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevIndex: version,
}
response, err = h.client.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
}
if create {
@@ -369,10 +362,8 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
if h.versioner != nil {
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
return body, node, err
}
@@ -411,10 +402,8 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
return err
}
trace.Step("Object decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
return err
}
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
return err
}
return nil
}
@@ -447,10 +436,8 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
if err != nil {
return err
}
if h.versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
@@ -487,10 +474,8 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
return err
}
trace.Step("Node list decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
return nil
}
@@ -574,14 +559,9 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
}
// Since update object may have a resourceVersion set, we need to clear it here.
if h.versioner != nil {
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
// in the object and this will be incorrect ResourceVersion. We should fix it by
// requiring "versioner != nil" at the constructor level for 1.3 milestone.
data, err := runtime.Encode(h.codec, ret)
if err != nil {

View File

@@ -352,22 +352,6 @@ func TestSetWithVersion(t *testing.T) {
}
}
func TestSetWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
returnedObj := &api.Pod{}
err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if returnedObj.ResourceVersion != "" {
t.Errorf("Resource revision should not be set on returned objects")
}
}
func TestSetNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)

View File

@@ -77,7 +77,10 @@ func exceptKey(except string) includeFunc {
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
encoding runtime.Codec
// Note that versioner is required for etcdWatcher to work correctly.
// There is no public constructor of it, so be careful when manipulating
// with it manually.
versioner storage.Versioner
transform TransformFunc
@@ -108,9 +111,12 @@ type etcdWatcher struct {
// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
@@ -310,10 +316,8 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
}
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}
// perform any necessary transformation