diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 191c4bd6197..7624a03d7f8 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -97,7 +97,9 @@ type etcdHelper struct { client etcd.KeysAPI codec runtime.Codec copier runtime.ObjectCopier - // optional, has to be set to perform any atomic operations + // Note that versioner is required for etcdHelper to work correctly. + // The public constructors (NewStorage & NewEtcdStorage) are setting it + // correctly, so be careful when manipulating with it manually. versioner storage.Versioner // prefix for all etcd keys pathPrefix string @@ -159,10 +161,8 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob if err != nil { return err } - if h.versioner != nil { - if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { - return errors.New("resourceVersion may not be set on objects to be created") - } + if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { + return errors.New("resourceVersion may not be set on objects to be created") } trace.Step("Version checked") @@ -193,21 +193,16 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec } version := uint64(0) - if h.versioner != nil { - var err error - if version, err = h.versioner.ObjectResourceVersion(obj); err != nil { - return errors.New("couldn't get resourceVersion from object") - } - if version != 0 { - // We cannot store object with resourceVersion in etcd, we need to clear it here. - if err := h.versioner.UpdateObject(obj, nil, 0); err != nil { - return errors.New("resourceVersion cannot be set on objects store in etcd") - } + var err error + if version, err = h.versioner.ObjectResourceVersion(obj); err != nil { + return errors.New("couldn't get resourceVersion from object") + } + if version != 0 { + // We cannot store object with resourceVersion in etcd, we need to clear it here. + if err := h.versioner.UpdateObject(obj, nil, 0); err != nil { + return errors.New("resourceVersion cannot be set on objects store in etcd") } } - // TODO: If versioner is nil, then we may end up with having ResourceVersion set - // in the object and this will be incorrect ResourceVersion. We should fix it by - // requiring "versioner != nil" at the constructor level for 1.3 milestone. var response *etcd.Response data, err := runtime.Encode(h.codec, obj) @@ -217,19 +212,17 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec key = h.prefixEtcdKey(key) create := true - if h.versioner != nil { - if version != 0 { - create = false - startTime := time.Now() - opts := etcd.SetOptions{ - TTL: time.Duration(ttl) * time.Second, - PrevIndex: version, - } - response, err = h.client.Set(ctx, key, string(data), &opts) - metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) - if err != nil { - return err - } + if version != 0 { + create = false + startTime := time.Now() + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevIndex: version, + } + response, err = h.client.Set(ctx, key, string(data), &opts) + metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) + if err != nil { + return err } } if create { @@ -372,10 +365,8 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run if out != objPtr { return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) } - if h.versioner != nil { - _ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) - // being unable to set the version does not prevent the object from being extracted - } + // being unable to set the version does not prevent the object from being extracted + _ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) return body, node, err } @@ -414,10 +405,8 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F return err } trace.Step("Object decoded") - if h.versioner != nil { - if err := h.versioner.UpdateList(listObj, response.Index); err != nil { - return err - } + if err := h.versioner.UpdateList(listObj, response.Index); err != nil { + return err } return nil } @@ -450,10 +439,8 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun if err != nil { return err } - if h.versioner != nil { - // being unable to set the version does not prevent the object from being extracted - _ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex) - } + // being unable to set the version does not prevent the object from being extracted + _ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex) if filter(obj) { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } @@ -490,10 +477,8 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin return err } trace.Step("Node list decoded") - if h.versioner != nil { - if err := h.versioner.UpdateList(listObj, index); err != nil { - return err - } + if err := h.versioner.UpdateList(listObj, index); err != nil { + return err } return nil } @@ -577,14 +562,9 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType } // Since update object may have a resourceVersion set, we need to clear it here. - if h.versioner != nil { - if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil { - return errors.New("resourceVersion cannot be set on objects store in etcd") - } + if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil { + return errors.New("resourceVersion cannot be set on objects store in etcd") } - // TODO: If versioner is nil, then we may end up with having ResourceVersion set - // in the object and this will be incorrect ResourceVersion. We should fix it by - // requiring "versioner != nil" at the constructor level for 1.3 milestone. data, err := runtime.Encode(h.codec, ret) if err != nil { diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 7a892b34034..5b7d921c798 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -352,22 +352,6 @@ func TestSetWithVersion(t *testing.T) { } } -func TestSetWithoutResourceVersioner(t *testing.T) { - obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - server := etcdtesting.NewEtcdTestClientServer(t) - defer server.Terminate(t) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) - helper.versioner = nil - returnedObj := &api.Pod{} - err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - if returnedObj.ResourceVersion != "" { - t.Errorf("Resource revision should not be set on returned objects") - } -} - func TestSetNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} server := etcdtesting.NewEtcdTestClientServer(t) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index c54e4b3ac19..d9bfd3722ff 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -77,7 +77,10 @@ func exceptKey(except string) includeFunc { // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { - encoding runtime.Codec + encoding runtime.Codec + // Note that versioner is required for etcdWatcher to work correctly. + // There is no public constructor of it, so be careful when manipulating + // with it manually. versioner storage.Versioner transform TransformFunc @@ -108,9 +111,12 @@ type etcdWatcher struct { // watchWaitDuration is the amount of time to wait for an error from watch. const watchWaitDuration = 100 * time.Millisecond -// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform -// and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher { +// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. +// The versioner must be able to handle the objects that transform creates. +func newEtcdWatcher( + list bool, quorum bool, include includeFunc, filter storage.FilterFunc, + encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, + cache etcdCache) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -310,10 +316,8 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { } // ensure resource version is set on the object we load from etcd - if w.versioner != nil { - if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) - } + if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { + utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) } // perform any necessary transformation