Merge pull request #8724 from smarterclayton/preserve_ttl

TTL is not preserved automatically during edit
This commit is contained in:
Rohit Jnagal
2015-05-29 11:12:00 -07:00
6 changed files with 245 additions and 96 deletions

View File

@@ -44,7 +44,7 @@ func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
KeyFunc: func(ctx api.Context, id string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id)
},
TTLFunc: func(runtime.Object, bool) (uint64, error) {
TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) {
return ttl, nil
},
Helper: h,

View File

@@ -73,8 +73,9 @@ type Etcd struct {
ObjectNameFunc func(obj runtime.Object) (string, error)
// Return the TTL objects should be persisted with. Update is true if this
// is an operation against an existing object.
TTLFunc func(obj runtime.Object, update bool) (uint64, error)
// is an operation against an existing object. Existing is the current TTL
// or the default for this operation.
TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
// Returns a matcher corresponding to the provided labels and fields.
PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher
@@ -185,12 +186,9 @@ func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object)
return err
}
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return err
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return err
}
err = e.Helper.CreateObj(key, obj, nil, ttl)
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
@@ -215,12 +213,9 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
if err != nil {
return nil, err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return nil, err
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
trace.Step("About to create object")
out := e.NewFunc()
@@ -250,12 +245,9 @@ func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object)
if err != nil {
return err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, true)
if err != nil {
return err
}
ttl, err := e.calculateTTL(obj, 0, true)
if err != nil {
return err
}
err = e.Helper.SetObj(key, obj, nil, ttl)
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
@@ -282,49 +274,46 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
// TODO: expose TTL
creating := false
out := e.NewFunc()
err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res tools.ResponseMeta) (runtime.Object, *uint64, error) {
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
if err != nil {
return nil, 0, err
return nil, nil, err
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, 0, kubeerr.NewNotFound(e.EndpointName, name)
return nil, nil, kubeerr.NewNotFound(e.EndpointName, name)
}
creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, 0, err
return nil, nil, err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return nil, 0, err
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, nil, err
}
return obj, ttl, nil
return obj, &ttl, nil
}
creating = false
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, 0, err
return nil, nil, err
}
if newVersion != version {
// TODO: return the most recent version to a client?
return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, 0, err
return nil, nil, err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, true)
if err != nil {
return nil, 0, err
}
ttl, err := e.calculateTTL(obj, res.TTL, true)
if err != nil {
return nil, nil, err
}
return obj, ttl, nil
if int64(ttl) != res.TTL {
return obj, &ttl, nil
}
return obj, nil, nil
})
if err != nil {
@@ -480,3 +469,19 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
}
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
// if the TTL cannot be calculated. The defaultTTL is changed to 1 if less than zero. Zero means
// no TTL, not expire immediately.
func (e *Etcd) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
// etcd may return a negative TTL for a node if the expiration has not occured due
// to server lag - we will ensure that the value is at least set.
if defaultTTL < 0 {
defaultTTL = 1
}
ttl = uint64(defaultTTL)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, ttl, update)
}
return ttl, err
}

View File

@@ -142,13 +142,16 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
if err != nil {
return nil, err
}
err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, tools.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.DeletionTimestamp != nil {
return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name)
}
if pod.Spec.NodeName != oldMachine {
return nil, 0, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
}
pod.Spec.NodeName = machine
if pod.Annotations == nil {
@@ -158,8 +161,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
pod.Annotations[k] = v
}
finalPod = pod
return pod, 0, nil
})
return pod, nil
}))
return finalPod, err
}

View File

@@ -141,25 +141,25 @@ func (e *Etcd) Release(item int) error {
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
func (e *Etcd) tryUpdate(fn func() error) error {
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
existing := input.(*api.RangeAllocation)
if len(existing.ResourceVersion) == 0 {
return nil, 0, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind)
return nil, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind)
}
if existing.ResourceVersion != e.last {
if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
return nil, 0, err
return nil, err
}
if err := fn(); err != nil {
return nil, 0, err
return nil, err
}
}
e.last = existing.ResourceVersion
rangeSpec, data := e.alloc.Snapshot()
existing.Range = rangeSpec
existing.Data = data
return existing, 0, nil
},
return existing, nil
}),
)
return etcderr.InterpretUpdateError(err, e.kind, "")
}
@@ -198,19 +198,19 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
last := ""
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
existing := input.(*api.RangeAllocation)
switch {
case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0:
if snapshot.ResourceVersion != existing.ResourceVersion {
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match"))
return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match"))
}
case len(existing.ResourceVersion) != 0:
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource"))
return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource"))
}
last = snapshot.ResourceVersion
return snapshot, 0, nil
},
return snapshot, nil
}),
)
if err != nil {
return etcderr.InterpretUpdateError(err, e.kind, "")