diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 657f0f8b..4e8979da 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -203,12 +203,14 @@ func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, k } for _, etcdHost := range etcdToAdd { // Check if the host already part of the cluster -- this will cover cluster with lost quorum - isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey) + isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, + currentCluster.Version, clientCert, clientKey) if err != nil { return err } if !isEtcdMember { - if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey); err != nil { + if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, + currentCluster.Version, clientCert, clientKey); err != nil { return err } } @@ -245,7 +247,8 @@ func deleteEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster for _, etcdHost := range etcdToDelete { etcdHost.IsEtcd = false - if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey, etcdNodePlanMap); err != nil { + if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, + currentCluster.Version, clientCert, clientKey, etcdNodePlanMap); err != nil { log.Warnf(ctx, "[reconcile] %v", err) continue } diff --git a/services/etcd.go b/services/etcd.go index c0af7941..f13765c7 100644 --- a/services/etcd.go +++ b/services/etcd.go @@ -8,7 +8,7 @@ import ( "strings" "time" - etcdclient "github.com/coreos/etcd/client" + "github.com/blang/semver" "github.com/docker/docker/api/types/container" "github.com/pkg/errors" "github.com/rancher/rke/docker" @@ -19,6 +19,7 @@ import ( v3 "github.com/rancher/rke/types" "github.com/rancher/rke/util" "github.com/sirupsen/logrus" + etcdclientv2 "go.etcd.io/etcd/client/v2" "golang.org/x/sync/errgroup" ) @@ -167,7 +168,8 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) e return nil } -func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error { +func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, + k8sVersion string, cert, key []byte) error { log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride) peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress) added := false @@ -175,15 +177,27 @@ func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []* if host.Address == toAddEtcdHost.Address { continue } - etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key) - if err != nil { - logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) - continue - } - memAPI := etcdclient.NewMembersAPI(etcdClient) - if _, err := memAPI.Add(ctx, peerURL); err != nil { - logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err) - continue + if etcdClientV3Range(k8sVersion) { + etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + if _, err := etcdClient.MemberAdd(ctx, []string{peerURL}); err != nil { + logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err) + continue + } + } else { + etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + memAPI := etcdclientv2.NewMembersAPI(etcdClient) + if _, err := memAPI.Add(ctx, peerURL); err != nil { + logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err) + continue + } } added = true break @@ -195,32 +209,70 @@ func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []* return nil } -func RemoveEtcdMember(ctx context.Context, toDeleteEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, etcdNodePlanMap map[string]v3.RKEConfigNodePlan) error { +func etcdClientV3Range(k8sVersion string) bool { + toMatch, err := semver.Make(k8sVersion[1:]) + if err != nil { + logrus.Warnf("Cluster version [%s] can not be parsed as semver", k8sVersion) + } + etcdv3Range, err := semver.ParseRange(">=1.22.0-rancher0") + if err != nil { + logrus.Warnf("Failed to parse semver range for checking etcd v3 client range") + } + return etcdv3Range(toMatch) +} + +func RemoveEtcdMember(ctx context.Context, toDeleteEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, + k8sVersion string, cert, key []byte, etcdNodePlanMap map[string]v3.RKEConfigNodePlan) error { log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, toDeleteEtcdHost.HostnameOverride) - var mID string + var mIDv3 uint64 + var mIDv2 string removed := false for _, host := range etcdHosts { - etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key) - if err != nil { - logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) - continue - } - memAPI := etcdclient.NewMembersAPI(etcdClient) - members, err := memAPI.List(ctx) - if err != nil { - logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) - continue - } - for _, member := range members { - if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) { - mID = member.ID - break + if etcdClientV3Range(k8sVersion) { + etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + members, err := etcdClient.MemberList(ctx) + if err != nil { + logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) + continue + } + for _, member := range members.Members { + if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) { + mIDv3 = member.ID + break + } + } + if _, err := etcdClient.MemberRemove(ctx, mIDv3); err != nil { + logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) + continue + } + } else { + etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + memAPI := etcdclientv2.NewMembersAPI(etcdClient) + members, err := memAPI.List(ctx) + if err != nil { + logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) + continue + } + for _, member := range members { + if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) { + mIDv2 = member.ID + break + } + } + if err := memAPI.Remove(ctx, mIDv2); err != nil { + logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) + continue } } - if err := memAPI.Remove(ctx, mID); err != nil { - logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err) - continue - } + etcdMemberDeletedTime := time.Now() // Need to health check after successful member remove (especially for leader re-election) // We will check all hosts to see if the cluster becomes healthy @@ -278,32 +330,55 @@ func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, newHos return nil } -func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (bool, error) { +func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, + k8sVersion string, cert, key []byte) (bool, error) { var listErr error peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress) for _, host := range etcdHosts { if host.Address == etcdHost.Address { continue } - etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key) - if err != nil { - listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) - logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) - continue - } - memAPI := etcdclient.NewMembersAPI(etcdClient) - members, err := memAPI.List(ctx) - if err != nil { - listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) - logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err) - continue - } - for _, member := range members { - if strings.Contains(member.PeerURLs[0], peerURL) { - logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address) - return true, nil + if etcdClientV3Range(k8sVersion) { + etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + members, err := etcdClient.MemberList(ctx) + if err != nil { + listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) + logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err) + continue + } + for _, member := range members.Members { + if strings.Contains(member.PeerURLs[0], peerURL) { + logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address) + return true, nil + } + } + } else { + etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key) + if err != nil { + listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) + logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err) + continue + } + memAPI := etcdclientv2.NewMembersAPI(etcdClient) + members, err := memAPI.List(ctx) + if err != nil { + listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address) + logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err) + continue + } + for _, member := range members { + if strings.Contains(member.PeerURLs[0], peerURL) { + logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address) + return true, nil + } } } + // reset the list of errors to handle new hosts listErr = nil break diff --git a/services/etcd_util.go b/services/etcd_util.go index aaca193b..cbe47127 100644 --- a/services/etcd_util.go +++ b/services/etcd_util.go @@ -11,12 +11,14 @@ import ( "strings" "time" - etcdclient "github.com/coreos/etcd/client" "github.com/rancher/rke/hosts" "github.com/sirupsen/logrus" + etcdclientv2 "go.etcd.io/etcd/client/v2" + etcdclientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" ) -func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (etcdclient.Client, error) { +func getEtcdClientV2(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (etcdclientv2.Client, error) { dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost) if err != nil { return nil, fmt.Errorf("failed to create a dialer for host [%s]: %v", etcdHost.Address, err) @@ -26,18 +28,44 @@ func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFac return nil, err } - var DefaultEtcdTransport etcdclient.CancelableTransport = &http.Transport{ + var defaultEtcdTransport etcdclientv2.CancelableTransport = &http.Transport{ Dial: dialer, TLSClientConfig: tlsConfig, TLSHandshakeTimeout: 10 * time.Second, } - cfg := etcdclient.Config{ + cfg := etcdclientv2.Config{ Endpoints: []string{"https://" + etcdHost.InternalAddress + ":2379"}, - Transport: DefaultEtcdTransport, + Transport: defaultEtcdTransport, } - return etcdclient.New(cfg) + return etcdclientv2.New(cfg) +} + +func getEtcdClientV3(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (*etcdclientv3.Client, error) { + dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost) + if err != nil { + return nil, fmt.Errorf("failed to create a dialer for host [%s]: %v", etcdHost.Address, err) + } + tlsConfig, err := getEtcdTLSConfig(cert, key) + if err != nil { + return nil, err + } + + cfg := etcdclientv3.Config{ + Endpoints: []string{"https://" + etcdHost.InternalAddress + ":2379"}, + TLS: tlsConfig, + DialOptions: []grpc.DialOption{grpc.WithContextDialer(wrapper(dialer))}, + } + + return etcdclientv3.New(cfg) + +} + +func wrapper(f func(network, address string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) { + return func(_ context.Context, address string) (net.Conn, error) { + return f("tcp", address) + } } func isEtcdHealthy(localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) error {