diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index b7a6f7ef16a..75f714ad7c4 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -63,9 +63,10 @@ import ( "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" "github.com/spf13/pflag" + "golang.org/x/net/context" ) var ( @@ -93,17 +94,23 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func startComponents(firstManifestURL, secondManifestURL string) (string, string) { // Setup - servers := []string{} - glog.Infof("Creating etcd client pointing to %v", servers) - handler := delegateHandler{} apiServer := httptest.NewServer(&handler) - etcdClient := etcd.NewClient(servers) + cfg := etcd.Config{ + Endpoints: []string{"http://127.0.0.1:4001"}, + } + etcdClient, err := etcd.New(cfg) + if err != nil { + glog.Fatalf("Error creating etcd client: %v", err) + } + glog.Infof("Creating etcd client pointing to %v", cfg.Endpoints) + + keysAPI := etcd.NewKeysAPI(etcdClient) sleep := 4 * time.Second ok := false for i := 0; i < 3; i++ { - keys, err := etcdClient.Get("/", false, false) + keys, err := keysAPI.Get(context.TODO(), "/", nil) if err != nil { glog.Warningf("Unable to list root etcd keys: %v", err) if i < 2 { @@ -113,7 +120,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string continue } for _, node := range keys.Node.Nodes { - if _, err := etcdClient.Delete(node.Key, true); err != nil { + if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil { glog.Fatalf("Unable delete key: %v", err) } } diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index 73038855894..f93a3ac8e38 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -20,8 +20,10 @@ import ( "fmt" "time" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/api/unversioned" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" @@ -38,15 +40,15 @@ type Master string func (obj Master) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind } // NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd. -func NewEtcdMasterElector(h *etcd.Client) MasterElector { - return &etcdMasterElector{etcd: h} +func NewEtcdMasterElector(h etcd.Client) MasterElector { + return &etcdMasterElector{etcd: etcd.NewKeysAPI(h)} } type empty struct{} // internal implementation struct type etcdMasterElector struct { - etcd *etcd.Client + etcd etcd.KeysAPI done chan empty events chan watch.Event } @@ -90,7 +92,12 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // Uses compare and swap, so that if we TTL out in the meantime, the write will fail. // We don't handle the TTL delete w/o a write case here, it's handled in the next loop // iteration. - _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevValue: "", + PrevIndex: res.Node.ModifiedIndex, + } + _, err := e.etcd.Set(context.TODO(), path, id, &opts) if err != nil && !etcdutil.IsEtcdTestFailed(err) { return "", err } @@ -105,7 +112,12 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // returns id, nil if the attempt succeeded // returns "", err if an error occurred func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { - _, err := e.etcd.Create(path, id, ttl) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevExist: etcd.PrevNoExist, + } + + _, err := e.etcd.Set(context.TODO(), path, id, &opts) if err != nil && !etcdutil.IsEtcdNodeExist(err) { // unexpected error return "", err @@ -122,7 +134,7 @@ func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, e // in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock) // it returns "", nil func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) { - res, err := e.etcd.Get(path, false, false) + res, err := e.etcd.Get(context.TODO(), path, nil) // Unexpected error, bail out if err != nil && !etcdutil.IsEtcdNotFound(err) { diff --git a/contrib/mesos/pkg/election/etcd_master_test.go b/contrib/mesos/pkg/election/etcd_master_test.go index 4a420284236..c74a8d173ac 100644 --- a/contrib/mesos/pkg/election/etcd_master_test.go +++ b/contrib/mesos/pkg/election/etcd_master_test.go @@ -19,6 +19,9 @@ package election import ( "testing" + etcd "github.com/coreos/etcd/client" + "golang.org/x/net/context" + etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" "k8s.io/kubernetes/pkg/watch" ) @@ -28,7 +31,8 @@ func TestEtcdMasterOther(t *testing.T) { defer server.Terminate(t) path := "foo" - if _, err := server.Client.Set(path, "baz", 0); err != nil { + keysAPI := etcd.NewKeysAPI(server.Client) + if _, err := keysAPI.Set(context.TODO(), path, "baz", nil); err != nil { t.Errorf("unexpected error: %v", err) } master := NewEtcdMasterElector(server.Client) diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 0ba7d58f734..546302d1040 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -32,7 +32,7 @@ import ( "sync" "time" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/gogo/protobuf/proto" log "github.com/golang/glog" "github.com/kardianos/osext" @@ -102,7 +102,6 @@ type SchedulerServer struct { authPath string apiServerList []string etcdServerList []string - etcdConfigFile string allowPrivileged bool executorPath string proxyPath string @@ -234,8 +233,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.enableProfiling, "profiling", s.enableProfiling, "Enable profiling via web interface host:port/debug/pprof/") fs.StringSliceVar(&s.apiServerList, "api-servers", s.apiServerList, "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") fs.StringVar(&s.authPath, "auth-path", s.authPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.") - fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with --etcd-config") - fs.StringVar(&s.etcdConfigFile, "etcd-config", s.etcdConfigFile, "The config file for the etcd client. Mutually exclusive with --etcd-servers.") + fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated.") fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "Enable privileged containers in the kubelet (compare the same flag in the apiserver).") fs.StringVar(&s.clusterDomain, "cluster-domain", s.clusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.IPVar(&s.clusterDNS, "cluster-dns", s.clusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") @@ -640,16 +638,14 @@ func validateLeadershipTransition(desired, current string) { } // hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go -func newEtcd(etcdConfigFile string, etcdServerList []string) (client *etcd.Client, err error) { - if etcdConfigFile != "" { - client, err = etcd.NewClientFromFile(etcdConfigFile) - } else { - client = etcd.NewClient(etcdServerList) +func newEtcd(etcdServerList []string) (etcd.Client, error) { + cfg := etcd.Config{ + Endpoints: etcdServerList, } - return + return etcd.New(cfg) } -func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, *etcd.Client, *mesos.ExecutorID) { +func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, etcd.Client, *mesos.ExecutorID) { s.frameworkName = strings.TrimSpace(s.frameworkName) if s.frameworkName == "" { log.Fatalf("framework-name must be a non-empty string") @@ -661,8 +657,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config s.mux.Handle("/metrics", prometheus.Handler()) healthz.InstallHandler(s.mux) - if (s.etcdConfigFile != "" && len(s.etcdServerList) != 0) || (s.etcdConfigFile == "" && len(s.etcdServerList) == 0) { - log.Fatalf("specify either --etcd-servers or --etcd-config") + if len(s.etcdServerList) == 0 { + log.Fatalf("specify --etcd-servers must be specified") } if len(s.apiServerList) < 1 { @@ -689,10 +685,11 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config // (1) the generic config store is available for the FrameworkId storage // (2) the generic master election is provided by the apiserver // Compare docs/proposals/high-availability.md - etcdClient, err := newEtcd(s.etcdConfigFile, s.etcdServerList) + etcdClient, err := newEtcd(s.etcdServerList) if err != nil { log.Fatalf("misconfigured etcd: %v", err) } + keysAPI := etcd.NewKeysAPI(etcdClient) // mirror all nodes into the nodeStore var eiRegistry executorinfo.Registry @@ -741,7 +738,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config LookupNode: lookupNode, StoreFrameworkId: func(id string) { // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available - _, err := etcdClient.Set(meta.FrameworkIDKey, id, uint64(s.failoverTimeout)) + _, err := keysAPI.Set(context.TODO(), meta.FrameworkIDKey, id, &etcd.SetOptions{TTL: time.Duration(s.failoverTimeout) * time.Second}) if err != nil { log.Errorf("failed to renew frameworkId TTL: %v", err) } @@ -806,7 +803,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config log.V(1).Infoln("deferred init complete") // defer obtaining framework ID to prevent multiple schedulers // from overwriting each other's framework IDs - dconfig.Framework.Id, err = s.fetchFrameworkID(etcdClient) + dconfig.Framework.Id, err = s.fetchFrameworkID(keysAPI) if err != nil { return nil, fmt.Errorf("failed to fetch framework ID from etcd: %v", err) } @@ -928,9 +925,9 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred return } -func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.FrameworkID, error) { +func (s *SchedulerServer) fetchFrameworkID(client etcd.KeysAPI) (*mesos.FrameworkID, error) { if s.failoverTimeout > 0 { - if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil { + if response, err := client.Get(context.TODO(), meta.FrameworkIDKey, nil); err != nil { if !etcdutil.IsEtcdNotFound(err) { return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err) } @@ -941,7 +938,7 @@ func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.Framewor } } else { //TODO(jdef) this seems like a totally hackish way to clean up the framework ID - if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil { + if _, err := client.Delete(context.TODO(), meta.FrameworkIDKey, &etcd.DeleteOptions{Recursive: true}); err != nil { if !etcdutil.IsEtcdNotFound(err) { return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err) } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 0149fb60ed7..cc105044179 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -178,6 +178,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { // need to retry it on errors under lock. for { if err := c.reflector.ListAndWatch(stopChannel); err != nil { + // TODO: This can tight loop log. glog.Errorf("unexpected ListAndWatch error: %v", err) } else { break diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 2becc02478f..91d40538999 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -196,14 +196,7 @@ func TestWatch(t *testing.T) { t.Errorf("Expected 'error too old' error") } - // Now test watch with initial state. - // We want to observe fooCreation too, so need to pass smaller resource version. - initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) - } - initialVersion-- - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), storage.Everything) + initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -282,13 +275,7 @@ func TestFiltering(t *testing.T) { } return selector.Matches(labels.Set(metadata.GetLabels())) } - // We want to observe fooCreation too, so need to pass smaller resource version. - initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) - } - initialVersion-- - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), filter) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -299,23 +286,3 @@ func TestFiltering(t *testing.T) { verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) } - -/* TODO: So believe it or not... but this test is flakey with the go-etcd client library - * which I'm surprised by. Apprently you can close the client that is performing the watch - * and the watch *never returns.* I would like to still keep this test here and re-enable - * with the new 2.2+ client library. -func TestStorageError(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) - cacher := newTestCacher(etcdStorage) - - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - server.Terminate(t) - - got := <-watcher.ResultChan() - if got.Type != watch.Error { - t.Errorf("Unexpected non-error") - } -} */ diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index a4a38f1497e..c70d6677dad 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -17,9 +17,9 @@ limitations under the License. package etcd import ( - "crypto/tls" "errors" "fmt" + "net" "net/http" "path" "reflect" @@ -36,10 +36,9 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" "golang.org/x/net/context" - forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd" ) // storage.Config object for etcd. @@ -56,27 +55,32 @@ func (c *EtcdConfig) GetType() string { // implements storage.Config func (c *EtcdConfig) NewStorage() (storage.Interface, error) { - etcdClient := etcd.NewClient(c.ServerList) - if etcdClient == nil { - return nil, errors.New("Failed to create new etcd client from serverlist") - } - transport := &http.Transport{ - Dial: forked.Dial, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, + cfg := etcd.Config{ + Endpoints: c.ServerList, + // TODO: Determine if transport needs optimization + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConnsPerHost: 500, }, - MaxIdleConnsPerHost: 500, } - etcdClient.SetTransport(transport) - + etcdClient, err := etcd.New(cfg) + if err != nil { + return nil, err + } return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil } // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time -func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) storage.Interface { +func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) storage.Interface { return &etcdHelper{ - client: client, + etcdclient: client, + client: etcd.NewKeysAPI(client), codec: codec, versioner: APIObjectVersioner{}, copier: api.Scheme, @@ -87,9 +91,10 @@ func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) sto // etcdHelper is the reference implementation of storage.Interface. type etcdHelper struct { - client *etcd.Client - codec runtime.Codec - copier runtime.ObjectCopier + etcdclient etcd.Client + client etcd.KeysAPI + codec runtime.Codec + copier runtime.ObjectCopier // optional, has to be set to perform any atomic operations versioner storage.Versioner // prefix for all etcd keys @@ -119,7 +124,20 @@ func (h *etcdHelper) Backends(ctx context.Context) []string { if ctx == nil { glog.Errorf("Context is nil") } - return h.client.GetCluster() + membersAPI := etcd.NewMembersAPI(h.etcdclient) + members, err := membersAPI.List(ctx) + if err != nil { + glog.Errorf("Error obtaining etcd members list: %q", err) + return nil + } + if 0 == len(members) { + return nil + } + mlist := []string{""} + for _, member := range members { + mlist = append(mlist, member.ClientURLs...) + } + return mlist } // Implements storage.Interface. @@ -144,7 +162,11 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob } startTime := time.Now() - response, err := h.client.Create(key, string(data), ttl) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevExist: etcd.PrevNoExist, + } + response, err := h.client.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err @@ -175,7 +197,11 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { create = false startTime := time.Now() - response, err = h.client.CompareAndSwap(key, string(data), ttl, "", version) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevIndex: version, + } + response, err = h.client.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) if err != nil { return err @@ -185,7 +211,14 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec if create { // Create will fail if a key already exists. startTime := time.Now() - response, err = h.client.Create(key, string(data), ttl) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevExist: etcd.PrevNoExist, + } + response, err = h.client.Set(ctx, key, string(data), &opts) + if err != nil { + return err + } metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) } @@ -213,7 +246,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) } startTime := time.Now() - response, err := h.client.Delete(key, false) + response, err := h.client.Delete(ctx, key, nil) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update out. @@ -235,7 +268,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri } key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(h.client, key, watchRV) + go w.etcdWatch(ctx, h.client, key, watchRV) return w, nil } @@ -250,7 +283,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion } key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(h.client, key, watchRV) + go w.etcdWatch(ctx, h.client, key, watchRV) return w, nil } @@ -271,7 +304,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r glog.Errorf("Context is nil") } startTime := time.Now() - response, err := h.client.Get(key, false, false) + response, err := h.client.Get(ctx, key, nil) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !etcdutil.IsEtcdNotFound(err) { @@ -324,7 +357,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F key = h.prefixEtcdKey(key) startTime := time.Now() trace.Step("About to read etcd node") - response, err := h.client.Get(key, false, false) + response, err := h.client.Get(ctx, key, nil) metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) trace.Step("Etcd node read") if err != nil { @@ -342,7 +375,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F } trace.Step("Object decoded") if h.versioner != nil { - if err := h.versioner.UpdateList(listObj, response.EtcdIndex); err != nil { + if err := h.versioner.UpdateList(listObj, response.Index); err != nil { return err } } @@ -429,10 +462,14 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node if ctx == nil { glog.Errorf("Context is nil") } - result, err := h.client.Get(key, true, true) + opts := etcd.GetOptions{ + Recursive: true, + Sort: true, + } + result, err := h.client.Get(ctx, key, &opts) if err != nil { var index uint64 - if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError, ok := err.(etcd.Error); ok { index = etcdError.Index } nodes := make([]*etcd.Node, 0) @@ -442,7 +479,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node return nodes, index, err } } - return result.Node.Nodes, result.EtcdIndex, nil + return result.Node.Nodes, result.Index, nil } // Implements storage.Interface. @@ -487,7 +524,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType ttl = 1 } } else if res != nil { - index = res.EtcdIndex + index = res.Index } if newTTL != nil { @@ -506,7 +543,11 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType // First time this key has been used, try creating new value. if index == 0 { startTime := time.Now() - response, err := h.client.Create(key, string(data), ttl) + opts := etcd.SetOptions{ + TTL: time.Duration(ttl) * time.Second, + PrevExist: etcd.PrevNoExist, + } + response, err := h.client.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdNodeExist(err) { continue @@ -521,7 +562,12 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType startTime := time.Now() // Swap origBody with data, if origBody is the latest etcd data. - response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index) + opts := etcd.SetOptions{ + PrevValue: origBody, + PrevIndex: index, + TTL: time.Duration(ttl) * time.Second, + } + response, err := h.client.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdTestFailed(err) { // Try again. diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 0233fc81c4c..5bf82a4c669 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -22,7 +22,7 @@ import ( "sync" "testing" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -55,7 +55,7 @@ func init() { ) } -func newEtcdHelper(client *etcd.Client, codec runtime.Codec, prefix string) etcdHelper { +func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper { return *NewEtcdStorage(client, codec, prefix).(*etcdHelper) } diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 20549ef3976..7b6ac901771 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -29,8 +29,9 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" + "golang.org/x/net/context" ) // Etcd watch event actions @@ -85,7 +86,8 @@ type etcdWatcher struct { etcdIncoming chan *etcd.Response etcdError chan error - etcdStop chan bool + ctx context.Context + cancel context.CancelFunc etcdCallEnded chan struct{} outgoing chan watch.Event @@ -124,10 +126,12 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e // monitor how much of this buffer is actually used. etcdIncoming: make(chan *etcd.Response, 100), etcdError: make(chan error, 1), - etcdStop: make(chan bool), outgoing: make(chan watch.Event), userStop: make(chan struct{}), + stopped: false, cache: cache, + ctx: nil, + cancel: nil, } w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() @@ -136,37 +140,54 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client *etcd.Client, key string, resourceVersion uint64) { +func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdError) + defer close(w.etcdIncoming) if resourceVersion == 0 { - latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) + latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming) if err != nil { w.etcdError <- err return } - resourceVersion = latest + 1 + resourceVersion = latest } - _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) - if err != nil && err != etcd.ErrWatchStoppedByUser { - w.etcdError <- err + + opts := etcd.WatcherOptions{ + Recursive: w.list, + AfterIndex: resourceVersion, + } + watcher := client.Watcher(key, &opts) + w.ctx, w.cancel = context.WithCancel(ctx) + + for { + resp, err := watcher.Next(w.ctx) + if err != nil { + w.etcdError <- err + return + } + w.etcdIncoming <- resp } } // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client *etcd.Client, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { - resp, err := client.Get(key, false, recursive) +func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { + opts := etcd.GetOptions{ + Recursive: recursive, + Sort: false, + } + resp, err := client.Get(ctx, key, &opts) if err != nil { if !etcdutil.IsEtcdNotFound(err) { glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err) return resourceVersion, err } - if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError, ok := err.(etcd.Error); ok { resourceVersion = etcdError.Index } return resourceVersion, nil } - resourceVersion = resp.EtcdIndex + resourceVersion = resp.Index convertRecursiveResponse(resp.Node, resp, incoming) return } @@ -228,7 +249,6 @@ func (w *etcdWatcher) translate() { } return case <-w.userStop: - w.etcdStop <- true return case res, ok := <-w.etcdIncoming: if ok { @@ -407,7 +427,10 @@ func (w *etcdWatcher) ResultChan() <-chan watch.Event { func (w *etcdWatcher) Stop() { w.stopLock.Lock() defer w.stopLock.Unlock() - // Prevent double channel closes. + if w.cancel != nil { + w.cancel() + w.cancel = nil + } if !w.stopped { w.stopped = true close(w.userStop) diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 12a7f6d07fb..49de7643ed9 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -22,7 +22,6 @@ import ( "sync" "testing" - "github.com/coreos/go-etcd/etcd" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/runtime" @@ -31,6 +30,7 @@ import ( etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" "k8s.io/kubernetes/pkg/watch" + etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" ) @@ -217,15 +217,11 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { } } -/* TODO: So believe it or not... but this test is flakey with the go-etcd client library - * which I'm surprised by. Apprently you can close the client that is performing the watch - * and the watch *never returns.* I would like to still keep this test here and re-enable - * with the new 2.2+ client library. func TestWatchEtcdError(t *testing.T) { codec := testapi.Default.Codec() server := etcdtesting.NewEtcdTestClientServer(t) h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything) + watching, err := h.Watch(context.TODO(), "/some/key", "4", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -236,7 +232,7 @@ func TestWatchEtcdError(t *testing.T) { t.Fatalf("Unexpected non-error") } watching.Stop() -} */ +} func TestWatch(t *testing.T) { codec := testapi.Default.Codec() diff --git a/pkg/storage/etcd/testing/utils.go b/pkg/storage/etcd/testing/utils.go index 781291fd963..16815a883cb 100644 --- a/pkg/storage/etcd/testing/utils.go +++ b/pkg/storage/etcd/testing/utils.go @@ -26,19 +26,19 @@ import ( "testing" "time" + etcd "github.com/coreos/etcd/client" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" - goetcd "github.com/coreos/go-etcd/etcd" ) // EtcdTestServer encapsulates the datastructures needed to start local instance for testing type EtcdTestServer struct { etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener - Client *goetcd.Client + Client etcd.Client raftHandler http.Handler s *etcdserver.EtcdServer @@ -126,7 +126,7 @@ func (m *EtcdTestServer) launch(t *testing.T) error { // Terminate will shutdown the running etcd server func (m *EtcdTestServer) Terminate(t *testing.T) { - m.Client.Close() + m.Client = nil m.s.Stop() for _, hs := range m.hss { hs.CloseClientConnections() @@ -145,9 +145,12 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer { t.Fatal("Failed to start etcd server error=%v", err) return nil } - server.Client = goetcd.NewClient(server.ClientURLs.StringSlice()) - if server.Client == nil { - t.Errorf("Failed to connect to local etcd server") + cfg := etcd.Config{ + Endpoints: server.ClientURLs.StringSlice(), + } + server.Client, err = etcd.New(cfg) + if err != nil { + t.Errorf("Unexpected Error in NewEtcdTestClientServer (%v)", err) defer server.Terminate(t) return nil } diff --git a/pkg/storage/etcd/util/etcd_util.go b/pkg/storage/etcd/util/etcd_util.go index fb990fe91db..b15ec5bd019 100644 --- a/pkg/storage/etcd/util/etcd_util.go +++ b/pkg/storage/etcd/util/etcd_util.go @@ -22,56 +22,46 @@ import ( "io/ioutil" "net/http" - goetcd "github.com/coreos/go-etcd/etcd" -) - -const ( - etcdErrorCodeNotFound = 100 - etcdErrorCodeTestFailed = 101 - etcdErrorCodeNodeExist = 105 - etcdErrorCodeValueRequired = 200 - etcdErrorCodeWatchExpired = 401 - etcdErrorCodeUnreachable = 501 -) - -var ( - etcdErrorNotFound = &goetcd.EtcdError{ErrorCode: etcdErrorCodeNotFound} - etcdErrorTestFailed = &goetcd.EtcdError{ErrorCode: etcdErrorCodeTestFailed} - etcdErrorNodeExist = &goetcd.EtcdError{ErrorCode: etcdErrorCodeNodeExist} - etcdErrorValueRequired = &goetcd.EtcdError{ErrorCode: etcdErrorCodeValueRequired} - etcdErrorWatchExpired = &goetcd.EtcdError{ErrorCode: etcdErrorCodeWatchExpired} - etcdErrorUnreachable = &goetcd.EtcdError{ErrorCode: etcdErrorCodeUnreachable} + etcd "github.com/coreos/etcd/client" ) // IsEtcdNotFound returns true if and only if err is an etcd not found error. func IsEtcdNotFound(err error) bool { - return isEtcdErrorNum(err, etcdErrorCodeNotFound) + return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound) } // IsEtcdNodeExist returns true if and only if err is an etcd node already exist error. func IsEtcdNodeExist(err error) bool { - return isEtcdErrorNum(err, etcdErrorCodeNodeExist) + return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist) } // IsEtcdTestFailed returns true if and only if err is an etcd write conflict. func IsEtcdTestFailed(err error) bool { - return isEtcdErrorNum(err, etcdErrorCodeTestFailed) + return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed) } // IsEtcdWatchExpired returns true if and only if err indicates the watch has expired. func IsEtcdWatchExpired(err error) bool { - return isEtcdErrorNum(err, etcdErrorCodeWatchExpired) + // NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared + // I'm using the previous matching value + return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared) } // IsEtcdUnreachable returns true if and only if err indicates the server could not be reached. func IsEtcdUnreachable(err error) bool { - return isEtcdErrorNum(err, etcdErrorCodeUnreachable) + // NOTE: The logic has changed previous error code no longer applies + return err == etcd.ErrClusterUnavailable } // isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { - etcdError, ok := err.(*goetcd.EtcdError) - return ok && etcdError != nil && etcdError.ErrorCode == errorCode + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + return etcdError.Code == errorCode + } + // NOTE: There are other error types returned + } + return false } // GetEtcdVersion performs a version check against the provided Etcd server, diff --git a/pkg/storage/etcd/util/etcd_util_test.go b/pkg/storage/etcd/util/etcd_util_test.go index 09de6aa87c3..cc41958ad04 100644 --- a/pkg/storage/etcd/util/etcd_util_test.go +++ b/pkg/storage/etcd/util/etcd_util_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/stretchr/testify/assert" ) @@ -38,8 +38,7 @@ func TestIsEtcdNotFound(t *testing.T) { t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound) } } - try(etcdErrorNotFound, true) - try(&etcd.EtcdError{ErrorCode: 101}, false) + try(&etcd.Error{Code: 101}, false) try(nil, false) try(fmt.Errorf("some other kind of error"), false) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 5cf7fcc173e..7c5b1539564 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -95,13 +95,15 @@ type Interface interface { // Watch begins watching the specified key. Events are decoded into API objects, // and any items passing 'filter' are sent down to returned watch.Interface. - // resourceVersion may be used to specify what version to begin watching + // resourceVersion may be used to specify what version to begin watching, + // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API // objects and any item passing 'filter' are sent down to returned watch.Interface. - // resourceVersion may be used to specify what version to begin watching + // resourceVersion may be used to specify what version to begin watching, + // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 0a741b805a7..ab01f3cda72 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -53,7 +53,7 @@ func ParseWatchResourceVersion(resourceVersion string) (uint64, error) { field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), }) } - return version + 1, nil + return version, nil } // ParseListResourceVersion takes a resource version argument and converts it to diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 22768ce1afa..d24ae5d5f75 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -31,8 +31,8 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) { {Version: "", ExpectVersion: 0}, {Version: "a", Err: true}, {Version: " ", Err: true}, - {Version: "1", ExpectVersion: 2}, - {Version: "10", ExpectVersion: 11}, + {Version: "1", ExpectVersion: 1}, + {Version: "10", ExpectVersion: 10}, } for _, testCase := range testCases { version, err := ParseWatchResourceVersion(testCase.Version) diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index ca61850c433..720fe278d60 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -26,24 +26,26 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" - "k8s.io/kubernetes/pkg/storage/etcd" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/integration/framework" + etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" ) func TestSet(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + keysAPI := etcd.NewKeysAPI(client) + etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil { t.Fatalf("unexpected error: %v", err) } - resp, err := client.Get(key, false, false) + resp, err := keysAPI.Get(ctx, key, nil) if err != nil || resp.Node == nil { t.Fatalf("unexpected error: %v %v", err, resp) } @@ -60,7 +62,8 @@ func TestSet(t *testing.T) { func TestGet(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + keysAPI := etcd.NewKeysAPI(client) + etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} @@ -68,7 +71,7 @@ func TestGet(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - _, err = client.Set(key, string(coded), 0) + _, err = keysAPI.Set(ctx, key, string(coded), nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -86,7 +89,8 @@ func TestGet(t *testing.T) { func TestWriteTTL(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + keysAPI := etcd.NewKeysAPI(client) + etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "") ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} @@ -111,7 +115,7 @@ func TestWriteTTL(t *testing.T) { if result.Name != "out" { t.Errorf("unexpected response: %#v", result) } - if res, err := client.Get(key, false, false); err != nil || res == nil || res.Node.TTL != 10 { + if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL != 10 { t.Fatalf("unexpected get: %v %#v", err, res) } @@ -132,7 +136,7 @@ func TestWriteTTL(t *testing.T) { if result.Name != "out2" { t.Errorf("unexpected response: %#v", result) } - if res, err := client.Get(key, false, false); err != nil || res == nil || res.Node.TTL <= 1 { + if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL <= 1 { t.Fatalf("unexpected get: %v %#v", err, res) } }) @@ -140,11 +144,12 @@ func TestWriteTTL(t *testing.T) { func TestWatch(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()) + keysAPI := etcd.NewKeysAPI(client) + etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()) ctx := context.TODO() framework.WithEtcdKey(func(key string) { key = etcdtest.AddPrefix(key) - resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) + resp, err := keysAPI.Set(ctx, key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -178,7 +183,7 @@ func TestWatch(t *testing.T) { } // should return the previously deleted item in the watch, but with the latest index - resp, err = client.Delete(key, false) + resp, err = keysAPI.Delete(ctx, key, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/test/integration/framework/etcd_utils.go b/test/integration/framework/etcd_utils.go index 95a631ef95c..1d4e6b50fe0 100644 --- a/test/integration/framework/etcd_utils.go +++ b/test/integration/framework/etcd_utils.go @@ -20,8 +20,10 @@ import ( "fmt" "math/rand" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -35,15 +37,22 @@ func init() { RequireEtcd() } -func NewEtcdClient() *etcd.Client { - return etcd.NewClient([]string{}) +func NewEtcdClient() etcd.Client { + cfg := etcd.Config{ + Endpoints: []string{"http://127.0.0.1:4001"}, + } + client, err := etcd.New(cfg) + if err != nil { + glog.Fatalf("unable to connect to etcd for testing: %v", err) + } + return client } func NewEtcdStorage() storage.Interface { return etcdstorage.NewEtcdStorage(NewEtcdClient(), testapi.Default.Codec(), etcdtest.PathPrefix()) } -func NewExtensionsEtcdStorage(client *etcd.Client) storage.Interface { +func NewExtensionsEtcdStorage(client etcd.Client) storage.Interface { if client == nil { client = NewEtcdClient() } @@ -51,14 +60,14 @@ func NewExtensionsEtcdStorage(client *etcd.Client) storage.Interface { } func RequireEtcd() { - if _, err := NewEtcdClient().Get("/", false, false); err != nil { + if _, err := etcd.NewKeysAPI(NewEtcdClient()).Get(context.TODO(), "/", nil); err != nil { glog.Fatalf("unable to connect to etcd for testing: %v", err) } } func WithEtcdKey(f func(string)) { prefix := fmt.Sprintf("/test-%d", rand.Int63()) - defer NewEtcdClient().Delete(prefix, true) + defer etcd.NewKeysAPI(NewEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true}) f(prefix) } @@ -68,13 +77,13 @@ func WithEtcdKey(f func(string)) { // of the test run. func DeleteAllEtcdKeys() { glog.Infof("Deleting all etcd keys") - client := NewEtcdClient() - keys, err := client.Get("/", false, false) + keysAPI := etcd.NewKeysAPI(NewEtcdClient()) + keys, err := keysAPI.Get(context.TODO(), "/", nil) if err != nil { glog.Fatalf("Unable to list root etcd keys: %v", err) } for _, node := range keys.Node.Nodes { - if _, err := client.Delete(node.Key, true); err != nil { + if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil { glog.Fatalf("Unable delete key: %v", err) } } diff --git a/test/integration/utils.go b/test/integration/utils.go index 002d23bbf2e..107bb5dd774 100644 --- a/test/integration/utils.go +++ b/test/integration/utils.go @@ -22,34 +22,42 @@ import ( "math/rand" "os" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" + "golang.org/x/net/context" ) -func newEtcdClient() *etcd.Client { - return etcd.NewClient([]string{}) +func newEtcdClient() etcd.Client { + cfg := etcd.Config{ + Endpoints: []string{"http://127.0.0.1:4001"}, + } + client, err := etcd.New(cfg) + if err != nil { + glog.Fatalf("unable to connect to etcd for testing: %v", err) + } + return client } func requireEtcd() { - if _, err := newEtcdClient().Get("/", false, false); err != nil { + if _, err := etcd.NewKeysAPI(newEtcdClient()).Get(context.TODO(), "/", nil); err != nil { glog.Fatalf("unable to connect to etcd for integration testing: %v", err) } } func withEtcdKey(f func(string)) { prefix := fmt.Sprintf("/test-%d", rand.Int63()) - defer newEtcdClient().Delete(prefix, true) + defer etcd.NewKeysAPI(newEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true}) f(prefix) } func deleteAllEtcdKeys() { - client := newEtcdClient() - keys, err := client.Get("/", false, false) + keysAPI := etcd.NewKeysAPI(newEtcdClient()) + keys, err := keysAPI.Get(context.TODO(), "/", nil) if err != nil { glog.Fatalf("Unable to list root etcd keys: %v", err) } for _, node := range keys.Node.Nodes { - if _, err := client.Delete(node.Key, true); err != nil { + if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil { glog.Fatalf("Unable delete key: %v", err) } }