Merge pull request #20145 from mqliang/quorum-read

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-02 05:50:41 -08:00
commit c5260c8c71
17 changed files with 60 additions and 37 deletions

View File

@ -174,6 +174,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.BoolVar(&s.EtcdQuorumRead, "etcd-quorum-read", s.EtcdQuorumRead, "If true, enable quorum read")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")

View File

@ -82,9 +82,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
} }
} }
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string) (storage.Interface, error) type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, bool) (storage.Interface, error)
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) { func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" { if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
} }
@ -96,6 +96,7 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
var storageConfig etcdstorage.EtcdConfig var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix storageConfig.Prefix = pathPrefix
storageConfig.Quorum = quorum
s, ok := ns.SerializerForMediaType("application/json", nil) s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok { if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON") return nil, fmt.Errorf("unable to find serializer for JSON")
@ -120,7 +121,7 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map
} }
// parse the value of --etcd-servers-overrides and update given storageDestinations. // parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) { func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, quorum bool, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 { if len(overrides) == 0 {
return return
} }
@ -149,7 +150,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
} }
servers := strings.Split(tokens[1], ";") servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix) etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix, quorum)
if err != nil { if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
} }
@ -260,7 +261,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found { if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
} }
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix) etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil { if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
} }
@ -274,14 +275,14 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found { if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
} }
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix) expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil { if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
} }
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage) storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
} }
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd) updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd)
n := s.ServiceClusterIPRange n := s.ServiceClusterIPRange

View File

@ -133,7 +133,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
} }
for _, test := range testCases { for _, test := range testCases {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string) (storage.Interface, error) { newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string, _ bool) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) { if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList) t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
} }
@ -141,7 +141,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
} }
storageDestinations := genericapiserver.NewStorageDestinations() storageDestinations := genericapiserver.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";") override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd) updateEtcdOverrides([]string{override}, storageVersions, "", false, &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup] apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok { if !ok {
t.Errorf("apigroup: %s not created", test.apigroup) t.Errorf("apigroup: %s not created", test.apigroup)

View File

@ -66,6 +66,7 @@ kube-apiserver
--cloud-provider="": The provider for cloud services. Empty string for no provider. --cloud-provider="": The provider for cloud services. Empty string for no provider.
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled. --cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
--etcd-prefix="/registry": The prefix for all resource paths in etcd. --etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-quorum-read[=false]: If true, enable quorum read
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config --etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated. --etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
--event-ttl=1h0m0s: Amount of time to retain events. Default 1 hour. --event-ttl=1h0m0s: Amount of time to retain events. Default 1 hour.
@ -107,7 +108,7 @@ kube-apiserver
--watch-cache[=true]: Enable watch caching in the apiserver --watch-cache[=true]: Enable watch caching in the apiserver
``` ```
###### Auto generated by spf13/cobra on 14-Jan-2016 ###### Auto generated by spf13/cobra on 26-Jan-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS --> <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -90,6 +90,7 @@ enable-server
etcd-config etcd-config
etcd-mutation-timeout etcd-mutation-timeout
etcd-prefix etcd-prefix
etcd-quorum-read
etcd-server etcd-server
etcd-servers etcd-servers
etcd-servers-overrides etcd-servers-overrides

View File

@ -30,6 +30,7 @@ type ServerRunOptions struct {
BindAddress net.IP BindAddress net.IP
CertDirectory string CertDirectory string
ClientCAFile string ClientCAFile string
EtcdQuorumRead bool
InsecureBindAddress net.IP InsecureBindAddress net.IP
InsecurePort int InsecurePort int
LongRunningRequestRE string LongRunningRequestRE string

View File

@ -66,9 +66,9 @@ func setUp(t *testing.T) (Master, *etcdtesting.EtcdTestServer, Config, *assert.A
storageVersions := make(map[string]string) storageVersions := make(map[string]string)
storageDestinations := genericapiserver.NewStorageDestinations() storageDestinations := genericapiserver.NewStorageDestinations()
storageDestinations.AddAPIGroup( storageDestinations.AddAPIGroup(
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())) api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false))
storageDestinations.AddAPIGroup( storageDestinations.AddAPIGroup(
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix())) extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false))
config.StorageDestinations = storageDestinations config.StorageDestinations = storageDestinations
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String() storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
@ -348,7 +348,7 @@ func initThirdParty(t *testing.T, version string) (*Master, *etcdtesting.EtcdTes
}, },
} }
master.HandlerContainer = restful.NewContainer() master.HandlerContainer = restful.NewContainer()
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix()) master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false)
if !assert.NoError(master.InstallThirdPartyResource(api)) { if !assert.NoError(master.InstallThirdPartyResource(api)) {
t.FailNow() t.FailNow()

View File

@ -88,7 +88,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etcd) { func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etcd) {
podPrefix := "/pods" podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
return server, &Etcd{ return server, &Etcd{

View File

@ -37,7 +37,7 @@ import (
func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) { func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix()) storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false)
return storage, server return storage, server
} }

View File

@ -42,7 +42,7 @@ import (
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix) storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false)
return server, storage return server, storage
} }

