Merge pull request #23923 from hongchaodeng/exp

Automatic merge from submit-queue

Decouple etcd node.expiration logic from DeleitonTimestamp

ref: https://github.com/kubernetes/kubernetes/issues/23902
This commit is contained in:
k8s-merge-robot 2016-04-17 04:12:26 -07:00
commit 2bf52175f9
7 changed files with 12 additions and 35 deletions

View File

@ -274,7 +274,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
// function, we are resetting resourceVersion to the initial value here. // function, we are resetting resourceVersion to the initial value here.
// //
// TODO: In fact, we should probably return a DeepCopy of obj in all places. // TODO: In fact, we should probably return a DeepCopy of obj in all places.
err := e.Storage.Versioner().UpdateObject(obj, nil, resourceVersion) err := e.Storage.Versioner().UpdateObject(obj, resourceVersion)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -301,7 +301,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
creating = false creating = false
if doUnconditionalUpdate { if doUnconditionalUpdate {
// Update the object's resource version to match the latest etcd object's resource version. // Update the object's resource version to match the latest etcd object's resource version.
err = e.Storage.Versioner().UpdateObject(obj, res.Expiration, res.ResourceVersion) err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -18,11 +18,9 @@ package etcd
import ( import (
"strconv" "strconv"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
) )
@ -32,14 +30,11 @@ import (
type APIObjectVersioner struct{} type APIObjectVersioner struct{}
// UpdateObject implements Versioner // UpdateObject implements Versioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error { func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
accessor, err := meta.Accessor(obj) accessor, err := meta.Accessor(obj)
if err != nil { if err != nil {
return err return err
} }
if expiration != nil {
accessor.SetDeletionTimestamp(&unversioned.Time{Time: *expiration})
}
versionString := "" versionString := ""
if resourceVersion != 0 { if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10) versionString = strconv.FormatUint(resourceVersion, 10)

View File

@ -18,10 +18,8 @@ package etcd
import ( import (
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
storagetesting "k8s.io/kubernetes/pkg/storage/testing" storagetesting "k8s.io/kubernetes/pkg/storage/testing"
) )
@ -34,18 +32,10 @@ func TestObjectVersioner(t *testing.T) {
t.Errorf("unexpected version: %d %v", ver, err) t.Errorf("unexpected version: %d %v", ver, err)
} }
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}} obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, nil, 5); err != nil { if err := v.UpdateObject(obj, 5); err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil { if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil {
t.Errorf("unexpected resource version: %#v", obj) t.Errorf("unexpected resource version: %#v", obj)
} }
now := unversioned.Time{Time: time.Now()}
obj = &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, &now.Time, 5); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.ResourceVersion != "5" || *obj.DeletionTimestamp != now {
t.Errorf("unexpected resource version: %#v", obj)
}
} }

View File

@ -392,7 +392,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
return body, node, err return body, node, err
} }
@ -466,7 +466,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
return err return err
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex) _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if filter(obj) { if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
@ -557,9 +557,6 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
meta := storage.ResponseMeta{} meta := storage.ResponseMeta{}
if node != nil { if node != nil {
meta.TTL = node.TTL meta.TTL = node.TTL
if node.Expiration != nil {
meta.Expiration = node.Expiration
}
meta.ResourceVersion = node.ModifiedIndex meta.ResourceVersion = node.ModifiedIndex
} }
// Get the object to be written by calling tryUpdate. // Get the object to be written by calling tryUpdate.
@ -591,7 +588,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
} }
// Since update object may have a resourceVersion set, we need to clear it here. // Since update object may have a resourceVersion set, we need to clear it here.
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil { if err := h.versioner.UpdateObject(ret, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd") return errors.New("resourceVersion cannot be set on objects store in etcd")
} }

View File

@ -317,7 +317,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
} }
// ensure resource version is set on the object we load from etcd // ensure resource version is set on the object we load from etcd
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
} }

View File

@ -354,7 +354,7 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
} }
if version != 0 { if version != 0 {
// We cannot store object with resourceVersion in etcd. We need to reset it. // We cannot store object with resourceVersion in etcd. We need to reset it.
if err := s.versioner.UpdateObject(ret, nil, 0); err != nil { if err := s.versioner.UpdateObject(ret, 0); err != nil {
return nil, fmt.Errorf("UpdateObject failed: %v", err) return nil, fmt.Errorf("UpdateObject failed: %v", err)
} }
} }
@ -379,7 +379,7 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
return err return err
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(objPtr, nil, uint64(rev)) versioner.UpdateObject(objPtr, uint64(rev))
return nil return nil
} }
@ -396,7 +396,7 @@ func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr inter
return err return err
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, nil, elem.rev) versioner.UpdateObject(obj, elem.rev)
if filter(obj) { if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }

View File

@ -17,8 +17,6 @@ limitations under the License.
package storage package storage
import ( import (
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -31,7 +29,7 @@ type Versioner interface {
// UpdateObject sets storage metadata into an API object. Returns an error if the object // UpdateObject sets storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata // cannot be updated correctly. May return nil if the requested object does not need metadata
// from database. // from database.
UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error UpdateObject(obj runtime.Object, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object // UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata // cannot be updated correctly. May return nil if the requested object does not need metadata
// from database. // from database.
@ -49,9 +47,6 @@ type ResponseMeta struct {
// zero or negative in some cases (objects may be expired after the requested // zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag). // expiration time due to server lag).
TTL int64 TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object. // The resource version of the node that contained the returned object.
ResourceVersion uint64 ResourceVersion uint64
} }