Merge pull request #34711 from wojtek-t/serializable_get

Automatic merge from submit-queue

Change etcd Get operation to serializable

Ref https://github.com/kubernetes/kubernetes/issues/34709
This commit is contained in:
Kubernetes Submit Queue 2016-10-15 17:48:23 -07:00 committed by GitHub
commit 8b6cebcb35
5 changed files with 31 additions and 13 deletions

View File

@ -38,7 +38,10 @@ import (
) )
type store struct { type store struct {
client *clientv3.Client client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec codec runtime.Codec
versioner storage.Versioner versioner storage.Versioner
pathPrefix string pathPrefix string
@ -59,18 +62,30 @@ type objState struct {
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, codec, prefix) return newStore(c, true, codec, prefix)
} }
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store { // NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
// where Get operations don't require quorum read.
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, false, codec, prefix)
}
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store {
versioner := etcd.APIObjectVersioner{} versioner := etcd.APIObjectVersioner{}
return &store{ result := &store{
client: c, client: c,
versioner: versioner, versioner: versioner,
codec: codec, codec: codec,
pathPrefix: prefix, pathPrefix: prefix,
watcher: newWatcher(c, codec, versioner), watcher: newWatcher(c, codec, versioner),
} }
if !quorumRead {
// In case of non-quorum reads, we can set WithSerializable()
// options for all Get operations.
result.getOps = append(result.getOps, clientv3.WithSerializable())
}
return result
} }
// Versioner implements storage.Interface.Versioner. // Versioner implements storage.Interface.Versioner.
@ -81,7 +96,7 @@ func (s *store) Versioner() storage.Versioner {
// Get implements storage.Interface.Get. // Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error { func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
key = keyWithPrefix(s.pathPrefix, key) key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key) getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil { if err != nil {
return err return err
} }
@ -202,7 +217,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
panic("unable to convert output object to pointer") panic("unable to convert output object to pointer")
} }
key = keyWithPrefix(s.pathPrefix, key) key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key) getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil { if err != nil {
return err return err
} }
@ -262,7 +277,7 @@ func (s *store) GetToList(ctx context.Context, key string, pred storage.Selectio
} }
key = keyWithPrefix(s.pathPrefix, key) key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key) getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -451,7 +451,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
func TestList(t *testing.T) { func TestList(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -538,7 +538,7 @@ func TestList(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
ctx := context.Background() ctx := context.Background()
return ctx, store, cluster return ctx, store, cluster
} }

View File

@ -158,7 +158,7 @@ func (wc *watchChan) sync() error {
wc.initialRev = getResp.Header.Revision wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs { for _, kv := range getResp.Kvs {
prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1)) prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable())
if err != nil { if err != nil {
return err return err
} }

View File

@ -176,13 +176,13 @@ func TestWatchFromNoneZero(t *testing.T) {
func TestWatchError(t *testing.T) { func TestWatchError(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), &testCodec{testapi.Default.Codec()}, "") invalidStore := newStore(cluster.RandClient(), false, &testCodec{testapi.Default.Codec()}, "")
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
validStore := newStore(cluster.RandClient(), testapi.Default.Codec(), "") validStore := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate( validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil

View File

@ -55,5 +55,8 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
cancel() cancel()
client.Close() client.Close()
} }
return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil
} }