mirror of
https://github.com/rancher/rke.git
synced 2025-08-16 05:53:00 +00:00
update etcd client to use both v3 and v2
v3 for >=1.22, v2 otherwise
This commit is contained in:
parent
fe60bf44d7
commit
594296bb10
@ -203,12 +203,14 @@ func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, k
|
|||||||
}
|
}
|
||||||
for _, etcdHost := range etcdToAdd {
|
for _, etcdHost := range etcdToAdd {
|
||||||
// Check if the host already part of the cluster -- this will cover cluster with lost quorum
|
// Check if the host already part of the cluster -- this will cover cluster with lost quorum
|
||||||
isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey)
|
isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
|
||||||
|
currentCluster.Version, clientCert, clientKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !isEtcdMember {
|
if !isEtcdMember {
|
||||||
if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey); err != nil {
|
if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
|
||||||
|
currentCluster.Version, clientCert, clientKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -245,7 +247,8 @@ func deleteEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster
|
|||||||
|
|
||||||
for _, etcdHost := range etcdToDelete {
|
for _, etcdHost := range etcdToDelete {
|
||||||
etcdHost.IsEtcd = false
|
etcdHost.IsEtcd = false
|
||||||
if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientKey, etcdNodePlanMap); err != nil {
|
if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
|
||||||
|
currentCluster.Version, clientCert, clientKey, etcdNodePlanMap); err != nil {
|
||||||
log.Warnf(ctx, "[reconcile] %v", err)
|
log.Warnf(ctx, "[reconcile] %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
175
services/etcd.go
175
services/etcd.go
@ -8,7 +8,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
etcdclient "github.com/coreos/etcd/client"
|
"github.com/blang/semver"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/rancher/rke/docker"
|
"github.com/rancher/rke/docker"
|
||||||
@ -19,6 +19,7 @@ import (
|
|||||||
v3 "github.com/rancher/rke/types"
|
v3 "github.com/rancher/rke/types"
|
||||||
"github.com/rancher/rke/util"
|
"github.com/rancher/rke/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
etcdclientv2 "go.etcd.io/etcd/client/v2"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -167,7 +168,8 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) e
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
|
func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
|
||||||
|
k8sVersion string, cert, key []byte) error {
|
||||||
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
|
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
|
||||||
peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress)
|
peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress)
|
||||||
added := false
|
added := false
|
||||||
@ -175,15 +177,27 @@ func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*
|
|||||||
if host.Address == toAddEtcdHost.Address {
|
if host.Address == toAddEtcdHost.Address {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
|
if etcdClientV3Range(k8sVersion) {
|
||||||
if err != nil {
|
etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key)
|
||||||
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
if err != nil {
|
||||||
continue
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
}
|
continue
|
||||||
memAPI := etcdclient.NewMembersAPI(etcdClient)
|
}
|
||||||
if _, err := memAPI.Add(ctx, peerURL); err != nil {
|
if _, err := etcdClient.MemberAdd(ctx, []string{peerURL}); err != nil {
|
||||||
logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err)
|
logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err)
|
||||||
continue
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
memAPI := etcdclientv2.NewMembersAPI(etcdClient)
|
||||||
|
if _, err := memAPI.Add(ctx, peerURL); err != nil {
|
||||||
|
logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
added = true
|
added = true
|
||||||
break
|
break
|
||||||
@ -195,32 +209,70 @@ func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func RemoveEtcdMember(ctx context.Context, toDeleteEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, etcdNodePlanMap map[string]v3.RKEConfigNodePlan) error {
|
func etcdClientV3Range(k8sVersion string) bool {
|
||||||
|
toMatch, err := semver.Make(k8sVersion[1:])
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warnf("Cluster version [%s] can not be parsed as semver", k8sVersion)
|
||||||
|
}
|
||||||
|
etcdv3Range, err := semver.ParseRange(">=1.22.0-rancher0")
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warnf("Failed to parse semver range for checking etcd v3 client range")
|
||||||
|
}
|
||||||
|
return etcdv3Range(toMatch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RemoveEtcdMember(ctx context.Context, toDeleteEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
|
||||||
|
k8sVersion string, cert, key []byte, etcdNodePlanMap map[string]v3.RKEConfigNodePlan) error {
|
||||||
log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, toDeleteEtcdHost.HostnameOverride)
|
log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, toDeleteEtcdHost.HostnameOverride)
|
||||||
var mID string
|
var mIDv3 uint64
|
||||||
|
var mIDv2 string
|
||||||
removed := false
|
removed := false
|
||||||
for _, host := range etcdHosts {
|
for _, host := range etcdHosts {
|
||||||
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
|
if etcdClientV3Range(k8sVersion) {
|
||||||
if err != nil {
|
etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key)
|
||||||
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
if err != nil {
|
||||||
continue
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
}
|
continue
|
||||||
memAPI := etcdclient.NewMembersAPI(etcdClient)
|
}
|
||||||
members, err := memAPI.List(ctx)
|
members, err := etcdClient.MemberList(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, member := range members {
|
for _, member := range members.Members {
|
||||||
if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) {
|
if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) {
|
||||||
mID = member.ID
|
mIDv3 = member.ID
|
||||||
break
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if _, err := etcdClient.MemberRemove(ctx, mIDv3); err != nil {
|
||||||
|
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
memAPI := etcdclientv2.NewMembersAPI(etcdClient)
|
||||||
|
members, err := memAPI.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, member := range members {
|
||||||
|
if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) {
|
||||||
|
mIDv2 = member.ID
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := memAPI.Remove(ctx, mIDv2); err != nil {
|
||||||
|
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := memAPI.Remove(ctx, mID); err != nil {
|
|
||||||
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
etcdMemberDeletedTime := time.Now()
|
etcdMemberDeletedTime := time.Now()
|
||||||
// Need to health check after successful member remove (especially for leader re-election)
|
// Need to health check after successful member remove (especially for leader re-election)
|
||||||
// We will check all hosts to see if the cluster becomes healthy
|
// We will check all hosts to see if the cluster becomes healthy
|
||||||
@ -278,32 +330,55 @@ func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, newHos
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (bool, error) {
|
func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
|
||||||
|
k8sVersion string, cert, key []byte) (bool, error) {
|
||||||
var listErr error
|
var listErr error
|
||||||
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
|
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
|
||||||
for _, host := range etcdHosts {
|
for _, host := range etcdHosts {
|
||||||
if host.Address == etcdHost.Address {
|
if host.Address == etcdHost.Address {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
|
if etcdClientV3Range(k8sVersion) {
|
||||||
if err != nil {
|
etcdClient, err := getEtcdClientV3(ctx, host, localConnDialerFactory, cert, key)
|
||||||
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
if err != nil {
|
||||||
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
||||||
continue
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
}
|
continue
|
||||||
memAPI := etcdclient.NewMembersAPI(etcdClient)
|
}
|
||||||
members, err := memAPI.List(ctx)
|
members, err := etcdClient.MemberList(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
||||||
logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
|
logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, member := range members {
|
for _, member := range members.Members {
|
||||||
if strings.Contains(member.PeerURLs[0], peerURL) {
|
if strings.Contains(member.PeerURLs[0], peerURL) {
|
||||||
logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
|
logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
|
||||||
return true, nil
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
etcdClient, err := getEtcdClientV2(ctx, host, localConnDialerFactory, cert, key)
|
||||||
|
if err != nil {
|
||||||
|
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
||||||
|
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
memAPI := etcdclientv2.NewMembersAPI(etcdClient)
|
||||||
|
members, err := memAPI.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
|
||||||
|
logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, member := range members {
|
||||||
|
if strings.Contains(member.PeerURLs[0], peerURL) {
|
||||||
|
logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the list of errors to handle new hosts
|
// reset the list of errors to handle new hosts
|
||||||
listErr = nil
|
listErr = nil
|
||||||
break
|
break
|
||||||
|
@ -11,12 +11,14 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
etcdclient "github.com/coreos/etcd/client"
|
|
||||||
"github.com/rancher/rke/hosts"
|
"github.com/rancher/rke/hosts"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
etcdclientv2 "go.etcd.io/etcd/client/v2"
|
||||||
|
etcdclientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (etcdclient.Client, error) {
|
func getEtcdClientV2(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (etcdclientv2.Client, error) {
|
||||||
dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost)
|
dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create a dialer for host [%s]: %v", etcdHost.Address, err)
|
return nil, fmt.Errorf("failed to create a dialer for host [%s]: %v", etcdHost.Address, err)
|
||||||
@ -26,18 +28,44 @@ func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFac
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var DefaultEtcdTransport etcdclient.CancelableTransport = &http.Transport{
|
var defaultEtcdTransport etcdclientv2.CancelableTransport = &http.Transport{
|
||||||
Dial: dialer,
|
Dial: dialer,
|
||||||
TLSClientConfig: tlsConfig,
|
TLSClientConfig: tlsConfig,
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := etcdclient.Config{
|
cfg := etcdclientv2.Config{
|
||||||
Endpoints: []string{"https://" + etcdHost.InternalAddress + ":2379"},
|
Endpoints: []string{"https://" + etcdHost.InternalAddress + ":2379"},
|
||||||
Transport: DefaultEtcdTransport,
|
Transport: defaultEtcdTransport,
|
||||||
}
|
}
|
||||||
|
|
||||||
return etcdclient.New(cfg)
|
return etcdclientv2.New(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEtcdClientV3(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (*etcdclientv3.Client, error) {
|
||||||
|
dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create a dialer for host [%s]: %v", etcdHost.Address, err)
|
||||||
|
}
|
||||||
|
tlsConfig, err := getEtcdTLSConfig(cert, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := etcdclientv3.Config{
|
||||||
|
Endpoints: []string{"https://" + etcdHost.InternalAddress + ":2379"},
|
||||||
|
TLS: tlsConfig,
|
||||||
|
DialOptions: []grpc.DialOption{grpc.WithContextDialer(wrapper(dialer))},
|
||||||
|
}
|
||||||
|
|
||||||
|
return etcdclientv3.New(cfg)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func wrapper(f func(network, address string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
|
||||||
|
return func(_ context.Context, address string) (net.Conn, error) {
|
||||||
|
return f("tcp", address)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isEtcdHealthy(localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) error {
|
func isEtcdHealthy(localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user