package services

import (
	"context"
	"encoding/base64"
	"fmt"
	"path"
	"strings"
	"time"

	etcdclient "github.com/coreos/etcd/client"
	"github.com/docker/docker/api/types/container"
	"github.com/pkg/errors"
	"github.com/rancher/rke/docker"
	"github.com/rancher/rke/hosts"
	"github.com/rancher/rke/log"
	"github.com/rancher/rke/pki"
	"github.com/rancher/rke/pki/cert"
	v3 "github.com/rancher/rke/types"
	"github.com/rancher/rke/util"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
)

const (
	EtcdSnapshotPath         = "/opt/rke/etcd-snapshots/"
	EtcdRestorePath          = "/opt/rke/etcd-snapshots-restore/"
	EtcdDataDir              = "/var/lib/rancher/etcd/"
	EtcdInitWaitTime         = 10
	EtcdSnapshotWaitTime     = 5
	EtcdPermFixContainerName = "etcd-fix-perm"
)

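// RunEtcdPlane fixes the etcd data dir permissions, starts the etcd container on
// every etcd host, optionally starts the rolling snapshot container, and then
// checks cluster health, returning an error if no member reports healthy.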
func RunEtcdPlane(
	ctx context.Context,
	etcdHosts []*hosts.Host,
	etcdNodePlanMap map[string]v3.RKEConfigNodePlan,
	localConnDialerFactory hosts.DialerFactory,
	prsMap map[string]v3.PrivateRegistry,
	updateWorkersOnly bool,
	alpineImage string,
	es v3.ETCDService,
	certMap map[string]pki.CertificatePKI) error {
	log.Infof(ctx, "[%s] Building up etcd plane..", ETCDRole)
	for _, host := range etcdHosts {
		if updateWorkersOnly {
			continue
		}

		etcdProcess := etcdNodePlanMap[host.Address].Processes[EtcdContainerName]

		// need to run this first to set proper ownership and permissions on etcd data dir
		if err := setEtcdPermissions(ctx, host, prsMap, alpineImage, etcdProcess); err != nil {
			return err
		}
		imageCfg, hostCfg, _ := GetProcessConfig(etcdProcess, host)
		if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
			return err
		}
		if *es.Snapshot == true {
			rkeToolsImage, err := util.GetDefaultRKETools(alpineImage)
			if err != nil {
				return err
			}
			if err := RunEtcdSnapshotSave(ctx, host, prsMap, rkeToolsImage, EtcdSnapshotContainerName, false, es); err != nil {
				return err
			}
			if err := pki.SaveBackupBundleOnHost(ctx, host, rkeToolsImage, EtcdSnapshotPath, prsMap); err != nil {
				return err
			}
			if err := createLogLink(ctx, host, EtcdSnapshotContainerName, ETCDRole, alpineImage, prsMap); err != nil {
				return err
			}
		} else {
			if err := docker.DoRemoveContainer(ctx, host.DClient, EtcdSnapshotContainerName, host.Address); err != nil {
				return err
			}
		}
		if err := createLogLink(ctx, host, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
			return err
		}
	}
	log.Infof(ctx, "[%s] Successfully started etcd plane.. Checking etcd cluster health", ETCDRole)
	clientCert := cert.EncodeCertPEM(certMap[pki.KubeNodeCertName].Certificate)
	clientKey := cert.EncodePrivateKeyPEM(certMap[pki.KubeNodeCertName].Key)
	var healthError error
	var hosts []string
	for _, host := range etcdHosts {
		_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName], host)
		healthError = isEtcdHealthy(localConnDialerFactory, host, clientCert, clientKey, healthCheckURL)
		if healthError == nil {
			break
		}
		logrus.Warn(healthError)
		hosts = append(hosts, host.Address)
	}
	if healthError != nil {
		return fmt.Errorf("etcd cluster is unhealthy: hosts [%s] failed to report healthy."+
			" Check etcd container logs on each host for more information", strings.Join(hosts, ","))
	}
	return nil
}

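// RestartEtcdPlane restarts the etcd container on all etcd hosts in parallel.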
func RestartEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error {
	log.Infof(ctx, "[%s] Restarting etcd plane..", ETCDRole)
	var errgrp errgroup.Group

	hostsQueue := util.GetObjectQueue(etcdHosts)
	for w := 0; w < WorkerThreads; w++ {
		errgrp.Go(func() error {
			var errList []error
			for host := range hostsQueue {
				runHost := host.(*hosts.Host)
				if err := docker.DoRestartContainer(ctx, runHost.DClient, EtcdContainerName, runHost.Address); err != nil {
					errList = append(errList, err)
				}
			}
			return util.ErrList(errList)
		})
	}
	if err := errgrp.Wait(); err != nil {
		return err
	}
	log.Infof(ctx, "[%s] Successfully restarted etcd plane..", ETCDRole)
	return nil
}

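// RemoveEtcdPlane removes the etcd and snapshot containers from all etcd hosts. On
// hosts that are not also both worker and controlplane nodes (or when force is set),
// the unschedulable kubelet, kube-proxy, nginx-proxy and sidekick containers are
// removed as well.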
func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error {
	log.Infof(ctx, "[%s] Tearing down etcd plane..", ETCDRole)

	var errgrp errgroup.Group
	hostsQueue := util.GetObjectQueue(etcdHosts)
	for w := 0; w < WorkerThreads; w++ {
		errgrp.Go(func() error {
			var errList []error
			for host := range hostsQueue {
				runHost := host.(*hosts.Host)
				if err := docker.DoRemoveContainer(ctx, runHost.DClient, EtcdContainerName, runHost.Address); err != nil {
					errList = append(errList, err)
				}
				if err := docker.DoRemoveContainer(ctx, runHost.DClient, EtcdSnapshotContainerName, runHost.Address); err != nil {
					errList = append(errList, err)
				}
				if !runHost.IsWorker || !runHost.IsControl || force {
					// remove unschedulable kubelet on etcd host
					if err := removeKubelet(ctx, runHost); err != nil {
						errList = append(errList, err)
					}
					if err := removeKubeproxy(ctx, runHost); err != nil {
						errList = append(errList, err)
					}
					if err := removeNginxProxy(ctx, runHost); err != nil {
						errList = append(errList, err)
					}
					if err := removeSidekick(ctx, runHost); err != nil {
						errList = append(errList, err)
					}
				}
			}
			return util.ErrList(errList)
		})
	}
	if err := errgrp.Wait(); err != nil {
		return err
	}
	log.Infof(ctx, "[%s] Successfully tore down etcd plane..", ETCDRole)
	return nil
}

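// AddEtcdMember registers toAddEtcdHost's peer URL as a new member through the first
// reachable existing member of the etcd cluster.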
func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
	log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
	peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress)
	added := false
	for _, host := range etcdHosts {
		if host.Address == toAddEtcdHost.Address {
			continue
		}
		etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
		if err != nil {
			logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
			continue
		}
		memAPI := etcdclient.NewMembersAPI(etcdClient)
		if _, err := memAPI.Add(ctx, peerURL); err != nil {
			logrus.Debugf("Failed to add etcd member from host [%s]: %v", host.Address, err)
			continue
		}
		added = true
		break
	}
	if !added {
		return fmt.Errorf("Failed to add etcd member [etcd-%s] to etcd cluster", toAddEtcdHost.HostnameOverride)
	}
	log.Infof(ctx, "[add/%s] Successfully added member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
	return nil
}

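// RemoveEtcdMember removes the member named after toDeleteEtcdHost from the etcd
// cluster and verifies that the cluster reports healthy again afterwards.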
func RemoveEtcdMember(ctx context.Context, toDeleteEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, etcdNodePlanMap map[string]v3.RKEConfigNodePlan) error {
	log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, toDeleteEtcdHost.HostnameOverride)
	var mID string
	removed := false
	for _, host := range etcdHosts {
		etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
		if err != nil {
			logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
			continue
		}
		memAPI := etcdclient.NewMembersAPI(etcdClient)
		members, err := memAPI.List(ctx)
		if err != nil {
			logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
			continue
		}
		for _, member := range members {
			if member.Name == fmt.Sprintf("etcd-%s", toDeleteEtcdHost.HostnameOverride) {
				mID = member.ID
				break
			}
		}
		if err := memAPI.Remove(ctx, mID); err != nil {
			logrus.Debugf("Failed to remove etcd member from host [%s]: %v", host.Address, err)
			continue
		}
		etcdMemberDeletedTime := time.Now()
		// Need to health check after successful member remove (especially for leader re-election)
		// We will check all hosts to see if the cluster becomes healthy
		var healthError error
		_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName], host)
		logrus.Infof("[remove/%s] Checking etcd cluster health on [etcd-%s] after removing [etcd-%s]", ETCDRole, host.HostnameOverride, toDeleteEtcdHost.HostnameOverride)
		logrus.Debugf("[remove/%s] healthCheckURL for checking etcd cluster health on [etcd-%s] after removing [%s]: [%s]", ETCDRole, host.HostnameOverride, toDeleteEtcdHost.HostnameOverride, healthCheckURL)
		healthError = isEtcdHealthy(localConnDialerFactory, host, cert, key, healthCheckURL)
		if healthError == nil {
			logrus.Infof("[remove/%s] etcd cluster is healthy on [etcd-%s] after removing [etcd-%s]", ETCDRole, host.HostnameOverride, toDeleteEtcdHost.HostnameOverride)
			etcdHealthyTime := time.Now()
			diffTime := etcdHealthyTime.Sub(etcdMemberDeletedTime)
			logrus.Debugf("Total time between etcd member deleted and etcd cluster healthy is: [%s]", diffTime)
			removed = true
			break
		}
		logrus.Warn(healthError)
	}
	if !removed {
		return fmt.Errorf("Failed to delete etcd member [etcd-%s] from etcd cluster", toDeleteEtcdHost.HostnameOverride)
	}
	log.Infof(ctx, "[remove/%s] Successfully removed member [etcd-%s] from etcd cluster", ETCDRole, toDeleteEtcdHost.HostnameOverride)
	return nil
}

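// ReloadEtcdCluster starts the etcd container on a newly added host and then checks
// cluster health through the already ready etcd hosts.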
func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, newHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, alpineImage string) error {
	imageCfg, hostCfg, _ := GetProcessConfig(etcdNodePlanMap[newHost.Address].Processes[EtcdContainerName], newHost)

	if err := setEtcdPermissions(ctx, newHost, prsMap, alpineImage, etcdNodePlanMap[newHost.Address].Processes[EtcdContainerName]); err != nil {
		return err
	}

	if err := docker.DoRunContainer(ctx, newHost.DClient, imageCfg, hostCfg, EtcdContainerName, newHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	if err := createLogLink(ctx, newHost, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
		return err
	}
	time.Sleep(EtcdInitWaitTime * time.Second)
	var healthError error
	var hosts []string
	for _, host := range readyEtcdHosts {
		_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName], host)
		healthError = isEtcdHealthy(localConnDialerFactory, host, cert, key, healthCheckURL)
		if healthError == nil {
			break
		}
		logrus.Warn(healthError)
		hosts = append(hosts, host.Address)
	}
	if healthError != nil {
		return fmt.Errorf("etcd cluster is unhealthy: hosts [%s] failed to report healthy."+
			" Check etcd container logs on each host for more information", strings.Join(hosts, ","))
	}
	return nil
}

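// IsEtcdMember reports whether etcdHost's peer URL is already registered as a member
// of the etcd cluster, as seen from the other etcd hosts.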
func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (bool, error) {
	var listErr error
	peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
	for _, host := range etcdHosts {
		if host.Address == etcdHost.Address {
			continue
		}
		etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
		if err != nil {
			listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
			logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
			continue
		}
		memAPI := etcdclient.NewMembersAPI(etcdClient)
		members, err := memAPI.List(ctx)
		if err != nil {
			listErr = errors.Wrapf(err, "Failed to list etcd cluster members from host [%s]", host.Address)
			logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
			continue
		}
		for _, member := range members {
			if strings.Contains(member.PeerURLs[0], peerURL) {
				logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
				return true, nil
			}
		}
		// reset the list of errors to handle new hosts
		listErr = nil
		break
	}
	if listErr != nil {
		return false, listErr
	}
	return false, nil
}

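// RunEtcdSnapshotSave runs the rke-etcd-backup save command on etcdHost, either as a
// one-time snapshot container or as a long-running rolling snapshot container,
// optionally configured for S3 backups.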
func RunEtcdSnapshotSave(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, once bool, es v3.ETCDService) error {
	backupCmd := "etcd-backup"
	restartPolicy := "always"
	imageCfg := &container.Config{
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			backupCmd,
			"save",
			"--cacert", pki.GetCertPath(pki.CACertName),
			"--cert", pki.GetCertPath(pki.KubeNodeCertName),
			"--key", pki.GetKeyPath(pki.KubeNodeCertName),
			"--name", name,
			"--endpoints=" + etcdHost.InternalAddress + ":2379",
		},
		Image: etcdSnapshotImage,
		Env:   es.ExtraEnv,
	}
	// Configure imageCfg for one time snapshot
	if once {
		imageCfg.Cmd = append(imageCfg.Cmd, "--once")
		restartPolicy = "no"
		// Configure imageCfg for rolling snapshots
	} else if es.BackupConfig == nil {
		imageCfg.Cmd = append(imageCfg.Cmd, "--retention="+es.Retention)
		imageCfg.Cmd = append(imageCfg.Cmd, "--creation="+es.Creation)
	}
	// Configure imageCfg for S3 backups
	if es.BackupConfig != nil {
		imageCfg = configS3BackupImgCmd(ctx, imageCfg, es.BackupConfig)
	}
	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
		NetworkMode:   container.NetworkMode("host"),
		RestartPolicy: container.RestartPolicy{Name: restartPolicy},
	}

	if once {
		log.Infof(ctx, "[etcd] Running snapshot save once on host [%s]", etcdHost.Address)
		logrus.Debugf("[etcd] Using command [%s] for snapshot save once container [%s] on host [%s]", getSanitizedSnapshotCmd(imageCfg, es.BackupConfig), EtcdSnapshotOnceContainerName, etcdHost.Address)
		if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdSnapshotOnceContainerName, etcdHost.Address); err != nil {
			return err
		}
		if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotOnceContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
			return err
		}
		status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdSnapshotOnceContainerName, etcdHost.Address)
		if status != 0 || err != nil {
			if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName); removeErr != nil {
				log.Warnf(ctx, "[etcd] Failed to remove container [%s] on host [%s]: %v", EtcdSnapshotOnceContainerName, etcdHost.Address, removeErr)
			}
			if err != nil {
				return err
			}
			return fmt.Errorf("[etcd] Failed to take one-time snapshot on host [%s], exit code [%d]: %v", etcdHost.Address, status, stderr)
		}

		return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName)
	}
	log.Infof(ctx, "[etcd] Running rolling snapshot container [%s] on host [%s]", EtcdSnapshotContainerName, etcdHost.Address)
	logrus.Debugf("[etcd] Using command [%s] for rolling snapshot container [%s] on host [%s]", getSanitizedSnapshotCmd(imageCfg, es.BackupConfig), EtcdSnapshotContainerName, etcdHost.Address)
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdSnapshotContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	// check if the container exited with error
	snapshotCont, err := docker.InspectContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotContainerName)
	if err != nil {
		return err
	}
	time.Sleep(EtcdSnapshotWaitTime * time.Second)
	if snapshotCont.State.Status == "exited" || snapshotCont.State.Restarting {
		log.Warnf(ctx, "[etcd] etcd rolling snapshot container failed to start correctly")
		return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotContainerName)
	}
	return nil
}

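// RunGetStateFileFromSnapshot extracts the cluster state file embedded in a snapshot
// by running a one-time extractstatefile container and returns its contents.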
func RunGetStateFileFromSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, es v3.ETCDService) (string, error) {
	backupCmd := "etcd-backup"
	imageCfg := &container.Config{
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			backupCmd,
			"extractstatefile",
			"--name", name,
		},
		Image: etcdSnapshotImage,
		Env:   es.ExtraEnv,
	}
	// Configure imageCfg for S3 backups
	if es.BackupConfig != nil {
		imageCfg = configS3BackupImgCmd(ctx, imageCfg, es.BackupConfig)
	}
	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
		},
		NetworkMode:   container.NetworkMode("host"),
		RestartPolicy: container.RestartPolicy{Name: "no"},
	}

	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdStateFileContainerName, etcdHost.Address); err != nil {
		return "", err
	}
	if err := docker.DoRunOnetimeContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdStateFileContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return "", err
	}
	statefile, err := docker.ReadFileFromContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdStateFileContainerName, "/tmp/cluster.rkestate")
	if err != nil {
		return "", err
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdStateFileContainerName, etcdHost.Address); err != nil {
		return "", err
	}

	return statefile, nil
}

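// DownloadEtcdSnapshotFromS3 downloads the named snapshot from the configured
// S3 compatible backend onto etcdHost.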
func DownloadEtcdSnapshotFromS3(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, es v3.ETCDService) error {
	s3Backend := es.BackupConfig.S3BackupConfig
	if len(s3Backend.Endpoint) == 0 || len(s3Backend.BucketName) == 0 {
		return fmt.Errorf("failed to get snapshot [%s] from s3 on host [%s], invalid s3 configurations", name, etcdHost.Address)
	}
	imageCfg := &container.Config{
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			"etcd-backup",
			"download",
			"--name", name,
			"--s3-backup=true",
			"--s3-endpoint=" + s3Backend.Endpoint,
			"--s3-bucketName=" + s3Backend.BucketName,
			"--s3-region=" + s3Backend.Region,
		},
		Image: etcdSnapshotImage,
		Env:   es.ExtraEnv,
	}
	// Base64 encode the S3 accessKey and secretKey before adding them as env variables
	if len(s3Backend.AccessKey) > 0 || len(s3Backend.SecretKey) > 0 {
		env := []string{
			"S3_ACCESS_KEY=" + base64.StdEncoding.EncodeToString([]byte(s3Backend.AccessKey)),
			"S3_SECRET_KEY=" + base64.StdEncoding.EncodeToString([]byte(s3Backend.SecretKey)),
		}
		imageCfg.Env = append(imageCfg.Env, env...)
	}
	s3Logline := fmt.Sprintf("[etcd] Snapshot [%s] will be downloaded on host [%s] from S3 compatible backend at [%s] from bucket [%s] using accesskey [%s]", name, etcdHost.Address, s3Backend.Endpoint, s3Backend.BucketName, s3Backend.AccessKey)
	if s3Backend.Region != "" {
		s3Logline += fmt.Sprintf(" and using region [%s]", s3Backend.Region)
	}

	if s3Backend.CustomCA != "" {
		caStr := base64.StdEncoding.EncodeToString([]byte(s3Backend.CustomCA))
		imageCfg.Cmd = append(imageCfg.Cmd, "--s3-endpoint-ca="+caStr)
		s3Logline += fmt.Sprintf(" and using endpoint CA [%s]", caStr)
	}
	if s3Backend.Folder != "" {
		imageCfg.Cmd = append(imageCfg.Cmd, "--s3-folder="+s3Backend.Folder)
		s3Logline += fmt.Sprintf(" and using folder [%s]", s3Backend.Folder)
	}
	log.Infof(ctx, s3Logline)
	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
		NetworkMode:   container.NetworkMode("host"),
		RestartPolicy: container.RestartPolicy{Name: "no"},
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdDownloadBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}

	status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address)
	if status != 0 || err != nil {
		if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName); removeErr != nil {
			log.Warnf(ctx, "Failed to remove container [%s]: %v", EtcdDownloadBackupContainerName, removeErr)
		}
		if err != nil {
			return err
		}
		return fmt.Errorf("Failed to download etcd snapshot from s3, exit code [%d]: %v", status, stderr)
	}
	return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName)
}

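// RestoreEtcdSnapshot restores the named snapshot into the etcd data directory on
// etcdHost using etcdctl, then removes the snapshot via RunEtcdSnapshotRemove with
// cleanup enabled.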
func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry,
	etcdRestoreImage, etcdBackupImage, snapshotName, initCluster string, es v3.ETCDService) error {
	log.Infof(ctx, "[etcd] Restoring [%s] snapshot on etcd host [%s]", snapshotName, etcdHost.Address)
	nodeName := pki.GetCrtNameForHost(etcdHost, pki.EtcdCertName)
	snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)

	// make sure that restore path is empty otherwise etcd restore will fail
	imageCfg := &container.Config{
		Cmd: []string{
			"sh", "-c", strings.Join([]string{
				"rm -rf", EtcdRestorePath,
				"&& /usr/local/bin/etcdctl",
				fmt.Sprintf("--endpoints=[%s:2379]", etcdHost.InternalAddress),
				"--cacert", pki.GetCertPath(pki.CACertName),
				"--cert", pki.GetCertPath(nodeName),
				"--key", pki.GetKeyPath(nodeName),
				"snapshot", "restore", snapshotPath,
				"--data-dir=" + EtcdRestorePath,
				"--name=etcd-" + etcdHost.HostnameOverride,
				"--initial-cluster=" + initCluster,
				"--initial-cluster-token=etcd-cluster-1",
				"--initial-advertise-peer-urls=https://" + etcdHost.InternalAddress + ":2380",
				"&& mv", EtcdRestorePath + "*", EtcdDataDir,
				"&& rm -rf", EtcdRestorePath,
			}, " "),
		},
		Env:   append([]string{"ETCDCTL_API=3"}, es.ExtraEnv...),
		Image: etcdRestoreImage,
	}
	hostCfg := &container.HostConfig{
		Binds: []string{
			"/opt/rke/:/opt/rke/:z",
			fmt.Sprintf("%s:/var/lib/rancher/etcd:z", path.Join(etcdHost.PrefixPath, "/var/lib/etcd")),
			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
		NetworkMode: container.NetworkMode("host"),
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdRestoreContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdRestoreContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	status, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
	if err != nil {
		return err
	}
	if status != 0 {
		containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
		if err != nil {
			return err
		}
		if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName); err != nil {
			return err
		}
		// include the restore container's logs in the returned error
		return fmt.Errorf("Failed to run etcd restore container, exit status is: %d, container logs: %s", status, containerLog)
	}
	if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName); err != nil {
		return err
	}
	return RunEtcdSnapshotRemove(ctx, etcdHost, prsMap, etcdBackupImage, snapshotName, true, es)
}

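// RunEtcdSnapshotRemove deletes the named snapshot from etcdHost (and from S3 when an
// S3 backup backend is configured) using a one-shot rke-etcd-backup container.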
func RunEtcdSnapshotRemove(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, cleanupRestore bool, es v3.ETCDService) error {
	log.Infof(ctx, "[etcd] Removing snapshot [%s] from host [%s]", name, etcdHost.Address)
	imageCfg := &container.Config{
		Image: etcdSnapshotImage,
		Env:   es.ExtraEnv,
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			"etcd-backup",
			"delete",
			"--name", name,
		},
	}
	if cleanupRestore {
		imageCfg.Cmd = append(imageCfg.Cmd, "--cleanup")
	}
	if es.BackupConfig != nil && es.BackupConfig.S3BackupConfig != nil {
		s3cmd := []string{
			"--s3-backup",
			"--s3-endpoint=" + es.BackupConfig.S3BackupConfig.Endpoint,
			"--s3-bucketName=" + es.BackupConfig.S3BackupConfig.BucketName,
			"--s3-region=" + es.BackupConfig.S3BackupConfig.Region,
		}
		// Base64 encode the S3 accessKey and secretKey before adding them as env variables
		if len(es.BackupConfig.S3BackupConfig.AccessKey) > 0 || len(es.BackupConfig.S3BackupConfig.SecretKey) > 0 {
			env := []string{
				"S3_ACCESS_KEY=" + base64.StdEncoding.EncodeToString([]byte(es.BackupConfig.S3BackupConfig.AccessKey)),
				"S3_SECRET_KEY=" + base64.StdEncoding.EncodeToString([]byte(es.BackupConfig.S3BackupConfig.SecretKey)),
			}
			imageCfg.Env = append(imageCfg.Env, env...)
		}
		if es.BackupConfig.S3BackupConfig.CustomCA != "" {
			caStr := base64.StdEncoding.EncodeToString([]byte(es.BackupConfig.S3BackupConfig.CustomCA))
			s3cmd = append(s3cmd, "--s3-endpoint-ca="+caStr)
		}
		if es.BackupConfig.S3BackupConfig.Folder != "" {
			s3cmd = append(s3cmd, "--s3-folder="+es.BackupConfig.S3BackupConfig.Folder)
		}
		imageCfg.Cmd = append(imageCfg.Cmd, s3cmd...)
	}

	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
		},
		RestartPolicy: container.RestartPolicy{Name: "no"},
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdSnapshotRemoveContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotRemoveContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdSnapshotRemoveContainerName, etcdHost.Address)
	if status != 0 || err != nil {
		if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotRemoveContainerName); removeErr != nil {
			log.Warnf(ctx, "Failed to remove container [%s]: %v", EtcdSnapshotRemoveContainerName, removeErr)
		}
		if err != nil {
			return err
		}
		return fmt.Errorf("Failed to remove snapshot [%s] on host [%s], exit code [%d]: %v", name, etcdHost.Address, status, stderr)
	}

	return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotRemoveContainerName)
}

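// GetEtcdSnapshotChecksum returns the md5 checksum of the named snapshot file on etcdHost.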
func GetEtcdSnapshotChecksum(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, alpineImage, snapshotName string) (string, error) {
	var checksum string
	var err error
	var stderr string

	// compressedSnapshotPath := fmt.Sprintf("%s%s.%s", EtcdSnapshotPath, snapshotName, EtcdSnapshotCompressedExtension)
	snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)
	imageCfg := &container.Config{
		Cmd: []string{
			"sh", "-c", strings.Join([]string{
				" if [ -f '", snapshotPath, "' ]; then md5sum '", snapshotPath, "' | cut -f1 -d' ' | tr -d '\n'; else echo 'snapshot file does not exist' >&2; fi"}, ""),
		},
		Image: alpineImage,
	}
	hostCfg := &container.HostConfig{
		Binds: []string{
			"/opt/rke/:/opt/rke/:z",
		}}

	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdChecksumContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return checksum, err
	}
	if _, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
		return checksum, err
	}
	stderr, checksum, err = docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdChecksumContainerName, "1", false)
	if err != nil {
		return checksum, err
	}
	if stderr != "" {
		return checksum, fmt.Errorf("Error output not nil from snapshot checksum container [%s]: %s", EtcdChecksumContainerName, stderr)
	}
	if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
		return checksum, err
	}
	return checksum, nil
}

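// configS3BackupImgCmd appends snapshot creation and retention flags and, when an S3
// backend is configured, the S3 flags and credentials to the snapshot container config.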
func configS3BackupImgCmd(ctx context.Context, imageCfg *container.Config, bc *v3.BackupConfig) *container.Config {
	cmd := []string{
		"--creation=" + fmt.Sprintf("%dh", bc.IntervalHours),
		"--retention=" + fmt.Sprintf("%dh", bc.Retention*bc.IntervalHours),
	}

	if bc.S3BackupConfig != nil {
		cmd = append(cmd, []string{
			"--s3-backup=true",
			"--s3-endpoint=" + bc.S3BackupConfig.Endpoint,
			"--s3-bucketName=" + bc.S3BackupConfig.BucketName,
			"--s3-region=" + bc.S3BackupConfig.Region,
		}...)
		// Base64 encode the S3 accessKey and secretKey before adding them as env variables
		if len(bc.S3BackupConfig.AccessKey) > 0 || len(bc.S3BackupConfig.SecretKey) > 0 {
			env := []string{
				"S3_ACCESS_KEY=" + base64.StdEncoding.EncodeToString([]byte(bc.S3BackupConfig.AccessKey)),
				"S3_SECRET_KEY=" + base64.StdEncoding.EncodeToString([]byte(bc.S3BackupConfig.SecretKey)),
			}
			imageCfg.Env = append(imageCfg.Env, env...)
		}
		s3Logline := fmt.Sprintf("[etcd] Snapshots configured to S3 compatible backend at [%s] to bucket [%s] using accesskey [%s]", bc.S3BackupConfig.Endpoint, bc.S3BackupConfig.BucketName, bc.S3BackupConfig.AccessKey)
		if bc.S3BackupConfig.Region != "" {
			s3Logline += fmt.Sprintf(" and using region [%s]", bc.S3BackupConfig.Region)
		}
		if bc.S3BackupConfig.CustomCA != "" {
			caStr := base64.StdEncoding.EncodeToString([]byte(bc.S3BackupConfig.CustomCA))
			cmd = append(cmd, "--s3-endpoint-ca="+caStr)
			s3Logline += fmt.Sprintf(" and using endpoint CA [%s]", caStr)
		}
		if bc.S3BackupConfig.Folder != "" {
			cmd = append(cmd, "--s3-folder="+bc.S3BackupConfig.Folder)
			s3Logline += fmt.Sprintf(" and using folder [%s]", bc.S3BackupConfig.Folder)
		}
		log.Infof(ctx, s3Logline)
	}
	imageCfg.Cmd = append(imageCfg.Cmd, cmd...)
	return imageCfg
}

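// StartBackupServer starts the rke-etcd-backup serve container on etcdHost so that
// other hosts can download the named snapshot from it.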
func StartBackupServer(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string) error {
	log.Infof(ctx, "[etcd] starting backup server on host [%s]", etcdHost.Address)

	imageCfg := &container.Config{
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			"etcd-backup",
			"serve",
			"--name", name,
			"--cacert", pki.GetCertPath(pki.CACertName),
			"--cert", pki.GetCertPath(pki.KubeNodeCertName),
			"--key", pki.GetKeyPath(pki.KubeNodeCertName),
		},
		Image: etcdSnapshotImage,
	}

	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
		NetworkMode:   container.NetworkMode("host"),
		RestartPolicy: container.RestartPolicy{Name: "no"},
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdServeBackupContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdServeBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	time.Sleep(EtcdSnapshotWaitTime * time.Second)
	container, err := docker.InspectContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdServeBackupContainerName)
	if err != nil {
		return err
	}
	if !container.State.Running {
		containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdServeBackupContainerName, "1", false)
		if err != nil {
			return err
		}
		if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdServeBackupContainerName); err != nil {
			return err
		}
		// include the backup server container's logs in the returned error
		return fmt.Errorf("Failed to run backup server container, container logs: %s", containerLog)
	}
	return nil
}

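// DownloadEtcdSnapshotFromBackupServer downloads the named snapshot onto etcdHost from
// the backup server running on backupServer.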
func DownloadEtcdSnapshotFromBackupServer(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage, name string, backupServer *hosts.Host) error {
	log.Infof(ctx, "[etcd] Get snapshot [%s] on host [%s]", name, etcdHost.Address)
	imageCfg := &container.Config{
		Cmd: []string{
			"/opt/rke-tools/rke-etcd-backup",
			"etcd-backup",
			"download",
			"--name", name,
			"--local-endpoint", backupServer.InternalAddress,
			"--cacert", pki.GetCertPath(pki.CACertName),
			"--cert", pki.GetCertPath(pki.KubeNodeCertName),
			"--key", pki.GetKeyPath(pki.KubeNodeCertName),
		},
		Image: etcdSnapshotImage,
	}

	hostCfg := &container.HostConfig{
		Binds: []string{
			fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
		NetworkMode:   container.NetworkMode("host"),
		RestartPolicy: container.RestartPolicy{Name: "on-failure"},
	}
	if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address); err != nil {
		return err
	}
	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdDownloadBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}

	status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address)
	if status != 0 || err != nil {
		if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName); removeErr != nil {
			log.Warnf(ctx, "Failed to remove container [%s]: %v", EtcdDownloadBackupContainerName, removeErr)
		}
		if err != nil {
			return err
		}
		return fmt.Errorf("Failed to download etcd snapshot from backup server [%s], exit code [%d]: %v", backupServer.Address, status, stderr)
	}
	return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName)
}

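// setEtcdPermissions runs a one-time container that chmods the etcd data directory to
// 700 and, when the etcd process runs as a non-root user, chowns it to that user.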
func setEtcdPermissions(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, alpineImage string, process v3.Process) error {
	var dataBind string

	cmd := fmt.Sprintf("chmod 700 %s", EtcdDataDir)
	if len(process.User) != 0 {
		cmd = fmt.Sprintf("chmod 700 %s ; chown -R %s %s", EtcdDataDir, process.User, EtcdDataDir)
	}
	imageCfg := &container.Config{
		Cmd: []string{
			"sh", "-c",
			cmd,
		},
		Image: alpineImage,
	}
	for _, bind := range process.Binds {
		if strings.Contains(bind, "/var/lib/etcd") {
			dataBind = bind
		}
	}
	hostCfg := &container.HostConfig{
		Binds: []string{dataBind},
	}
	if err := docker.DoRunOnetimeContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdPermFixContainerName,
		etcdHost.Address, ETCDRole, prsMap); err != nil {
		return err
	}
	return docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdPermFixContainerName, etcdHost.Address)
}

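// getSanitizedSnapshotCmd returns the snapshot container command as a single string
// with the S3 secret key masked.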
func getSanitizedSnapshotCmd(imageCfg *container.Config, bc *v3.BackupConfig) string {
	cmd := strings.Join(imageCfg.Cmd, " ")
	if bc != nil && bc.S3BackupConfig != nil {
		return strings.Replace(cmd, bc.S3BackupConfig.SecretKey, "***", -1)
	}
	return cmd
}