diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 9eb5fda3c6f..12a931daae3 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -38,7 +38,10 @@ import ( ) type store struct { - client *clientv3.Client + client *clientv3.Client + // getOpts contains additional options that should be passed + // to all Get() calls. + getOps []clientv3.OpOption codec runtime.Codec versioner storage.Versioner pathPrefix string @@ -59,18 +62,30 @@ type objState struct { // New returns an etcd3 implementation of storage.Interface. func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { - return newStore(c, codec, prefix) + return newStore(c, true, codec, prefix) } -func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store { +// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface +// where Get operations don't require quorum read. +func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { + return newStore(c, false, codec, prefix) +} + +func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store { versioner := etcd.APIObjectVersioner{} - return &store{ + result := &store{ client: c, versioner: versioner, codec: codec, pathPrefix: prefix, watcher: newWatcher(c, codec, versioner), } + if !quorumRead { + // In case of non-quorum reads, we can set WithSerializable() + // options for all Get operations. + result.getOps = append(result.getOps, clientv3.WithSerializable()) + } + return result } // Versioner implements storage.Interface.Versioner. @@ -81,7 +96,7 @@ func (s *store) Versioner() storage.Versioner { // Get implements storage.Interface.Get. func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error { key = keyWithPrefix(s.pathPrefix, key) - getResp, err := s.client.KV.Get(ctx, key) + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { return err } @@ -202,7 +217,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob panic("unable to convert output object to pointer") } key = keyWithPrefix(s.pathPrefix, key) - getResp, err := s.client.KV.Get(ctx, key) + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { return err } @@ -262,7 +277,7 @@ func (s *store) GetToList(ctx context.Context, key string, pred storage.Selectio } key = keyWithPrefix(s.pathPrefix, key) - getResp, err := s.client.KV.Get(ctx, key) + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { return err } diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 9e5f088f998..c458164fc98 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -451,7 +451,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) { func TestList(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") ctx := context.Background() // Setup storage with the following structure: @@ -538,7 +538,7 @@ func TestList(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") ctx := context.Background() return ctx, store, cluster } diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 7ccb61cb011..63ede7ee2e9 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -158,7 +158,7 @@ func (wc *watchChan) sync() error { wc.initialRev = getResp.Header.Revision for _, kv := range getResp.Kvs { - prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1)) + prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable()) if err != nil { return err } diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index b0bdf9631d0..f0d71db36dd 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -176,13 +176,13 @@ func TestWatchFromNoneZero(t *testing.T) { func TestWatchError(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), &testCodec{testapi.Default.Codec()}, "") + invalidStore := newStore(cluster.RandClient(), false, &testCodec{testapi.Default.Codec()}, "") ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + validStore := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "") validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index f860f91814d..5a69fdc2b15 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -55,5 +55,8 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e cancel() client.Close() } - return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil + if c.Quorum { + return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil + } + return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil }