1
0
mirror of https://github.com/rancher/rke.git synced 2025-04-28 11:36:27 +00:00
rke/services/etcd.go

321 lines
12 KiB
Go
Raw Normal View History

2017-10-29 09:45:21 +00:00
package services
import (
2017-11-14 18:11:21 +00:00
"fmt"
2018-05-09 17:39:19 +00:00
"path"
"path/filepath"
2018-02-21 01:53:32 +00:00
"strings"
"time"
2017-11-14 18:11:21 +00:00
"context"
etcdclient "github.com/coreos/etcd/client"
2018-05-09 17:39:19 +00:00
"github.com/docker/docker/api/types/container"
2018-02-21 01:53:32 +00:00
"github.com/pkg/errors"
"github.com/rancher/rke/docker"
2017-10-29 09:45:21 +00:00
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
2018-05-09 17:39:19 +00:00
"github.com/rancher/rke/pki"
2017-12-05 16:55:58 +00:00
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
2017-10-29 09:45:21 +00:00
)
2018-05-15 21:06:05 +00:00
// Filesystem locations used by the etcd snapshot and restore helpers in this
// file.
const (
	// EtcdDataDir is the directory the restored etcd data is moved into at the
	// end of RestoreEtcdSnapshot (path as seen inside the restore container).
	EtcdDataDir = "/var/lib/rancher/etcd/"
	// EtcdRestorePath is the scratch data dir handed to
	// "etcdctl snapshot restore"; it is wiped before and after the restore.
	EtcdRestorePath = "/opt/rke/etcd-snapshots-restore/"
	// EtcdSnapshotPath is the directory holding etcd snapshots: it is bind
	// mounted as /backup into the snapshot container and is the base directory
	// of the snapshot file read on restore. Note: no trailing slash.
	EtcdSnapshotPath = "/opt/rke/etcd-snapshots"
)
2018-05-17 22:27:35 +00:00
// EtcdSnapshot groups the recurring etcd snapshot settings that RunEtcdPlane
// consumes.
type EtcdSnapshot struct {
	// Snapshot switches recurring snapshot creation on or off.
	Snapshot bool
	// Creation is the creation period of the etcd snapshots and Retention is
	// their retention period. Both are forwarded verbatim to the backup tool
	// as its --creation / --retention flag values.
	Creation, Retention string
}
func RunEtcdPlane(
ctx context.Context,
etcdHosts []*hosts.Host,
etcdNodePlanMap map[string]v3.RKEConfigNodePlan,
localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry,
updateWorkersOnly bool,
alpineImage string,
2018-05-17 22:27:35 +00:00
etcdSnapshot EtcdSnapshot) error {
2018-03-31 10:53:59 +00:00
log.Infof(ctx, "[%s] Building up etcd plane..", ETCDRole)
2017-10-29 09:45:21 +00:00
for _, host := range etcdHosts {
if updateWorkersOnly {
continue
}
etcdProcess := etcdNodePlanMap[host.Address].Processes[EtcdContainerName]
imageCfg, hostCfg, _ := GetProcessConfig(etcdProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
return err
}
2018-05-17 22:27:35 +00:00
if etcdSnapshot.Snapshot {
if err := RunEtcdSnapshotSave(ctx, host, prsMap, alpineImage, etcdSnapshot.Creation, etcdSnapshot.Retention, EtcdSnapshotContainerName, false); err != nil {
2018-05-09 17:39:19 +00:00
return err
}
if err := pki.SaveBackupBundleOnHost(ctx, host, alpineImage, EtcdSnapshotPath, prsMap); err != nil {
return err
}
2018-05-09 17:39:19 +00:00
}
if err := createLogLink(ctx, host, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
2017-10-29 09:45:21 +00:00
return err
}
}
2018-03-31 10:53:59 +00:00
log.Infof(ctx, "[%s] Successfully started etcd plane..", ETCDRole)
2017-10-29 09:45:21 +00:00
return nil
}
func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error {
2018-03-31 10:53:59 +00:00
log.Infof(ctx, "[%s] Tearing down etcd plane..", ETCDRole)
for _, host := range etcdHosts {
err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address)
if err != nil {
return err
}
if !host.IsWorker || !host.IsControl || force {
// remove unschedulable kubelet on etcd host
if err := removeKubelet(ctx, host); err != nil {
return err
}
if err := removeKubeproxy(ctx, host); err != nil {
return err
}
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
if err := removeSidekick(ctx, host); err != nil {
return err
}
}
}
2018-03-31 10:53:59 +00:00
log.Infof(ctx, "[%s] Successfully tore down etcd plane..", ETCDRole)
return nil
}
2018-02-21 01:53:32 +00:00
func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress)
added := false
for _, host := range etcdHosts {
2018-02-21 01:53:32 +00:00
if host.Address == toAddEtcdHost.Address {
continue
}
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
if _, err := memAPI.Add(ctx, peerURL); err != nil {
2018-02-21 01:53:32 +00:00
logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err)
continue
}
added = true
break
}
if !added {
2018-02-21 01:53:32 +00:00
return fmt.Errorf("Failed to add etcd member [etcd-%s] to etcd cluster", toAddEtcdHost.HostnameOverride)
}
2018-02-21 01:53:32 +00:00
log.Infof(ctx, "[add/%s] Successfully Added member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
return nil
}
func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
var mID string
removed := false
for _, host := range etcdHosts {
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
members, err := memAPI.List(ctx)
if err != nil {
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
continue
}
for _, member := range members {
if member.Name == fmt.Sprintf("etcd-%s", etcdHost.HostnameOverride) {
mID = member.ID
break
}
2017-10-29 09:45:21 +00:00
}
if err := memAPI.Remove(ctx, mID); err != nil {
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
continue
}
removed = true
break
}
if !removed {
return fmt.Errorf("Failed to delete etcd member [etcd-%s] from etcd cluster", etcdHost.HostnameOverride)
2017-10-29 09:45:21 +00:00
}
log.Infof(ctx, "[remove/%s] Successfully removed member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
return nil
2017-10-29 09:45:21 +00:00
}
// ReloadEtcdCluster (re)runs the etcd container on every host in
// readyEtcdHosts and then verifies that the cluster answers a health check.
//
// For each host the etcd process config is taken from etcdNodePlanMap (keyed
// by host address), the container is run and createLogLink is called. After a
// fixed 10 second pause the hosts are probed in order with isEtcdHealthy,
// using the health check URL from the host's process config; the first
// healthy host makes the function return nil. If no host reports healthy
// (including when readyEtcdHosts is empty) an error is returned.
func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, alpineImage string) error {
	for _, node := range readyEtcdHosts {
		process := etcdNodePlanMap[node.Address].Processes[EtcdContainerName]
		containerCfg, containerHostCfg, _ := GetProcessConfig(process)

		err := docker.DoRunContainer(ctx, node.DClient, containerCfg, containerHostCfg, EtcdContainerName, node.Address, ETCDRole, prsMap)
		if err != nil {
			return err
		}
		err = createLogLink(ctx, node, EtcdContainerName, ETCDRole, alpineImage, prsMap)
		if err != nil {
			return err
		}
	}

	// Fixed pause between starting the containers and probing their health.
	time.Sleep(10 * time.Second)

	for _, node := range readyEtcdHosts {
		process := etcdNodePlanMap[node.Address].Processes[EtcdContainerName]
		_, _, probeURL := GetProcessConfig(process)
		if isEtcdHealthy(ctx, localConnDialerFactory, node, cert, key, probeURL) {
			// One healthy host is enough.
			return nil
		}
	}

	return fmt.Errorf("[etcd] Etcd Cluster is not healthy")
}
2018-02-21 01:53:32 +00:00
func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (bool, error) {
var listErr error
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
for _, host := range etcdHosts {
if host.Address == etcdHost.Address {
continue
}
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
members, err := memAPI.List(ctx)
if err != nil {
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
continue
}
for _, member := range members {
if strings.Contains(member.PeerURLs[0], peerURL) {
logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
return true, nil
}
}
2018-03-06 22:32:50 +00:00
// reset the list of errors to handle new hosts
listErr = nil
break
2018-02-21 01:53:32 +00:00
}
if listErr != nil {
return false, listErr
}
return false, nil
}
2018-05-09 17:39:19 +00:00
2018-05-17 22:27:35 +00:00
func RunEtcdSnapshotSave(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, creation, retention, name string, once bool) error {
log.Infof(ctx, "[etcd] Saving snapshot [%s] on host [%s]", name, etcdHost.Address)
2018-05-09 17:39:19 +00:00
imageCfg := &container.Config{
Cmd: []string{
"/opt/rke-tools/rke-etcd-backup",
2018-05-09 17:39:19 +00:00
"rolling-backup",
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(pki.KubeNodeCertName),
"--key", pki.GetKeyPath(pki.KubeNodeCertName),
"--name", name,
"--endpoints=" + etcdHost.InternalAddress + ":2379",
2018-05-09 17:39:19 +00:00
},
2018-05-17 22:27:35 +00:00
Image: etcdSnapshotImage,
2018-05-09 17:39:19 +00:00
}
if once {
imageCfg.Cmd = append(imageCfg.Cmd, "--once")
}
if !once {
imageCfg.Cmd = append(imageCfg.Cmd, "--retention="+retention)
imageCfg.Cmd = append(imageCfg.Cmd, "--creation="+creation)
}
hostCfg := &container.HostConfig{
Binds: []string{
2018-05-17 22:27:35 +00:00
fmt.Sprintf("%s:/backup", EtcdSnapshotPath),
2018-05-09 17:39:19 +00:00
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
}
if once {
2018-05-17 22:27:35 +00:00
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotOnceContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
2018-05-09 17:39:19 +00:00
return err
}
2018-05-17 22:27:35 +00:00
status, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName)
2018-05-09 17:39:19 +00:00
if status != 0 || err != nil {
2018-05-17 22:27:35 +00:00
return fmt.Errorf("Failed to take etcd snapshot exit code [%d]: %v", status, err)
2018-05-09 17:39:19 +00:00
}
2018-05-17 22:27:35 +00:00
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName)
2018-05-09 17:39:19 +00:00
}
2018-05-17 22:27:35 +00:00
return docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotContainerName, etcdHost.Address, ETCDRole, prsMap)
2018-05-09 17:39:19 +00:00
}
2018-05-17 22:27:35 +00:00
func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdRestoreImage, snapshotName, initCluster string) error {
log.Infof(ctx, "[etcd] Restoring [%s] snapshot on etcd host [%s]", snapshotName, etcdHost.Address)
2018-05-09 17:39:19 +00:00
nodeName := pki.GetEtcdCrtName(etcdHost.InternalAddress)
2018-05-17 22:27:35 +00:00
snapshotPath := filepath.Join(EtcdSnapshotPath, snapshotName)
2018-05-09 17:39:19 +00:00
// make sure that restore path is empty otherwise etcd restore will fail
2018-05-09 17:39:19 +00:00
imageCfg := &container.Config{
Cmd: []string{
2018-05-15 21:06:05 +00:00
"sh", "-c", strings.Join([]string{
"rm -rf", EtcdRestorePath,
"&& /usr/local/bin/etcdctl",
fmt.Sprintf("--endpoints=[%s:2379]", etcdHost.InternalAddress),
2018-05-15 21:06:05 +00:00
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(nodeName),
"--key", pki.GetKeyPath(nodeName),
2018-05-17 22:27:35 +00:00
"snapshot", "restore", snapshotPath,
2018-05-15 21:06:05 +00:00
"--data-dir=" + EtcdRestorePath,
"--name=etcd-" + etcdHost.HostnameOverride,
"--initial-cluster=" + initCluster,
"--initial-cluster-token=etcd-cluster-1",
"--initial-advertise-peer-urls=https://" + etcdHost.InternalAddress + ":2380",
"&& mv", EtcdRestorePath + "*", EtcdDataDir,
"&& rm -rf", EtcdRestorePath,
}, " "),
2018-05-09 17:39:19 +00:00
},
Env: []string{"ETCDCTL_API=3"},
Image: etcdRestoreImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
2018-05-15 21:06:05 +00:00
"/opt/rke/:/opt/rke/:z",
fmt.Sprintf("%s:/var/lib/rancher/etcd:z", path.Join(etcdHost.PrefixPath, "/var/lib/etcd")),
2018-05-09 17:39:19 +00:00
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdRestoreContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
status, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
if err != nil {
return err
}
if status != 0 {
2018-07-21 07:14:45 +00:00
containerLog, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
if err != nil {
return err
}
if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName); err != nil {
return err
}
// printing the restore container's logs
return fmt.Errorf("Failed to run etcd restore container, exit status is: %d, container logs: %s", status, containerLog)
2018-05-09 17:39:19 +00:00
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
}