mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Updating kubernetes proper to use latest etcd client library
This commit is contained in:
parent
bae050ffea
commit
c505a5d49d
@ -63,9 +63,10 @@ import (
|
||||
"k8s.io/kubernetes/test/integration"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/pflag"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -93,17 +94,23 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
func startComponents(firstManifestURL, secondManifestURL string) (string, string) {
|
||||
// Setup
|
||||
servers := []string{}
|
||||
glog.Infof("Creating etcd client pointing to %v", servers)
|
||||
|
||||
handler := delegateHandler{}
|
||||
apiServer := httptest.NewServer(&handler)
|
||||
|
||||
etcdClient := etcd.NewClient(servers)
|
||||
cfg := etcd.Config{
|
||||
Endpoints: []string{"http://127.0.0.1:4001"},
|
||||
}
|
||||
etcdClient, err := etcd.New(cfg)
|
||||
if err != nil {
|
||||
glog.Fatalf("Error creating etcd client: %v", err)
|
||||
}
|
||||
glog.Infof("Creating etcd client pointing to %v", cfg.Endpoints)
|
||||
|
||||
keysAPI := etcd.NewKeysAPI(etcdClient)
|
||||
sleep := 4 * time.Second
|
||||
ok := false
|
||||
for i := 0; i < 3; i++ {
|
||||
keys, err := etcdClient.Get("/", false, false)
|
||||
keys, err := keysAPI.Get(context.TODO(), "/", nil)
|
||||
if err != nil {
|
||||
glog.Warningf("Unable to list root etcd keys: %v", err)
|
||||
if i < 2 {
|
||||
@ -113,7 +120,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
|
||||
continue
|
||||
}
|
||||
for _, node := range keys.Node.Nodes {
|
||||
if _, err := etcdClient.Delete(node.Key, true); err != nil {
|
||||
if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
glog.Fatalf("Unable delete key: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -20,8 +20,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
@ -38,15 +40,15 @@ type Master string
|
||||
func (obj Master) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind }
|
||||
|
||||
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
|
||||
func NewEtcdMasterElector(h *etcd.Client) MasterElector {
|
||||
return &etcdMasterElector{etcd: h}
|
||||
func NewEtcdMasterElector(h etcd.Client) MasterElector {
|
||||
return &etcdMasterElector{etcd: etcd.NewKeysAPI(h)}
|
||||
}
|
||||
|
||||
type empty struct{}
|
||||
|
||||
// internal implementation struct
|
||||
type etcdMasterElector struct {
|
||||
etcd *etcd.Client
|
||||
etcd etcd.KeysAPI
|
||||
done chan empty
|
||||
events chan watch.Event
|
||||
}
|
||||
@ -90,7 +92,12 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
|
||||
// Uses compare and swap, so that if we TTL out in the meantime, the write will fail.
|
||||
// We don't handle the TTL delete w/o a write case here, it's handled in the next loop
|
||||
// iteration.
|
||||
_, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevValue: "",
|
||||
PrevIndex: res.Node.ModifiedIndex,
|
||||
}
|
||||
_, err := e.etcd.Set(context.TODO(), path, id, &opts)
|
||||
if err != nil && !etcdutil.IsEtcdTestFailed(err) {
|
||||
return "", err
|
||||
}
|
||||
@ -105,7 +112,12 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
|
||||
// returns id, nil if the attempt succeeded
|
||||
// returns "", err if an error occurred
|
||||
func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) {
|
||||
_, err := e.etcd.Create(path, id, ttl)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
|
||||
_, err := e.etcd.Set(context.TODO(), path, id, &opts)
|
||||
if err != nil && !etcdutil.IsEtcdNodeExist(err) {
|
||||
// unexpected error
|
||||
return "", err
|
||||
@ -122,7 +134,7 @@ func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, e
|
||||
// in situations where you should try again due to concurrent state changes (e.g. another actor simultaneously acquiring the lock)
|
||||
// it returns "", nil
|
||||
func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, error) {
|
||||
res, err := e.etcd.Get(path, false, false)
|
||||
res, err := e.etcd.Get(context.TODO(), path, nil)
|
||||
|
||||
// Unexpected error, bail out
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
|
@ -19,6 +19,9 @@ package election
|
||||
import (
|
||||
"testing"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
@ -28,7 +31,8 @@ func TestEtcdMasterOther(t *testing.T) {
|
||||
defer server.Terminate(t)
|
||||
|
||||
path := "foo"
|
||||
if _, err := server.Client.Set(path, "baz", 0); err != nil {
|
||||
keysAPI := etcd.NewKeysAPI(server.Client)
|
||||
if _, err := keysAPI.Set(context.TODO(), path, "baz", nil); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
master := NewEtcdMasterElector(server.Client)
|
||||
|
@ -32,7 +32,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
log "github.com/golang/glog"
|
||||
"github.com/kardianos/osext"
|
||||
@ -102,7 +102,6 @@ type SchedulerServer struct {
|
||||
authPath string
|
||||
apiServerList []string
|
||||
etcdServerList []string
|
||||
etcdConfigFile string
|
||||
allowPrivileged bool
|
||||
executorPath string
|
||||
proxyPath string
|
||||
@ -234,8 +233,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
|
||||
fs.BoolVar(&s.enableProfiling, "profiling", s.enableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.StringSliceVar(&s.apiServerList, "api-servers", s.apiServerList, "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
|
||||
fs.StringVar(&s.authPath, "auth-path", s.authPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
|
||||
fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with --etcd-config")
|
||||
fs.StringVar(&s.etcdConfigFile, "etcd-config", s.etcdConfigFile, "The config file for the etcd client. Mutually exclusive with --etcd-servers.")
|
||||
fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated.")
|
||||
fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "Enable privileged containers in the kubelet (compare the same flag in the apiserver).")
|
||||
fs.StringVar(&s.clusterDomain, "cluster-domain", s.clusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
|
||||
fs.IPVar(&s.clusterDNS, "cluster-dns", s.clusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
|
||||
@ -640,16 +638,14 @@ func validateLeadershipTransition(desired, current string) {
|
||||
}
|
||||
|
||||
// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
|
||||
func newEtcd(etcdConfigFile string, etcdServerList []string) (client *etcd.Client, err error) {
|
||||
if etcdConfigFile != "" {
|
||||
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
||||
} else {
|
||||
client = etcd.NewClient(etcdServerList)
|
||||
func newEtcd(etcdServerList []string) (etcd.Client, error) {
|
||||
cfg := etcd.Config{
|
||||
Endpoints: etcdServerList,
|
||||
}
|
||||
return
|
||||
return etcd.New(cfg)
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, *etcd.Client, *mesos.ExecutorID) {
|
||||
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, etcd.Client, *mesos.ExecutorID) {
|
||||
s.frameworkName = strings.TrimSpace(s.frameworkName)
|
||||
if s.frameworkName == "" {
|
||||
log.Fatalf("framework-name must be a non-empty string")
|
||||
@ -661,8 +657,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
s.mux.Handle("/metrics", prometheus.Handler())
|
||||
healthz.InstallHandler(s.mux)
|
||||
|
||||
if (s.etcdConfigFile != "" && len(s.etcdServerList) != 0) || (s.etcdConfigFile == "" && len(s.etcdServerList) == 0) {
|
||||
log.Fatalf("specify either --etcd-servers or --etcd-config")
|
||||
if len(s.etcdServerList) == 0 {
|
||||
log.Fatalf("specify --etcd-servers must be specified")
|
||||
}
|
||||
|
||||
if len(s.apiServerList) < 1 {
|
||||
@ -689,10 +685,11 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
// (1) the generic config store is available for the FrameworkId storage
|
||||
// (2) the generic master election is provided by the apiserver
|
||||
// Compare docs/proposals/high-availability.md
|
||||
etcdClient, err := newEtcd(s.etcdConfigFile, s.etcdServerList)
|
||||
etcdClient, err := newEtcd(s.etcdServerList)
|
||||
if err != nil {
|
||||
log.Fatalf("misconfigured etcd: %v", err)
|
||||
}
|
||||
keysAPI := etcd.NewKeysAPI(etcdClient)
|
||||
|
||||
// mirror all nodes into the nodeStore
|
||||
var eiRegistry executorinfo.Registry
|
||||
@ -741,7 +738,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
LookupNode: lookupNode,
|
||||
StoreFrameworkId: func(id string) {
|
||||
// TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available
|
||||
_, err := etcdClient.Set(meta.FrameworkIDKey, id, uint64(s.failoverTimeout))
|
||||
_, err := keysAPI.Set(context.TODO(), meta.FrameworkIDKey, id, &etcd.SetOptions{TTL: time.Duration(s.failoverTimeout) * time.Second})
|
||||
if err != nil {
|
||||
log.Errorf("failed to renew frameworkId TTL: %v", err)
|
||||
}
|
||||
@ -806,7 +803,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
log.V(1).Infoln("deferred init complete")
|
||||
// defer obtaining framework ID to prevent multiple schedulers
|
||||
// from overwriting each other's framework IDs
|
||||
dconfig.Framework.Id, err = s.fetchFrameworkID(etcdClient)
|
||||
dconfig.Framework.Id, err = s.fetchFrameworkID(keysAPI)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch framework ID from etcd: %v", err)
|
||||
}
|
||||
@ -928,9 +925,9 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.FrameworkID, error) {
|
||||
func (s *SchedulerServer) fetchFrameworkID(client etcd.KeysAPI) (*mesos.FrameworkID, error) {
|
||||
if s.failoverTimeout > 0 {
|
||||
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
|
||||
if response, err := client.Get(context.TODO(), meta.FrameworkIDKey, nil); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
|
||||
}
|
||||
@ -941,7 +938,7 @@ func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.Framewor
|
||||
}
|
||||
} else {
|
||||
//TODO(jdef) this seems like a totally hackish way to clean up the framework ID
|
||||
if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil {
|
||||
if _, err := client.Delete(context.TODO(), meta.FrameworkIDKey, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err)
|
||||
}
|
||||
|
@ -178,6 +178,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
||||
// need to retry it on errors under lock.
|
||||
for {
|
||||
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
||||
// TODO: This can tight loop log.
|
||||
glog.Errorf("unexpected ListAndWatch error: %v", err)
|
||||
} else {
|
||||
break
|
||||
|
@ -196,14 +196,7 @@ func TestWatch(t *testing.T) {
|
||||
t.Errorf("Expected 'error too old' error")
|
||||
}
|
||||
|
||||
// Now test watch with initial state.
|
||||
// We want to observe fooCreation too, so need to pass smaller resource version.
|
||||
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
||||
}
|
||||
initialVersion--
|
||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), storage.Everything)
|
||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@ -282,13 +275,7 @@ func TestFiltering(t *testing.T) {
|
||||
}
|
||||
return selector.Matches(labels.Set(metadata.GetLabels()))
|
||||
}
|
||||
// We want to observe fooCreation too, so need to pass smaller resource version.
|
||||
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
||||
}
|
||||
initialVersion--
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), filter)
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@ -299,23 +286,3 @@ func TestFiltering(t *testing.T) {
|
||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
||||
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
|
||||
}
|
||||
|
||||
/* TODO: So believe it or not... but this test is flakey with the go-etcd client library
|
||||
* which I'm surprised by. Apprently you can close the client that is performing the watch
|
||||
* and the watch *never returns.* I would like to still keep this test here and re-enable
|
||||
* with the new 2.2+ client library.
|
||||
func TestStorageError(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
cacher := newTestCacher(etcdStorage)
|
||||
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
server.Terminate(t)
|
||||
|
||||
got := <-watcher.ResultChan()
|
||||
if got.Type != watch.Error {
|
||||
t.Errorf("Unexpected non-error")
|
||||
}
|
||||
} */
|
||||
|
@ -17,9 +17,9 @@ limitations under the License.
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
@ -36,10 +36,9 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// storage.Config object for etcd.
|
||||
@ -56,27 +55,32 @@ func (c *EtcdConfig) GetType() string {
|
||||
|
||||
// implements storage.Config
|
||||
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
|
||||
etcdClient := etcd.NewClient(c.ServerList)
|
||||
if etcdClient == nil {
|
||||
return nil, errors.New("Failed to create new etcd client from serverlist")
|
||||
}
|
||||
transport := &http.Transport{
|
||||
Dial: forked.Dial,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
cfg := etcd.Config{
|
||||
Endpoints: c.ServerList,
|
||||
// TODO: Determine if transport needs optimization
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
MaxIdleConnsPerHost: 500,
|
||||
},
|
||||
MaxIdleConnsPerHost: 500,
|
||||
}
|
||||
etcdClient.SetTransport(transport)
|
||||
|
||||
etcdClient, err := etcd.New(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
|
||||
}
|
||||
|
||||
// Creates a new storage interface from the client
|
||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||
func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
|
||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
|
||||
return &etcdHelper{
|
||||
client: client,
|
||||
etcdclient: client,
|
||||
client: etcd.NewKeysAPI(client),
|
||||
codec: codec,
|
||||
versioner: APIObjectVersioner{},
|
||||
copier: api.Scheme,
|
||||
@ -87,9 +91,10 @@ func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) sto
|
||||
|
||||
// etcdHelper is the reference implementation of storage.Interface.
|
||||
type etcdHelper struct {
|
||||
client *etcd.Client
|
||||
codec runtime.Codec
|
||||
copier runtime.ObjectCopier
|
||||
etcdclient etcd.Client
|
||||
client etcd.KeysAPI
|
||||
codec runtime.Codec
|
||||
copier runtime.ObjectCopier
|
||||
// optional, has to be set to perform any atomic operations
|
||||
versioner storage.Versioner
|
||||
// prefix for all etcd keys
|
||||
@ -119,7 +124,20 @@ func (h *etcdHelper) Backends(ctx context.Context) []string {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
return h.client.GetCluster()
|
||||
membersAPI := etcd.NewMembersAPI(h.etcdclient)
|
||||
members, err := membersAPI.List(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf("Error obtaining etcd members list: %q", err)
|
||||
return nil
|
||||
}
|
||||
if 0 == len(members) {
|
||||
return nil
|
||||
}
|
||||
mlist := []string{""}
|
||||
for _, member := range members {
|
||||
mlist = append(mlist, member.ClientURLs...)
|
||||
}
|
||||
return mlist
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
@ -144,7 +162,11 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Create(key, string(data), ttl)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -175,7 +197,11 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
||||
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
create = false
|
||||
startTime := time.Now()
|
||||
response, err = h.client.CompareAndSwap(key, string(data), ttl, "", version)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevIndex: version,
|
||||
}
|
||||
response, err = h.client.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -185,7 +211,14 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
||||
if create {
|
||||
// Create will fail if a key already exists.
|
||||
startTime := time.Now()
|
||||
response, err = h.client.Create(key, string(data), ttl)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err = h.client.Set(ctx, key, string(data), &opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||
}
|
||||
|
||||
@ -213,7 +246,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Delete(key, false)
|
||||
response, err := h.client.Delete(ctx, key, nil)
|
||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update out.
|
||||
@ -235,7 +268,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(h.client, key, watchRV)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
@ -250,7 +283,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(h.client, key, watchRV)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
@ -271,7 +304,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Get(key, false, false)
|
||||
response, err := h.client.Get(ctx, key, nil)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
@ -324,7 +357,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
||||
key = h.prefixEtcdKey(key)
|
||||
startTime := time.Now()
|
||||
trace.Step("About to read etcd node")
|
||||
response, err := h.client.Get(key, false, false)
|
||||
response, err := h.client.Get(ctx, key, nil)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
trace.Step("Etcd node read")
|
||||
if err != nil {
|
||||
@ -342,7 +375,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
||||
}
|
||||
trace.Step("Object decoded")
|
||||
if h.versioner != nil {
|
||||
if err := h.versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
|
||||
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -429,10 +462,14 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
result, err := h.client.Get(key, true, true)
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: true,
|
||||
Sort: true,
|
||||
}
|
||||
result, err := h.client.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
var index uint64
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
index = etcdError.Index
|
||||
}
|
||||
nodes := make([]*etcd.Node, 0)
|
||||
@ -442,7 +479,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
||||
return nodes, index, err
|
||||
}
|
||||
}
|
||||
return result.Node.Nodes, result.EtcdIndex, nil
|
||||
return result.Node.Nodes, result.Index, nil
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
@ -487,7 +524,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
||||
ttl = 1
|
||||
}
|
||||
} else if res != nil {
|
||||
index = res.EtcdIndex
|
||||
index = res.Index
|
||||
}
|
||||
|
||||
if newTTL != nil {
|
||||
@ -506,7 +543,11 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
||||
// First time this key has been used, try creating new value.
|
||||
if index == 0 {
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Create(key, string(data), ttl)
|
||||
opts := etcd.SetOptions{
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdNodeExist(err) {
|
||||
continue
|
||||
@ -521,7 +562,12 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
||||
|
||||
startTime := time.Now()
|
||||
// Swap origBody with data, if origBody is the latest etcd data.
|
||||
response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
||||
opts := etcd.SetOptions{
|
||||
PrevValue: origBody,
|
||||
PrevIndex: index,
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdTestFailed(err) {
|
||||
// Try again.
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
@ -55,7 +55,7 @@ func init() {
|
||||
)
|
||||
}
|
||||
|
||||
func newEtcdHelper(client *etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
|
||||
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
|
||||
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,9 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Etcd watch event actions
|
||||
@ -85,7 +86,8 @@ type etcdWatcher struct {
|
||||
|
||||
etcdIncoming chan *etcd.Response
|
||||
etcdError chan error
|
||||
etcdStop chan bool
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
etcdCallEnded chan struct{}
|
||||
|
||||
outgoing chan watch.Event
|
||||
@ -124,10 +126,12 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
|
||||
// monitor how much of this buffer is actually used.
|
||||
etcdIncoming: make(chan *etcd.Response, 100),
|
||||
etcdError: make(chan error, 1),
|
||||
etcdStop: make(chan bool),
|
||||
outgoing: make(chan watch.Event),
|
||||
userStop: make(chan struct{}),
|
||||
stopped: false,
|
||||
cache: cache,
|
||||
ctx: nil,
|
||||
cancel: nil,
|
||||
}
|
||||
w.emit = func(e watch.Event) { w.outgoing <- e }
|
||||
go w.translate()
|
||||
@ -136,37 +140,54 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
|
||||
|
||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||
// as a goroutine.
|
||||
func (w *etcdWatcher) etcdWatch(client *etcd.Client, key string, resourceVersion uint64) {
|
||||
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
|
||||
defer util.HandleCrash()
|
||||
defer close(w.etcdError)
|
||||
defer close(w.etcdIncoming)
|
||||
if resourceVersion == 0 {
|
||||
latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return
|
||||
}
|
||||
resourceVersion = latest + 1
|
||||
resourceVersion = latest
|
||||
}
|
||||
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
|
||||
if err != nil && err != etcd.ErrWatchStoppedByUser {
|
||||
w.etcdError <- err
|
||||
|
||||
opts := etcd.WatcherOptions{
|
||||
Recursive: w.list,
|
||||
AfterIndex: resourceVersion,
|
||||
}
|
||||
watcher := client.Watcher(key, &opts)
|
||||
w.ctx, w.cancel = context.WithCancel(ctx)
|
||||
|
||||
for {
|
||||
resp, err := watcher.Next(w.ctx)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return
|
||||
}
|
||||
w.etcdIncoming <- resp
|
||||
}
|
||||
}
|
||||
|
||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||
func etcdGetInitialWatchState(client *etcd.Client, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
resp, err := client.Get(key, false, recursive)
|
||||
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: recursive,
|
||||
Sort: false,
|
||||
}
|
||||
resp, err := client.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)
|
||||
return resourceVersion, err
|
||||
}
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
resourceVersion = etcdError.Index
|
||||
}
|
||||
return resourceVersion, nil
|
||||
}
|
||||
resourceVersion = resp.EtcdIndex
|
||||
resourceVersion = resp.Index
|
||||
convertRecursiveResponse(resp.Node, resp, incoming)
|
||||
return
|
||||
}
|
||||
@ -228,7 +249,6 @@ func (w *etcdWatcher) translate() {
|
||||
}
|
||||
return
|
||||
case <-w.userStop:
|
||||
w.etcdStop <- true
|
||||
return
|
||||
case res, ok := <-w.etcdIncoming:
|
||||
if ok {
|
||||
@ -407,7 +427,10 @@ func (w *etcdWatcher) ResultChan() <-chan watch.Event {
|
||||
func (w *etcdWatcher) Stop() {
|
||||
w.stopLock.Lock()
|
||||
defer w.stopLock.Unlock()
|
||||
// Prevent double channel closes.
|
||||
if w.cancel != nil {
|
||||
w.cancel()
|
||||
w.cancel = nil
|
||||
}
|
||||
if !w.stopped {
|
||||
w.stopped = true
|
||||
close(w.userStop)
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
@ -31,6 +30,7 @@ import (
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
@ -217,15 +217,11 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO: So believe it or not... but this test is flakey with the go-etcd client library
|
||||
* which I'm surprised by. Apprently you can close the client that is performing the watch
|
||||
* and the watch *never returns.* I would like to still keep this test here and re-enable
|
||||
* with the new 2.2+ client library.
|
||||
func TestWatchEtcdError(t *testing.T) {
|
||||
codec := testapi.Default.Codec()
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||
watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything)
|
||||
watching, err := h.Watch(context.TODO(), "/some/key", "4", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@ -236,7 +232,7 @@ func TestWatchEtcdError(t *testing.T) {
|
||||
t.Fatalf("Unexpected non-error")
|
||||
}
|
||||
watching.Stop()
|
||||
} */
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
codec := testapi.Default.Codec()
|
||||
|
@ -26,19 +26,19 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
goetcd "github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client *goetcd.Client
|
||||
Client etcd.Client
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
@ -126,7 +126,7 @@ func (m *EtcdTestServer) launch(t *testing.T) error {
|
||||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
m.Client.Close()
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
@ -145,9 +145,12 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
t.Fatal("Failed to start etcd server error=%v", err)
|
||||
return nil
|
||||
}
|
||||
server.Client = goetcd.NewClient(server.ClientURLs.StringSlice())
|
||||
if server.Client == nil {
|
||||
t.Errorf("Failed to connect to local etcd server")
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
}
|
||||
server.Client, err = etcd.New(cfg)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected Error in NewEtcdTestClientServer (%v)", err)
|
||||
defer server.Terminate(t)
|
||||
return nil
|
||||
}
|
||||
|
@ -22,56 +22,46 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
goetcd "github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
const (
|
||||
etcdErrorCodeNotFound = 100
|
||||
etcdErrorCodeTestFailed = 101
|
||||
etcdErrorCodeNodeExist = 105
|
||||
etcdErrorCodeValueRequired = 200
|
||||
etcdErrorCodeWatchExpired = 401
|
||||
etcdErrorCodeUnreachable = 501
|
||||
)
|
||||
|
||||
var (
|
||||
etcdErrorNotFound = &goetcd.EtcdError{ErrorCode: etcdErrorCodeNotFound}
|
||||
etcdErrorTestFailed = &goetcd.EtcdError{ErrorCode: etcdErrorCodeTestFailed}
|
||||
etcdErrorNodeExist = &goetcd.EtcdError{ErrorCode: etcdErrorCodeNodeExist}
|
||||
etcdErrorValueRequired = &goetcd.EtcdError{ErrorCode: etcdErrorCodeValueRequired}
|
||||
etcdErrorWatchExpired = &goetcd.EtcdError{ErrorCode: etcdErrorCodeWatchExpired}
|
||||
etcdErrorUnreachable = &goetcd.EtcdError{ErrorCode: etcdErrorCodeUnreachable}
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
// IsEtcdNotFound returns true if and only if err is an etcd not found error.
|
||||
func IsEtcdNotFound(err error) bool {
|
||||
return isEtcdErrorNum(err, etcdErrorCodeNotFound)
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
|
||||
}
|
||||
|
||||
// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
|
||||
func IsEtcdNodeExist(err error) bool {
|
||||
return isEtcdErrorNum(err, etcdErrorCodeNodeExist)
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
|
||||
}
|
||||
|
||||
// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
|
||||
func IsEtcdTestFailed(err error) bool {
|
||||
return isEtcdErrorNum(err, etcdErrorCodeTestFailed)
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
|
||||
}
|
||||
|
||||
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
|
||||
func IsEtcdWatchExpired(err error) bool {
|
||||
return isEtcdErrorNum(err, etcdErrorCodeWatchExpired)
|
||||
// NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
|
||||
// I'm using the previous matching value
|
||||
return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
|
||||
}
|
||||
|
||||
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
|
||||
func IsEtcdUnreachable(err error) bool {
|
||||
return isEtcdErrorNum(err, etcdErrorCodeUnreachable)
|
||||
// NOTE: The logic has changed previous error code no longer applies
|
||||
return err == etcd.ErrClusterUnavailable
|
||||
}
|
||||
|
||||
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
|
||||
func isEtcdErrorNum(err error, errorCode int) bool {
|
||||
etcdError, ok := err.(*goetcd.EtcdError)
|
||||
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
|
||||
if err != nil {
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
return etcdError.Code == errorCode
|
||||
}
|
||||
// NOTE: There are other error types returned
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetEtcdVersion performs a version check against the provided Etcd server,
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -38,8 +38,7 @@ func TestIsEtcdNotFound(t *testing.T) {
|
||||
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
|
||||
}
|
||||
}
|
||||
try(etcdErrorNotFound, true)
|
||||
try(&etcd.EtcdError{ErrorCode: 101}, false)
|
||||
try(&etcd.Error{Code: 101}, false)
|
||||
try(nil, false)
|
||||
try(fmt.Errorf("some other kind of error"), false)
|
||||
}
|
||||
|
@ -95,13 +95,15 @@ type Interface interface {
|
||||
|
||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||
// and any items passing 'filter' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||
|
||||
// WatchList begins watching the specified key's items. Items are decoded into API
|
||||
// objects and any item passing 'filter' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||
|
||||
|
@ -53,7 +53,7 @@ func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
||||
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
|
||||
})
|
||||
}
|
||||
return version + 1, nil
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// ParseListResourceVersion takes a resource version argument and converts it to
|
||||
|
@ -31,8 +31,8 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
|
||||
{Version: "", ExpectVersion: 0},
|
||||
{Version: "a", Err: true},
|
||||
{Version: " ", Err: true},
|
||||
{Version: "1", ExpectVersion: 2},
|
||||
{Version: "10", ExpectVersion: 11},
|
||||
{Version: "1", ExpectVersion: 1},
|
||||
{Version: "10", ExpectVersion: 10},
|
||||
}
|
||||
for _, testCase := range testCases {
|
||||
version, err := ParseWatchResourceVersion(testCase.Version)
|
||||
|
@ -26,24 +26,26 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
client := framework.NewEtcdClient()
|
||||
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
keysAPI := etcd.NewKeysAPI(client)
|
||||
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
ctx := context.TODO()
|
||||
framework.WithEtcdKey(func(key string) {
|
||||
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
resp, err := client.Get(key, false, false)
|
||||
resp, err := keysAPI.Get(ctx, key, nil)
|
||||
if err != nil || resp.Node == nil {
|
||||
t.Fatalf("unexpected error: %v %v", err, resp)
|
||||
}
|
||||
@ -60,7 +62,8 @@ func TestSet(t *testing.T) {
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
client := framework.NewEtcdClient()
|
||||
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
keysAPI := etcd.NewKeysAPI(client)
|
||||
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
ctx := context.TODO()
|
||||
framework.WithEtcdKey(func(key string) {
|
||||
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
@ -68,7 +71,7 @@ func TestGet(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
_, err = client.Set(key, string(coded), 0)
|
||||
_, err = keysAPI.Set(ctx, key, string(coded), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -86,7 +89,8 @@ func TestGet(t *testing.T) {
|
||||
|
||||
func TestWriteTTL(t *testing.T) {
|
||||
client := framework.NewEtcdClient()
|
||||
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
keysAPI := etcd.NewKeysAPI(client)
|
||||
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
|
||||
ctx := context.TODO()
|
||||
framework.WithEtcdKey(func(key string) {
|
||||
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
@ -111,7 +115,7 @@ func TestWriteTTL(t *testing.T) {
|
||||
if result.Name != "out" {
|
||||
t.Errorf("unexpected response: %#v", result)
|
||||
}
|
||||
if res, err := client.Get(key, false, false); err != nil || res == nil || res.Node.TTL != 10 {
|
||||
if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL != 10 {
|
||||
t.Fatalf("unexpected get: %v %#v", err, res)
|
||||
}
|
||||
|
||||
@ -132,7 +136,7 @@ func TestWriteTTL(t *testing.T) {
|
||||
if result.Name != "out2" {
|
||||
t.Errorf("unexpected response: %#v", result)
|
||||
}
|
||||
if res, err := client.Get(key, false, false); err != nil || res == nil || res.Node.TTL <= 1 {
|
||||
if res, err := keysAPI.Get(ctx, key, nil); err != nil || res == nil || res.Node.TTL <= 1 {
|
||||
t.Fatalf("unexpected get: %v %#v", err, res)
|
||||
}
|
||||
})
|
||||
@ -140,11 +144,12 @@ func TestWriteTTL(t *testing.T) {
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
client := framework.NewEtcdClient()
|
||||
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
keysAPI := etcd.NewKeysAPI(client)
|
||||
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
ctx := context.TODO()
|
||||
framework.WithEtcdKey(func(key string) {
|
||||
key = etcdtest.AddPrefix(key)
|
||||
resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
resp, err := keysAPI.Set(ctx, key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -178,7 +183,7 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
// should return the previously deleted item in the watch, but with the latest index
|
||||
resp, err = client.Delete(key, false)
|
||||
resp, err = keysAPI.Delete(ctx, key, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -20,8 +20,10 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
@ -35,15 +37,22 @@ func init() {
|
||||
RequireEtcd()
|
||||
}
|
||||
|
||||
func NewEtcdClient() *etcd.Client {
|
||||
return etcd.NewClient([]string{})
|
||||
func NewEtcdClient() etcd.Client {
|
||||
cfg := etcd.Config{
|
||||
Endpoints: []string{"http://127.0.0.1:4001"},
|
||||
}
|
||||
client, err := etcd.New(cfg)
|
||||
if err != nil {
|
||||
glog.Fatalf("unable to connect to etcd for testing: %v", err)
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func NewEtcdStorage() storage.Interface {
|
||||
return etcdstorage.NewEtcdStorage(NewEtcdClient(), testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
}
|
||||
|
||||
func NewExtensionsEtcdStorage(client *etcd.Client) storage.Interface {
|
||||
func NewExtensionsEtcdStorage(client etcd.Client) storage.Interface {
|
||||
if client == nil {
|
||||
client = NewEtcdClient()
|
||||
}
|
||||
@ -51,14 +60,14 @@ func NewExtensionsEtcdStorage(client *etcd.Client) storage.Interface {
|
||||
}
|
||||
|
||||
func RequireEtcd() {
|
||||
if _, err := NewEtcdClient().Get("/", false, false); err != nil {
|
||||
if _, err := etcd.NewKeysAPI(NewEtcdClient()).Get(context.TODO(), "/", nil); err != nil {
|
||||
glog.Fatalf("unable to connect to etcd for testing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func WithEtcdKey(f func(string)) {
|
||||
prefix := fmt.Sprintf("/test-%d", rand.Int63())
|
||||
defer NewEtcdClient().Delete(prefix, true)
|
||||
defer etcd.NewKeysAPI(NewEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true})
|
||||
f(prefix)
|
||||
}
|
||||
|
||||
@ -68,13 +77,13 @@ func WithEtcdKey(f func(string)) {
|
||||
// of the test run.
|
||||
func DeleteAllEtcdKeys() {
|
||||
glog.Infof("Deleting all etcd keys")
|
||||
client := NewEtcdClient()
|
||||
keys, err := client.Get("/", false, false)
|
||||
keysAPI := etcd.NewKeysAPI(NewEtcdClient())
|
||||
keys, err := keysAPI.Get(context.TODO(), "/", nil)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to list root etcd keys: %v", err)
|
||||
}
|
||||
for _, node := range keys.Node.Nodes {
|
||||
if _, err := client.Delete(node.Key, true); err != nil {
|
||||
if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
glog.Fatalf("Unable delete key: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -22,34 +22,42 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func newEtcdClient() *etcd.Client {
|
||||
return etcd.NewClient([]string{})
|
||||
func newEtcdClient() etcd.Client {
|
||||
cfg := etcd.Config{
|
||||
Endpoints: []string{"http://127.0.0.1:4001"},
|
||||
}
|
||||
client, err := etcd.New(cfg)
|
||||
if err != nil {
|
||||
glog.Fatalf("unable to connect to etcd for testing: %v", err)
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func requireEtcd() {
|
||||
if _, err := newEtcdClient().Get("/", false, false); err != nil {
|
||||
if _, err := etcd.NewKeysAPI(newEtcdClient()).Get(context.TODO(), "/", nil); err != nil {
|
||||
glog.Fatalf("unable to connect to etcd for integration testing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func withEtcdKey(f func(string)) {
|
||||
prefix := fmt.Sprintf("/test-%d", rand.Int63())
|
||||
defer newEtcdClient().Delete(prefix, true)
|
||||
defer etcd.NewKeysAPI(newEtcdClient()).Delete(context.TODO(), prefix, &etcd.DeleteOptions{Recursive: true})
|
||||
f(prefix)
|
||||
}
|
||||
|
||||
func deleteAllEtcdKeys() {
|
||||
client := newEtcdClient()
|
||||
keys, err := client.Get("/", false, false)
|
||||
keysAPI := etcd.NewKeysAPI(newEtcdClient())
|
||||
keys, err := keysAPI.Get(context.TODO(), "/", nil)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to list root etcd keys: %v", err)
|
||||
}
|
||||
for _, node := range keys.Node.Nodes {
|
||||
if _, err := client.Delete(node.Key, true); err != nil {
|
||||
if _, err := keysAPI.Delete(context.TODO(), node.Key, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
glog.Fatalf("Unable delete key: %v", err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user