mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
Require versioner in etcdHelper to be non-null.
This commit is contained in:
parent
6e2e39864c
commit
35a3394a0a
@ -97,7 +97,9 @@ type etcdHelper struct {
|
|||||||
client etcd.KeysAPI
|
client etcd.KeysAPI
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
copier runtime.ObjectCopier
|
copier runtime.ObjectCopier
|
||||||
// optional, has to be set to perform any atomic operations
|
// Note that versioner is required for etcdHelper to work correctly.
|
||||||
|
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
||||||
|
// correctly, so be careful when manipulating with it manually.
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
// prefix for all etcd keys
|
// prefix for all etcd keys
|
||||||
pathPrefix string
|
pathPrefix string
|
||||||
@ -159,10 +161,8 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.versioner != nil {
|
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||||
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
return errors.New("resourceVersion may not be set on objects to be created")
|
||||||
return errors.New("resourceVersion may not be set on objects to be created")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
trace.Step("Version checked")
|
trace.Step("Version checked")
|
||||||
|
|
||||||
@ -193,21 +193,16 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
|||||||
}
|
}
|
||||||
|
|
||||||
version := uint64(0)
|
version := uint64(0)
|
||||||
if h.versioner != nil {
|
var err error
|
||||||
var err error
|
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
|
||||||
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
|
return errors.New("couldn't get resourceVersion from object")
|
||||||
return errors.New("couldn't get resourceVersion from object")
|
}
|
||||||
}
|
if version != 0 {
|
||||||
if version != 0 {
|
// We cannot store object with resourceVersion in etcd, we need to clear it here.
|
||||||
// We cannot store object with resourceVersion in etcd, we need to clear it here.
|
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
|
||||||
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
|
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
|
|
||||||
// in the object and this will be incorrect ResourceVersion. We should fix it by
|
|
||||||
// requiring "versioner != nil" at the constructor level for 1.3 milestone.
|
|
||||||
|
|
||||||
var response *etcd.Response
|
var response *etcd.Response
|
||||||
data, err := runtime.Encode(h.codec, obj)
|
data, err := runtime.Encode(h.codec, obj)
|
||||||
@ -217,19 +212,17 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
|||||||
key = h.prefixEtcdKey(key)
|
key = h.prefixEtcdKey(key)
|
||||||
|
|
||||||
create := true
|
create := true
|
||||||
if h.versioner != nil {
|
if version != 0 {
|
||||||
if version != 0 {
|
create = false
|
||||||
create = false
|
startTime := time.Now()
|
||||||
startTime := time.Now()
|
opts := etcd.SetOptions{
|
||||||
opts := etcd.SetOptions{
|
TTL: time.Duration(ttl) * time.Second,
|
||||||
TTL: time.Duration(ttl) * time.Second,
|
PrevIndex: version,
|
||||||
PrevIndex: version,
|
}
|
||||||
}
|
response, err = h.client.Set(ctx, key, string(data), &opts)
|
||||||
response, err = h.client.Set(ctx, key, string(data), &opts)
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
||||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
if err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if create {
|
if create {
|
||||||
@ -372,10 +365,8 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||||||
if out != objPtr {
|
if out != objPtr {
|
||||||
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
||||||
}
|
}
|
||||||
if h.versioner != nil {
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
|
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
|
||||||
}
|
|
||||||
return body, node, err
|
return body, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -414,10 +405,8 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Object decoded")
|
trace.Step("Object decoded")
|
||||||
if h.versioner != nil {
|
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
|
||||||
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -450,10 +439,8 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.versioner != nil {
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
|
||||||
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
|
|
||||||
}
|
|
||||||
if filter(obj) {
|
if filter(obj) {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
}
|
}
|
||||||
@ -490,10 +477,8 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Node list decoded")
|
trace.Step("Node list decoded")
|
||||||
if h.versioner != nil {
|
if err := h.versioner.UpdateList(listObj, index); err != nil {
|
||||||
if err := h.versioner.UpdateList(listObj, index); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -577,14 +562,9 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Since update object may have a resourceVersion set, we need to clear it here.
|
// Since update object may have a resourceVersion set, we need to clear it here.
|
||||||
if h.versioner != nil {
|
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
|
||||||
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
|
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
|
|
||||||
// in the object and this will be incorrect ResourceVersion. We should fix it by
|
|
||||||
// requiring "versioner != nil" at the constructor level for 1.3 milestone.
|
|
||||||
|
|
||||||
data, err := runtime.Encode(h.codec, ret)
|
data, err := runtime.Encode(h.codec, ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -352,22 +352,6 @@ func TestSetWithVersion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSetWithoutResourceVersioner(t *testing.T) {
|
|
||||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
|
||||||
defer server.Terminate(t)
|
|
||||||
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
|
|
||||||
helper.versioner = nil
|
|
||||||
returnedObj := &api.Pod{}
|
|
||||||
err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error %#v", err)
|
|
||||||
}
|
|
||||||
if returnedObj.ResourceVersion != "" {
|
|
||||||
t.Errorf("Resource revision should not be set on returned objects")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSetNilOutParam(t *testing.T) {
|
func TestSetNilOutParam(t *testing.T) {
|
||||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
|
@ -77,7 +77,10 @@ func exceptKey(except string) includeFunc {
|
|||||||
|
|
||||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||||
type etcdWatcher struct {
|
type etcdWatcher struct {
|
||||||
encoding runtime.Codec
|
encoding runtime.Codec
|
||||||
|
// Note that versioner is required for etcdWatcher to work correctly.
|
||||||
|
// There is no public constructor of it, so be careful when manipulating
|
||||||
|
// with it manually.
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
transform TransformFunc
|
transform TransformFunc
|
||||||
|
|
||||||
@ -108,9 +111,12 @@ type etcdWatcher struct {
|
|||||||
// watchWaitDuration is the amount of time to wait for an error from watch.
|
// watchWaitDuration is the amount of time to wait for an error from watch.
|
||||||
const watchWaitDuration = 100 * time.Millisecond
|
const watchWaitDuration = 100 * time.Millisecond
|
||||||
|
|
||||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
// The versioner must be able to handle the objects that transform creates.
|
||||||
func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
|
func newEtcdWatcher(
|
||||||
|
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
|
||||||
|
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
||||||
|
cache etcdCache) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
@ -310,10 +316,8 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ensure resource version is set on the object we load from etcd
|
// ensure resource version is set on the object we load from etcd
|
||||||
if w.versioner != nil {
|
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
|
||||||
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
|
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
|
||||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform any necessary transformation
|
// perform any necessary transformation
|
||||||
|
Loading…
Reference in New Issue
Block a user