Merge pull request #23995 from hongchaodeng/ttl

Automatic merge from submit-queue

etcd3/store: support TTL in Create, Update

ref: https://github.com/kubernetes/kubernetes/issues/22448

What's included in the PR?
- Support TTL keys in Create() and Update()
- Simple testing for create

Note
- Have TTL keys in a long window (5min) in one lease. I'm not sure if we should do it right now. We could just implement it as simple as is. Once we have etcd3 work done we could start testing and measuring with some realistic load.
This commit is contained in:
k8s-merge-robot 2016-04-20 13:30:54 -07:00
commit 7f362b18de
2 changed files with 82 additions and 8 deletions

View File

@ -121,10 +121,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
key = keyWithPrefix(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(data)),
clientv3.OpPut(key, string(data), opts...),
).Commit()
if err != nil {
return err
@ -225,7 +230,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
return err
}
ret, err := s.updateState(origState, tryUpdate)
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
return err
}
@ -238,10 +243,15 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(data)),
clientv3.OpPut(key, string(data), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
@ -358,19 +368,39 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, error) {
ret, _, err := userUpdate(st.obj, *st.meta)
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
version, err := s.versioner.ObjectResourceVersion(ret)
if err != nil {
return nil, err
return nil, 0, err
}
if version != 0 {
// We cannot store object with resourceVersion in etcd. We need to reset it.
if err := s.versioner.UpdateObject(ret, 0); err != nil {
return nil, fmt.Errorf("UpdateObject failed: %v", err)
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
}
}
return ret, nil
var ttl uint64
if ttlPtr != nil {
ttl = *ttlPtr
}
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
// TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
// put keys within into same lease. We shall benchmark this and optimize the performance.
lcr, err := s.client.Lease.Grant(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
}
func keyWithPrefix(prefix, key string) string {

View File

@ -29,6 +29,7 @@ import (
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/watch"
)
func TestCreate(t *testing.T) {
@ -71,6 +72,25 @@ func TestCreate(t *testing.T) {
}
}
func TestCreateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/somekey"
out := &api.Pod{}
if err := store.Create(ctx, key, input, out, 1); err != nil {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
testCheckResult(t, 0, watch.Deleted, w, nil)
}
func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -355,6 +375,30 @@ func TestGuaranteedUpdate(t *testing.T) {
}
}
func TestGuaranteedUpdateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
input := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/somekey"
out := &api.Pod{}
err := store.GuaranteedUpdate(ctx, key, out, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1)
return input, &ttl, nil
})
if err != nil {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
testCheckResult(t, 0, watch.Deleted, w, nil)
}
func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)