diff --git a/pkg/api/rest/types.go b/pkg/api/rest/types.go index dad927ff5ec..a5523aeb18e 100644 --- a/pkg/api/rest/types.go +++ b/pkg/api/rest/types.go @@ -87,3 +87,7 @@ func (svcStrategy) AllowCreateOnUpdate() bool { func (svcStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList { return validation.ValidateServiceUpdate(old.(*api.Service), obj.(*api.Service)) } + +func (svcStrategy) AllowUnconditionalUpdate() bool { + return true +} diff --git a/pkg/api/rest/update.go b/pkg/api/rest/update.go index 96ce2440143..b7c1c02e903 100644 --- a/pkg/api/rest/update.go +++ b/pkg/api/rest/update.go @@ -40,6 +40,8 @@ type RESTUpdateStrategy interface { // ValidateUpdate is invoked after default fields in the object have been filled in before // the object is persisted. ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList + // AllowUnconditionalUpdate returns true if the object can be updated unconditionally (irrespective of the latest resource version), when there is no resource version specified in the object. + AllowUnconditionalUpdate() bool } // BeforeUpdate ensures that common operations for all resources are performed on update. It only returns diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 34997067695..bee7f00d450 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -97,6 +97,10 @@ func (rcStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field return append(validationErrorList, updateErrorList...) } +func (rcStrategy) AllowUnconditionalUpdate() bool { + return true +} + // ControllerToSelectableFields returns a label set that represents the object. func ControllerToSelectableFields(controller *api.ReplicationController) fields.Set { return fields.Set{ diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 1e59a92ea32..72ae60cddca 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -73,6 +73,10 @@ func (endpointsStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object return append(errorList, validation.ValidateEndpointsUpdate(old.(*api.Endpoints), obj.(*api.Endpoints))...) } +func (endpointsStrategy) AllowUnconditionalUpdate() bool { + return true +} + // MatchEndpoints returns a generic matcher for a given label and field selector. func MatchEndpoints(label labels.Selector, field fields.Selector) generic.Matcher { return &generic.SelectionPredicate{label, field, EndpointsAttributes} diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 5b764311116..ea495b6ad2f 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -271,6 +271,14 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool if err != nil { return nil, false, err } + // If AllowUnconditionalUpdate() is true and the object specified by the user does not have a resource version, + // then we populate it with the latest version. + // Else, we check that the version specified by the user matches the version of latest etcd object. + resourceVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) + if err != nil { + return nil, false, err + } + doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate() // TODO: expose TTL creating := false out := e.NewFunc() @@ -295,13 +303,21 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } creating = false - newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) - if err != nil { - return nil, nil, err - } - if newVersion != version { - // TODO: return the most recent version to a client? - return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the object has been modified; please apply your changes to the latest version and try again")) + if doUnconditionalUpdate { + // Update the object's resource version to match the latest etcd object's resource version. + err = e.Helper.Versioner.UpdateObject(obj, res.Expiration, res.ResourceVersion) + if err != nil { + return nil, nil, err + } + } else { + // Check if the object's resource version matches the latest resource version. + newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) + if err != nil { + return nil, nil, err + } + if newVersion != version { + return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the object has been modified; please apply your changes to the latest version and try again")) + } } if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { return nil, nil, err diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 7431b813582..f1345352d0b 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -37,12 +37,14 @@ import ( type testRESTStrategy struct { runtime.ObjectTyper api.NameGenerator - namespaceScoped bool - allowCreateOnUpdate bool + namespaceScoped bool + allowCreateOnUpdate bool + allowUnconditionalUpdate bool } -func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped } -func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate } +func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped } +func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate } +func (t *testRESTStrategy) AllowUnconditionalUpdate() bool { return t.allowUnconditionalUpdate } func (t *testRESTStrategy) PrepareForCreate(obj runtime.Object) {} func (t *testRESTStrategy) PrepareForUpdate(obj, old runtime.Object) {} @@ -68,7 +70,7 @@ func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true h := tools.NewEtcdHelper(f, testapi.Codec(), etcdtest.PathPrefix()) - strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false} + strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} podPrefix := "/pods" return f, &Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -390,7 +392,10 @@ func TestEtcdUpdate(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"}, Spec: api.PodSpec{NodeName: "machine2"}, } - + podAWithResourceVersion := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "3"}, + Spec: api.PodSpec{NodeName: "machine"}, + } nodeWithPodA := tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -424,18 +429,29 @@ func TestEtcdUpdate(t *testing.T) { E: nil, } + nodeWithPodAWithResourceVersion := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podAWithResourceVersion), + ModifiedIndex: 3, + CreatedIndex: 1, + }, + }, + E: nil, + } emptyNode := tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, } table := map[string]struct { - existing tools.EtcdResponseWithError - expect tools.EtcdResponseWithError - toUpdate runtime.Object - allowCreate bool - objOK func(obj runtime.Object) bool - errOK func(error) bool + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toUpdate runtime.Object + allowCreate bool + allowUnconditionalUpdate bool + objOK func(obj runtime.Object) bool + errOK func(error) bool }{ "normal": { existing: nodeWithPodA, @@ -462,11 +478,19 @@ func TestEtcdUpdate(t *testing.T) { toUpdate: podB, errOK: func(err error) bool { return errors.IsConflict(err) }, }, + "unconditionalUpdate": { + existing: nodeWithPodAWithResourceVersion, + allowUnconditionalUpdate: true, + toUpdate: podA, + objOK: func(obj runtime.Object) bool { return true }, + errOK: func(err error) bool { return err == nil }, + }, } for name, item := range table { fakeClient, registry := NewTestGenericEtcdRegistry(t) registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate + registry.UpdateStrategy.(*testRESTStrategy).allowUnconditionalUpdate = item.allowUnconditionalUpdate path := etcdtest.AddPrefix("pods/foo") fakeClient.Data[path] = item.existing obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate) diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 528fde5b1d6..3d2504c9678 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -82,6 +82,10 @@ func (nodeStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fie return append(errorList, validation.ValidateNodeUpdate(old.(*api.Node), obj.(*api.Node))...) } +func (nodeStrategy) AllowUnconditionalUpdate() bool { + return true +} + type nodeStatusStrategy struct { nodeStrategy } diff --git a/pkg/registry/namespace/rest.go b/pkg/registry/namespace/rest.go index ff09ed59093..ce19617bbf5 100644 --- a/pkg/registry/namespace/rest.go +++ b/pkg/registry/namespace/rest.go @@ -93,6 +93,10 @@ func (namespaceStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object return append(errorList, validation.ValidateNamespaceUpdate(obj.(*api.Namespace), old.(*api.Namespace))...) } +func (namespaceStrategy) AllowUnconditionalUpdate() bool { + return true +} + type namespaceStatusStrategy struct { namespaceStrategy } diff --git a/pkg/registry/persistentvolume/rest.go b/pkg/registry/persistentvolume/rest.go index 6501bd591eb..c9b8000eb5e 100644 --- a/pkg/registry/persistentvolume/rest.go +++ b/pkg/registry/persistentvolume/rest.go @@ -69,6 +69,10 @@ func (persistentvolumeStrategy) ValidateUpdate(ctx api.Context, obj, old runtime return append(errorList, validation.ValidatePersistentVolumeUpdate(obj.(*api.PersistentVolume), old.(*api.PersistentVolume))...) } +func (persistentvolumeStrategy) AllowUnconditionalUpdate() bool { + return true +} + type persistentvolumeStatusStrategy struct { persistentvolumeStrategy } diff --git a/pkg/registry/persistentvolumeclaim/rest.go b/pkg/registry/persistentvolumeclaim/rest.go index b315358e525..b565759b5fd 100644 --- a/pkg/registry/persistentvolumeclaim/rest.go +++ b/pkg/registry/persistentvolumeclaim/rest.go @@ -69,6 +69,10 @@ func (persistentvolumeclaimStrategy) ValidateUpdate(ctx api.Context, obj, old ru return append(errorList, validation.ValidatePersistentVolumeClaimUpdate(obj.(*api.PersistentVolumeClaim), old.(*api.PersistentVolumeClaim))...) } +func (persistentvolumeclaimStrategy) AllowUnconditionalUpdate() bool { + return true +} + type persistentvolumeclaimStatusStrategy struct { persistentvolumeclaimStrategy } diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 46d608061d6..a4a10741e6f 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -81,6 +81,10 @@ func (podStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fiel return append(errorList, validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))...) } +func (podStrategy) AllowUnconditionalUpdate() bool { + return true +} + // CheckGracefulDelete allows a pod to be gracefully deleted. func (podStrategy) CheckGracefulDelete(obj runtime.Object, options *api.DeleteOptions) bool { return false diff --git a/pkg/registry/podtemplate/rest.go b/pkg/registry/podtemplate/rest.go index c1fea94fecb..982f516d84e 100644 --- a/pkg/registry/podtemplate/rest.go +++ b/pkg/registry/podtemplate/rest.go @@ -69,6 +69,10 @@ func (podTemplateStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje return validation.ValidatePodTemplateUpdate(obj.(*api.PodTemplate), old.(*api.PodTemplate)) } +func (podTemplateStrategy) AllowUnconditionalUpdate() bool { + return true +} + // MatchPodTemplate returns a generic matcher for a given label and field selector. func MatchPodTemplate(label labels.Selector, field fields.Selector) generic.Matcher { return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { diff --git a/pkg/registry/resourcequota/rest.go b/pkg/registry/resourcequota/rest.go index f1003eb1e16..ae256e1f1ab 100644 --- a/pkg/registry/resourcequota/rest.go +++ b/pkg/registry/resourcequota/rest.go @@ -73,6 +73,10 @@ func (resourcequotaStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Ob return append(errorList, validation.ValidateResourceQuotaUpdate(obj.(*api.ResourceQuota), old.(*api.ResourceQuota))...) } +func (resourcequotaStrategy) AllowUnconditionalUpdate() bool { + return true +} + type resourcequotaStatusStrategy struct { resourcequotaStrategy } diff --git a/pkg/registry/secret/strategy.go b/pkg/registry/secret/strategy.go index d8eac2081c6..942d0be1d77 100644 --- a/pkg/registry/secret/strategy.go +++ b/pkg/registry/secret/strategy.go @@ -65,6 +65,10 @@ func (strategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielder return validation.ValidateSecretUpdate(old.(*api.Secret), obj.(*api.Secret)) } +func (strategy) AllowUnconditionalUpdate() bool { + return true +} + // Matcher returns a generic matcher for a given label and field selector. func Matcher(label labels.Selector, field fields.Selector) generic.Matcher { return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { diff --git a/pkg/registry/serviceaccount/strategy.go b/pkg/registry/serviceaccount/strategy.go index 3d7373aafce..54c593da357 100644 --- a/pkg/registry/serviceaccount/strategy.go +++ b/pkg/registry/serviceaccount/strategy.go @@ -68,6 +68,10 @@ func (strategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielder return validation.ValidateServiceAccountUpdate(old.(*api.ServiceAccount), obj.(*api.ServiceAccount)) } +func (strategy) AllowUnconditionalUpdate() bool { + return true +} + // Matcher returns a generic matcher for a given label and field selector. func Matcher(label labels.Selector, field fields.Selector) generic.Matcher { return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index d60cf68b43a..05f2f952313 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -207,7 +207,7 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er } if h.Versioner != nil { // being unable to set the version does not prevent the object from being extracted - _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node) + _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex) } v.Set(reflect.Append(v, obj.Elem())) if node.ModifiedIndex != 0 { @@ -377,7 +377,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run body = node.Value err = h.Codec.DecodeInto([]byte(body), objPtr) if h.Versioner != nil { - _ = h.Versioner.UpdateObject(objPtr, node) + _ = h.Versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) // being unable to set the version does not prevent the object from being extracted } return body, node, err @@ -492,6 +492,11 @@ type ResponseMeta struct { // zero or negative in some cases (objects may be expired after the requested // expiration time due to server lag). TTL int64 + // Expiration is the time at which the node that contained the returned object will expire and be deleted. + // This can be nil if there is no expiration time set for the node. + Expiration *time.Time + // The resource version of the node that contained the returned object. + ResourceVersion uint64 } // Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed. @@ -525,7 +530,7 @@ func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc { // cur.Counter++ // // // Return the modified object. Return an error to stop iterating. Return a uint64 to alter -// // the TTL on the object, or nil to keep it the same value. +// // the TTL on the object, or nil to keep it the same value. // return cur, nil, nil // }) // @@ -545,8 +550,12 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno meta := ResponseMeta{} if node != nil { meta.TTL = node.TTL + if node.Expiration != nil { + meta.Expiration = node.Expiration + } + meta.ResourceVersion = node.ModifiedIndex } - + // Get the object to be written by calling tryUpdate. ret, newTTL, err := tryUpdate(obj, meta) if err != nil { return err @@ -589,9 +598,11 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno } startTime := time.Now() + // Swap origBody with data, if origBody is the latest etcd data. response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index) recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if IsEtcdTestFailed(err) { + // Try again. continue } _, _, err = h.extractObj(response, err, ptrToType, false, false) diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index a10128cdd4a..1a3fd1e74b3 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -270,7 +270,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { // ensure resource version is set on the object we load from etcd if w.versioner != nil { - if err := w.versioner.UpdateObject(obj, node); err != nil { + if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err) } } diff --git a/pkg/tools/etcd_object.go b/pkg/tools/etcd_object.go index 7db6e2646d5..132b0876c0b 100644 --- a/pkg/tools/etcd_object.go +++ b/pkg/tools/etcd_object.go @@ -18,12 +18,11 @@ package tools import ( "strconv" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - - "github.com/coreos/go-etcd/etcd" ) // APIObjectVersioner implements versioning and extracting etcd node information @@ -31,18 +30,17 @@ import ( type APIObjectVersioner struct{} // UpdateObject implements EtcdVersioner -func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error { +func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error { objectMeta, err := api.ObjectMetaFor(obj) if err != nil { return err } - if ttl := node.Expiration; ttl != nil { - objectMeta.DeletionTimestamp = &util.Time{*node.Expiration} + if expiration != nil { + objectMeta.DeletionTimestamp = &util.Time{*expiration} } - version := node.ModifiedIndex versionString := "" - if version != 0 { - versionString = strconv.FormatUint(version, 10) + if resourceVersion != 0 { + versionString = strconv.FormatUint(resourceVersion, 10) } objectMeta.ResourceVersion = versionString return nil diff --git a/pkg/tools/etcd_object_test.go b/pkg/tools/etcd_object_test.go index a3932d274b5..6675fe35013 100644 --- a/pkg/tools/etcd_object_test.go +++ b/pkg/tools/etcd_object_test.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" ) func TestObjectVersioner(t *testing.T) { @@ -34,7 +33,7 @@ func TestObjectVersioner(t *testing.T) { t.Errorf("unexpected version: %d %v", ver, err) } obj := &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}} - if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5}); err != nil { + if err := v.UpdateObject(obj, nil, 5); err != nil { t.Fatalf("unexpected error: %v", err) } if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil { @@ -42,7 +41,7 @@ func TestObjectVersioner(t *testing.T) { } now := util.Time{time.Now()} obj = &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}} - if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5, Expiration: &now.Time}); err != nil { + if err := v.UpdateObject(obj, &now.Time, 5); err != nil { t.Fatalf("unexpected error: %v", err) } if obj.ResourceVersion != "5" || *obj.DeletionTimestamp != now { diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go index 241a01721cc..873d09602b3 100644 --- a/pkg/tools/interfaces.go +++ b/pkg/tools/interfaces.go @@ -19,6 +19,7 @@ package tools import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/coreos/go-etcd/etcd" + "time" ) const ( @@ -66,7 +67,7 @@ type EtcdVersioner interface { // UpdateObject sets etcd storage metadata into an API object. Returns an error if the object // cannot be updated correctly. May return nil if the requested object does not need metadata // from etcd. - UpdateObject(obj runtime.Object, node *etcd.Node) error + UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error // UpdateList sets the resource version into an API list object. Returns an error if the object // cannot be updated correctly. May return nil if the requested object does not need metadata // from etcd.