View File

@ -46,6 +46,7 @@ type EtcdConfig struct {
ServerList []string ServerList []string
Codec runtime.Codec Codec runtime.Codec
Prefix string Prefix string
Quorum bool
} }
// implements storage.Config // implements storage.Config
@ -72,12 +73,12 @@ func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil
} }
// Creates a new storage interface from the client // Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time // TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) storage.Interface { func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
return &etcdHelper{ return &etcdHelper{
etcdclient: client, etcdclient: client,
client: etcd.NewKeysAPI(client), client: etcd.NewKeysAPI(client),
@ -85,6 +86,7 @@ func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) stor
versioner: APIObjectVersioner{}, versioner: APIObjectVersioner{},
copier: api.Scheme, copier: api.Scheme,
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: util.NewCache(maxEtcdCacheEntries), cache: util.NewCache(maxEtcdCacheEntries),
} }
} }
@ -99,6 +101,8 @@ type etcdHelper struct {
versioner storage.Versioner versioner storage.Versioner
// prefix for all etcd keys // prefix for all etcd keys
pathPrefix string pathPrefix string
// if true, perform quorum read
quorum bool
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion. // to resourceVersion.
@ -269,7 +273,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
return nil, err return nil, err
} }
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV) go w.etcdWatch(ctx, h.client, key, watchRV)
return w, nil return w, nil
} }
@ -284,7 +288,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
return nil, err return nil, err
} }
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV) go w.etcdWatch(ctx, h.client, key, watchRV)
return w, nil return w, nil
} }
@ -306,7 +310,12 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
startTime := time.Now() startTime := time.Now()
response, err := h.client.Get(ctx, key, nil)
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.client.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) { if err != nil && !etcdutil.IsEtcdNotFound(err) {
@ -365,7 +374,12 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
startTime := time.Now() startTime := time.Now()
trace.Step("About to read etcd node") trace.Step("About to read etcd node")
response, err := h.client.Get(ctx, key, nil)
opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.client.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read") trace.Step("Etcd node read")
if err != nil { if err != nil {
@ -473,6 +487,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
opts := etcd.GetOptions{ opts := etcd.GetOptions{
Recursive: true, Recursive: true,
Sort: true, Sort: true,
Quorum: h.quorum,
} }
result, err := h.client.Get(ctx, key, &opts) result, err := h.client.Get(ctx, key, &opts)
if err != nil { if err != nil {

View File

@ -63,7 +63,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
} }
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper { func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper) return *NewEtcdStorage(client, codec, prefix, false).(*etcdHelper)
} }
// Returns an encoded version of api.Pod with the given name. // Returns an encoded version of api.Pod with the given name.

View File

@ -82,6 +82,7 @@ type etcdWatcher struct {
transform TransformFunc transform TransformFunc
list bool // If we're doing a recursive watch, should be true. list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true
include includeFunc include includeFunc
filter storage.FilterFunc filter storage.FilterFunc
@ -109,12 +110,13 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates. // and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher { func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
encoding: encoding, encoding: encoding,
versioner: versioner, versioner: versioner,
transform: transform, transform: transform,
list: list, list: list,
quorum: quorum,
include: include, include: include,
filter: filter, filter: filter,
// Buffer this channel, so that the etcd client is not forced // Buffer this channel, so that the etcd client is not forced
@ -171,7 +173,7 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
// Stop() is called in the meantime (which in tests can cause etcd termination and // Stop() is called in the meantime (which in tests can cause etcd termination and
// strange behavior here). // strange behavior here).
if resourceVersion == 0 { if resourceVersion == 0 {
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming) latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
if err != nil { if err != nil {
w.etcdError <- err w.etcdError <- err
return true return true
@ -203,10 +205,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
} }
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
opts := etcd.GetOptions{ opts := etcd.GetOptions{
Recursive: recursive, Recursive: recursive,
Sort: false, Sort: false,
Quorum: quorum,
} }
resp, err := client.Get(ctx, key, &opts) resp, err := client.Get(ctx, key, &opts)
if err != nil { if err != nil {

View File

@ -131,7 +131,7 @@ func TestWatchInterpretations(t *testing.T) {
for name, item := range table { for name, item := range table {
for _, action := range item.actions { for _, action := range item.actions {
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false emitCalled := false
w.emit = func(event watch.Event) { w.emit = func(event watch.Event) {
emitCalled = true emitCalled = true
@ -170,7 +170,7 @@ func TestWatchInterpretations(t *testing.T) {
func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codec := testScheme(t) _, codec := testScheme(t)
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -185,7 +185,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codec := testScheme(t) _, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }
@ -200,7 +200,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codec := testScheme(t) _, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"} actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions { for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) { w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e) t.Errorf("Unexpected emit: %v", e)
} }

View File

@ -38,7 +38,7 @@ import (
func TestSet(t *testing.T) { func TestSet(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client) keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO() ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
@ -63,7 +63,7 @@ func TestSet(t *testing.T) {
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client) keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO() ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
@ -90,7 +90,7 @@ func TestGet(t *testing.T) {
func TestWriteTTL(t *testing.T) { func TestWriteTTL(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client) keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO() ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
@ -145,7 +145,7 @@ func TestWriteTTL(t *testing.T) {
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client) keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()) etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
ctx := context.TODO() ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
key = etcdtest.AddPrefix(key) key = etcdtest.AddPrefix(key)

View File

@ -49,14 +49,14 @@ func NewEtcdClient() etcd.Client {
} }
func NewEtcdStorage() storage.Interface { func NewEtcdStorage() storage.Interface {
return etcdstorage.NewEtcdStorage(NewEtcdClient(), testapi.Default.Codec(), etcdtest.PathPrefix()) return etcdstorage.NewEtcdStorage(NewEtcdClient(), testapi.Default.Codec(), etcdtest.PathPrefix(), false)
} }
func NewExtensionsEtcdStorage(client etcd.Client) storage.Interface { func NewExtensionsEtcdStorage(client etcd.Client) storage.Interface {
if client == nil { if client == nil {
client = NewEtcdClient() client = NewEtcdClient()
} }
return etcdstorage.NewEtcdStorage(client, testapi.Extensions.Codec(), etcdtest.PathPrefix()) return etcdstorage.NewEtcdStorage(client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false)
} }
func RequireEtcd() { func RequireEtcd() {

View File

@ -144,7 +144,7 @@ func NewMasterConfig() *master.Config {
etcdClient := NewEtcdClient() etcdClient := NewEtcdClient()
storageVersions := make(map[string]string) storageVersions := make(map[string]string)
etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(), etcdtest.PathPrefix()) etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String() storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
expEtcdStorage := NewExtensionsEtcdStorage(etcdClient) expEtcdStorage := NewExtensionsEtcdStorage(etcdClient)
storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String() storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String()