etcd3/store: support TTL in Create, Update

This commit is contained in:
Hongchao Deng 2016-04-17 20:25:42 +08:00
parent 6e99624dd6
commit 46214c60bb
2 changed files with 82 additions and 8 deletions

View File

@ -121,10 +121,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
key = keyWithPrefix(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(data)),
clientv3.OpPut(key, string(data), opts...),
).Commit()
if err != nil {
return err
@ -225,7 +230,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
return err
}
ret, err := s.updateState(origState, tryUpdate)
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
return err
}
@ -238,10 +243,15 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(data)),
clientv3.OpPut(key, string(data), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
@ -358,19 +368,39 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, error) {
ret, _, err := userUpdate(st.obj, *st.meta)
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
version, err := s.versioner.ObjectResourceVersion(ret)
if err != nil {
return nil, err
return nil, 0, err
}
if version != 0 {
// We cannot store object with resourceVersion in etcd. We need to reset it.
if err := s.versioner.UpdateObject(ret, 0); err != nil {
return nil, fmt.Errorf("UpdateObject failed: %v", err)
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
}
}
return ret, nil
var ttl uint64
if ttlPtr != nil {
ttl = *ttlPtr
}
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
// TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
// put keys within into same lease. We shall benchmark this and optimize the performance.
lcr, err := s.client.Lease.Grant(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
}
func keyWithPrefix(prefix, key string) string {

View File

@ -29,6 +29,7 @@ import (
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/watch"
)
func TestCreate(t *testing.T) {
@ -71,6 +72,25 @@ func TestCreate(t *testing.T) {
}
}
func TestCreateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/somekey"
out := &api.Pod{}
if err := store.Create(ctx, key, input, out, 1); err != nil {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
testCheckResult(t, 0, watch.Deleted, w, nil)
}
func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -355,6 +375,30 @@ func TestGuaranteedUpdate(t *testing.T) {
}
}
func TestGuaranteedUpdateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/somekey"
out := &api.Pod{}
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1)
return input, &ttl, nil
})
if err != nil {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
testCheckResult(t, 0, watch.Deleted, w, nil)
}
func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)