mirror of
https://github.com/rancher/rke.git
synced 2025-04-27 11:21:08 +00:00
Able to include and extract state file in snapshot
This commit is contained in:
parent
e2b5828e5b
commit
9bca29befb
@ -2,6 +2,7 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -50,6 +51,44 @@ func (c *Cluster) DeployRestoreCerts(ctx context.Context, clusterCerts map[strin
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) DeployStateFile(ctx context.Context, fullState *FullState, snapshotName string) error {
|
||||
var errgrp errgroup.Group
|
||||
hostsQueue := util.GetObjectQueue(c.EtcdHosts)
|
||||
stateFile, err := json.MarshalIndent(fullState, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for w := 0; w < WorkerThreads; w++ {
|
||||
errgrp.Go(func() error {
|
||||
var errList []error
|
||||
for host := range hostsQueue {
|
||||
err := pki.DeployStateOnPlaneHost(ctx, host.(*hosts.Host), c.SystemImages.CertDownloader, c.PrivateRegistriesMap, string(stateFile), snapshotName)
|
||||
if err != nil {
|
||||
errList = append(errList, err)
|
||||
}
|
||||
}
|
||||
return util.ErrList(errList)
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) GetStateFileFromSnapshot(ctx context.Context, snapshotName string) (string, error) {
|
||||
backupImage := c.getBackupImage()
|
||||
for _, host := range c.EtcdHosts {
|
||||
stateFile, err := services.RunGetStateFileFromSnapshot(ctx, host, c.PrivateRegistriesMap, backupImage, snapshotName, c.Services.Etcd)
|
||||
if err != nil || stateFile == "" {
|
||||
logrus.Infof("Could not extract state file from snapshot [%s] on host [%s]", snapshotName, host.Address)
|
||||
continue
|
||||
}
|
||||
return stateFile, nil
|
||||
}
|
||||
return "", fmt.Errorf("Unable to find statefile in snapshot [%s]", snapshotName)
|
||||
}
|
||||
|
||||
func (c *Cluster) PrepareBackup(ctx context.Context, snapshotPath string) error {
|
||||
// local backup case
|
||||
var backupReady bool
|
||||
|
@ -228,6 +228,19 @@ func GetCertificateDirPath(configPath, configDir string) string {
|
||||
return trimmedName + certDirExt
|
||||
}
|
||||
|
||||
func StringToFullState(ctx context.Context, stateFileContent string) (*FullState, error) {
|
||||
rkeFullState := &FullState{}
|
||||
logrus.Tracef("stateFileContent: %s", stateFileContent)
|
||||
if err := json.Unmarshal([]byte(stateFileContent), rkeFullState); err != nil {
|
||||
return rkeFullState, err
|
||||
}
|
||||
rkeFullState.DesiredState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.DesiredState.CertificatesBundle)
|
||||
rkeFullState.CurrentState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.CurrentState.CertificatesBundle)
|
||||
logrus.Tracef("rkeFullState: %+v", rkeFullState)
|
||||
|
||||
return rkeFullState, nil
|
||||
}
|
||||
|
||||
func ReadStateFile(ctx context.Context, statePath string) (*FullState, error) {
|
||||
rkeFullState := &FullState{}
|
||||
fp, err := filepath.Abs(statePath)
|
||||
|
52
cmd/etcd.go
52
cmd/etcd.go
@ -107,6 +107,10 @@ func SnapshotSaveEtcdHosts(
|
||||
flags cluster.ExternalFlags, snapshotName string) error {
|
||||
|
||||
log.Infof(ctx, "Starting saving snapshot on etcd hosts")
|
||||
|
||||
stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
|
||||
rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
|
||||
|
||||
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, "")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -119,6 +123,10 @@ func SnapshotSaveEtcdHosts(
|
||||
return err
|
||||
}
|
||||
|
||||
if err := kubeCluster.DeployStateFile(ctx, rkeFullState, snapshotName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := kubeCluster.SnapshotEtcd(ctx, snapshotName); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -135,10 +143,43 @@ func RestoreEtcdSnapshot(
|
||||
data map[string]interface{},
|
||||
snapshotName string) (string, string, string, string, map[string]pki.CertificatePKI, error) {
|
||||
var APIURL, caCrt, clientCert, clientKey string
|
||||
log.Infof(ctx, "Restoring etcd snapshot %s", snapshotName)
|
||||
|
||||
log.Infof(ctx, "Checking if state file is included in snapshot file for %s", snapshotName)
|
||||
// Creating temp cluster to check if snapshot archive contains statefile and retrieve it
|
||||
tempCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, "")
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
if err := tempCluster.SetupDialers(ctx, dialersOptions); err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
if err := tempCluster.TunnelHosts(ctx, flags); err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
|
||||
rkeFullState := &cluster.FullState{}
|
||||
stateFileRetrieved := false
|
||||
|
||||
// Local state file
|
||||
stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
|
||||
rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
|
||||
// Extract state file from snapshot
|
||||
stateFile, err := tempCluster.GetStateFileFromSnapshot(ctx, snapshotName)
|
||||
// If state file is not in snapshot (or can't be retrieved), fallback to local state file
|
||||
if err != nil {
|
||||
logrus.Infof("Could not extract state file from snapshot [%s] on any host, falling back to local state file: %v", snapshotName, err)
|
||||
rkeFullState, _ = cluster.ReadStateFile(ctx, stateFilePath)
|
||||
} else {
|
||||
// Parse extracted statefile to FullState struct
|
||||
rkeFullState, err = cluster.StringToFullState(ctx, stateFile)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error when converting state file contents to rkeFullState: %v", err)
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
logrus.Infof("State file is successfully extracted from snapshot [%s]", snapshotName)
|
||||
stateFileRetrieved = true
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Restoring etcd snapshot %s", snapshotName)
|
||||
|
||||
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, rkeFullState.DesiredState.EncryptionConfig)
|
||||
if err != nil {
|
||||
@ -149,8 +190,11 @@ func RestoreEtcdSnapshot(
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
|
||||
if err := checkLegacyCluster(ctx, kubeCluster, rkeFullState, flags); err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
// If we can't retrieve statefile from snapshot, and we don't have local, we need to check for legacy cluster
|
||||
if !stateFileRetrieved {
|
||||
if err := checkLegacyCluster(ctx, kubeCluster, rkeFullState, flags); err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
rkeFullState.CurrentState = cluster.State{}
|
||||
|
@ -483,14 +483,24 @@ func WaitForContainer(ctx context.Context, dClient *client.Client, hostname stri
|
||||
return 1, fmt.Errorf("Could not inspect container [%s] on host [%s]: %s", containerName, hostname, err)
|
||||
}
|
||||
if container.State.Running {
|
||||
log.Infof(ctx, "Container [%s] is still running on host [%s]", containerName, hostname)
|
||||
stderr, stdout, err := GetContainerLogsStdoutStderr(ctx, dClient, containerName, "1", false)
|
||||
if err != nil {
|
||||
logrus.Warnf("Failed to get container logs from container [%s] on host [%s]: %v", containerName, hostname, err)
|
||||
}
|
||||
|
||||
log.Infof(ctx, "Container [%s] is still running on host [%s]: stderr: [%s], stdout: [%s]", containerName, hostname, stderr, stdout)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("Exit code for [%s] container on host [%s] is [%d]", containerName, hostname, int64(container.State.ExitCode))
|
||||
return int64(container.State.ExitCode), nil
|
||||
}
|
||||
return 1, fmt.Errorf("Container [%s] did not exit in time on host [%s]", containerName, hostname)
|
||||
stderr, stdout, err := GetContainerLogsStdoutStderr(ctx, dClient, containerName, "1", false)
|
||||
if err != nil {
|
||||
logrus.Warnf("Failed to get container logs from container [%s] on host [%s]", containerName, hostname)
|
||||
}
|
||||
|
||||
return 1, fmt.Errorf("Container [%s] did not exit in time on host [%s]: stderr: [%s], stdout: [%s]", containerName, hostname, stderr, stdout)
|
||||
}
|
||||
|
||||
func IsContainerUpgradable(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) (bool, error) {
|
||||
|
@ -3,7 +3,8 @@ package pki
|
||||
import "time"
|
||||
|
||||
const (
|
||||
CertPathPrefix = "/etc/kubernetes/ssl/"
|
||||
K8sBaseDir = "/etc/kubernetes/"
|
||||
CertPathPrefix = K8sBaseDir + "ssl/"
|
||||
CertificatesServiceName = "certificates"
|
||||
CrtDownloaderContainer = "cert-deployer"
|
||||
CertFetcherContainer = "cert-fetcher"
|
||||
@ -11,6 +12,7 @@ const (
|
||||
TempCertPath = "/etc/kubernetes/.tmp/"
|
||||
ClusterConfig = "cluster.yml"
|
||||
ClusterStateFile = "cluster-state.yml"
|
||||
ClusterStateExt = ".rkestate"
|
||||
ClusterStateEnv = "CLUSTER_STATE"
|
||||
BundleCertPath = "/backup/pki.bundle.tar.gz"
|
||||
|
||||
|
@ -53,19 +53,20 @@ func DeployCertificatesOnPlaneHost(ctx context.Context, host *hosts.Host, rkeCon
|
||||
return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
|
||||
}
|
||||
|
||||
func DeployStateOnPlaneHost(ctx context.Context, host *hosts.Host, stateDownloaderImage string, prsMap map[string]v3.PrivateRegistry, clusterState string) error {
|
||||
func DeployStateOnPlaneHost(ctx context.Context, host *hosts.Host, stateDownloaderImage string, prsMap map[string]v3.PrivateRegistry, clusterState string, snapshotName string) error {
|
||||
// remove existing container. Only way it's still here is if previous deployment failed
|
||||
if err := docker.DoRemoveContainer(ctx, host.DClient, StateDeployerContainerName, host.Address); err != nil {
|
||||
return err
|
||||
}
|
||||
containerEnv := []string{ClusterStateEnv + "=" + clusterState}
|
||||
ClusterStateFilePath := path.Join(host.PrefixPath, TempCertPath, ClusterStateFile)
|
||||
ClusterStateFilePath := path.Join(host.PrefixPath, K8sBaseDir, "/", fmt.Sprintf("%s%s", snapshotName, ClusterStateExt))
|
||||
logrus.Debugf("[state] Deploying state to [%v] on node [%s]", ClusterStateFilePath, host.Address)
|
||||
imageCfg := &container.Config{
|
||||
Image: stateDownloaderImage,
|
||||
Cmd: []string{
|
||||
"sh",
|
||||
"-c",
|
||||
fmt.Sprintf("t=$(mktemp); echo -e \"$%s\" > $t && mv $t %s && chmod 644 %s", ClusterStateEnv, ClusterStateFilePath, ClusterStateFilePath),
|
||||
fmt.Sprintf("t=$(mktemp); echo \"$%s\" > $t && mv $t %s && chmod 400 %s", ClusterStateEnv, ClusterStateFilePath, ClusterStateFilePath),
|
||||
},
|
||||
Env: containerEnv,
|
||||
}
|
||||
|
@ -377,6 +377,47 @@ func RunEtcdSnapshotSave(ctx context.Context, etcdHost *hosts.Host, prsMap map[s
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunGetStateFileFromSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, es v3.ETCDService) (string, error) {
|
||||
backupCmd := "etcd-backup"
|
||||
imageCfg := &container.Config{
|
||||
Cmd: []string{
|
||||
"/opt/rke-tools/rke-etcd-backup",
|
||||
backupCmd,
|
||||
"extractstatefile",
|
||||
"--name", name,
|
||||
},
|
||||
Image: etcdSnapshotImage,
|
||||
Env: es.ExtraEnv,
|
||||
}
|
||||
// Configure imageCfg for S3 backups
|
||||
if es.BackupConfig != nil {
|
||||
imageCfg = configS3BackupImgCmd(ctx, imageCfg, es.BackupConfig)
|
||||
}
|
||||
hostCfg := &container.HostConfig{
|
||||
Binds: []string{
|
||||
fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
|
||||
},
|
||||
NetworkMode: container.NetworkMode("host"),
|
||||
RestartPolicy: container.RestartPolicy{Name: "no"},
|
||||
}
|
||||
|
||||
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdStateFileContainerName, etcdHost.Address); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := docker.DoRunOnetimeContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdStateFileContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
|
||||
return "", err
|
||||
}
|
||||
statefile, err := docker.ReadFileFromContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdStateFileContainerName, "/tmp/cluster.rkestate")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdStateFileContainerName, etcdHost.Address); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return statefile, nil
|
||||
}
|
||||
|
||||
func DownloadEtcdSnapshotFromS3(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, es v3.ETCDService) error {
|
||||
s3Backend := es.BackupConfig.S3BackupConfig
|
||||
if len(s3Backend.Endpoint) == 0 || len(s3Backend.BucketName) == 0 {
|
||||
|
@ -35,6 +35,7 @@ const (
|
||||
EtcdDownloadBackupContainerName = "etcd-download-backup"
|
||||
EtcdServeBackupContainerName = "etcd-Serve-backup"
|
||||
EtcdChecksumContainerName = "etcd-checksum-checker"
|
||||
EtcdStateFileContainerName = "etcd-extract-statefile"
|
||||
NginxProxyContainerName = "nginx-proxy"
|
||||
SidekickContainerName = "service-sidekick"
|
||||
LogLinkContainerName = "rke-log-linker"
|
||||
|
Loading…
Reference in New Issue
Block a user