diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 46f10ad892c..f7a25987685 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -121,10 +121,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, } key = keyWithPrefix(s.pathPrefix, key) + opts, err := s.ttlOpts(ctx, int64(ttl)) + if err != nil { + return err + } + txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( - clientv3.OpPut(key, string(data)), + clientv3.OpPut(key, string(data), opts...), ).Commit() if err != nil { return err @@ -225,7 +230,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob return err } - ret, err := s.updateState(origState, tryUpdate) + ret, ttl, err := s.updateState(origState, tryUpdate) if err != nil { return err } @@ -238,10 +243,15 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob return decode(s.codec, s.versioner, origState.data, out, origState.rev) } + opts, err := s.ttlOpts(ctx, int64(ttl)) + if err != nil { + return err + } + txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( - clientv3.OpPut(key, string(data)), + clientv3.OpPut(key, string(data), opts...), ).Else( clientv3.OpGet(key), ).Commit() @@ -358,19 +368,39 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va return state, nil } -func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, error) { - ret, _, err := userUpdate(st.obj, *st.meta) +func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) { + ret, ttlPtr, err := userUpdate(st.obj, *st.meta) + version, err := s.versioner.ObjectResourceVersion(ret) if err != nil { - return nil, err + return nil, 0, err } if version != 0 { // We cannot store object with resourceVersion in etcd. We need to reset it. if err := s.versioner.UpdateObject(ret, 0); err != nil { - return nil, fmt.Errorf("UpdateObject failed: %v", err) + return nil, 0, fmt.Errorf("UpdateObject failed: %v", err) } } - return ret, nil + var ttl uint64 + if ttlPtr != nil { + ttl = *ttlPtr + } + return ret, ttl, nil +} + +// ttlOpts returns client options based on given ttl. +// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length +func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) { + if ttl == 0 { + return nil, nil + } + // TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to + // put keys within into same lease. We shall benchmark this and optimize the performance. + lcr, err := s.client.Lease.Grant(ctx, ttl) + if err != nil { + return nil, err + } + return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil } func keyWithPrefix(prefix, key string) string { diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 917a64af8e0..4da97f60265 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -29,6 +29,7 @@ import ( "github.com/coreos/etcd/integration" "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/watch" ) func TestCreate(t *testing.T) { @@ -71,6 +72,25 @@ func TestCreate(t *testing.T) { } } +func TestCreateWithTTL(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + key := "/somekey" + + out := &api.Pod{} + if err := store.Create(ctx, key, input, out, 1); err != nil { + t.Fatalf("Create failed: %v", err) + } + + w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 0, watch.Deleted, w, nil) +} + func TestCreateWithKeyExist(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -355,6 +375,30 @@ func TestGuaranteedUpdate(t *testing.T) { } } +func TestGuaranteedUpdateWithTTL(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + key := "/somekey" + + out := &api.Pod{} + err := store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + ttl := uint64(1) + return input, &ttl, nil + }) + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 0, watch.Deleted, w, nil) +} + func TestGuaranteedUpdateWithConflict(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t)