mirror of
https://github.com/rancher/rke.git
synced 2025-08-31 22:46:25 +00:00
Change file copy method for state file
This commit is contained in:
@@ -2,7 +2,6 @@ package cluster
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@@ -51,18 +50,14 @@ func (c *Cluster) DeployRestoreCerts(ctx context.Context, clusterCerts map[strin
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) DeployStateFile(ctx context.Context, fullState *FullState, snapshotName string) error {
|
func (c *Cluster) DeployStateFile(ctx context.Context, stateFilePath, snapshotName string) error {
|
||||||
var errgrp errgroup.Group
|
var errgrp errgroup.Group
|
||||||
hostsQueue := util.GetObjectQueue(c.EtcdHosts)
|
hostsQueue := util.GetObjectQueue(c.EtcdHosts)
|
||||||
stateFile, err := json.MarshalIndent(fullState, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for w := 0; w < WorkerThreads; w++ {
|
for w := 0; w < WorkerThreads; w++ {
|
||||||
errgrp.Go(func() error {
|
errgrp.Go(func() error {
|
||||||
var errList []error
|
var errList []error
|
||||||
for host := range hostsQueue {
|
for host := range hostsQueue {
|
||||||
err := pki.DeployStateOnPlaneHost(ctx, host.(*hosts.Host), c.SystemImages.CertDownloader, c.PrivateRegistriesMap, string(stateFile), snapshotName)
|
err := pki.DeployStateOnPlaneHost(ctx, host.(*hosts.Host), c.SystemImages.CertDownloader, c.PrivateRegistriesMap, stateFilePath, snapshotName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errList = append(errList, err)
|
errList = append(errList, err)
|
||||||
}
|
}
|
||||||
|
@@ -113,7 +113,6 @@ func SnapshotSaveEtcdHosts(
|
|||||||
log.Infof(ctx, "Starting saving snapshot on etcd hosts")
|
log.Infof(ctx, "Starting saving snapshot on etcd hosts")
|
||||||
|
|
||||||
stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
|
stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
|
||||||
rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
|
|
||||||
|
|
||||||
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, "")
|
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -127,7 +126,7 @@ func SnapshotSaveEtcdHosts(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := kubeCluster.DeployStateFile(ctx, rkeFullState, snapshotName); err != nil {
|
if err := kubeCluster.DeployStateFile(ctx, stateFilePath, snapshotName); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -135,6 +135,23 @@ func DoRunOnetimeContainer(ctx context.Context, dClient *client.Client, imageCfg
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DoCopyToContainer(ctx context.Context, dClient *client.Client, plane, containerName, hostname, destinationDir string, tarFile io.Reader) error {
|
||||||
|
if dClient == nil {
|
||||||
|
return fmt.Errorf("[%s] Failed to run container: docker client is nil for container [%s] on host [%s]", plane, containerName, hostname)
|
||||||
|
}
|
||||||
|
// Need container.ID for CopyToContainer function
|
||||||
|
container, err := InspectContainer(ctx, dClient, hostname, containerName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not inspect container [%s] on host [%s]: %s", containerName, hostname, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dClient.CopyToContainer(ctx, container.ID, destinationDir, tarFile, types.CopyToContainerOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error while copying file to container [%s] on host [%s]: %v", containerName, hostname, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func DoRollingUpdateContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string, prsMap map[string]v3.PrivateRegistry) error {
|
func DoRollingUpdateContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string, prsMap map[string]v3.PrivateRegistry) error {
|
||||||
if dClient == nil {
|
if dClient == nil {
|
||||||
return fmt.Errorf("[%s] Failed rolling update of container: docker client is nil for container [%s] on host [%s]", plane, containerName, hostname)
|
return fmt.Errorf("[%s] Failed rolling update of container: docker client is nil for container [%s] on host [%s]", plane, containerName, hostname)
|
||||||
|
@@ -11,6 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/docker/docker/pkg/archive"
|
||||||
"github.com/rancher/rke/docker"
|
"github.com/rancher/rke/docker"
|
||||||
"github.com/rancher/rke/hosts"
|
"github.com/rancher/rke/hosts"
|
||||||
"github.com/rancher/rke/log"
|
"github.com/rancher/rke/log"
|
||||||
@@ -53,22 +54,26 @@ func DeployCertificatesOnPlaneHost(ctx context.Context, host *hosts.Host, rkeCon
|
|||||||
return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
|
return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeployStateOnPlaneHost(ctx context.Context, host *hosts.Host, stateDownloaderImage string, prsMap map[string]v3.PrivateRegistry, clusterState string, snapshotName string) error {
|
func DeployStateOnPlaneHost(ctx context.Context, host *hosts.Host, stateDownloaderImage string, prsMap map[string]v3.PrivateRegistry, stateFilePath, snapshotName string) error {
|
||||||
// remove existing container. Only way it's still here is if previous deployment failed
|
// remove existing container. Only way it's still here is if previous deployment failed
|
||||||
if err := docker.DoRemoveContainer(ctx, host.DClient, StateDeployerContainerName, host.Address); err != nil {
|
if err := docker.DoRemoveContainer(ctx, host.DClient, StateDeployerContainerName, host.Address); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
containerEnv := []string{ClusterStateEnv + "=" + clusterState}
|
// This is the location it needs to end up for rke-tools to pick it up and include it in the snapshot
|
||||||
ClusterStateFilePath := path.Join(host.PrefixPath, K8sBaseDir, "/", fmt.Sprintf("%s%s", snapshotName, ClusterStateExt))
|
// Example: /etc/kubernetes/snapshotname.rkestate
|
||||||
logrus.Debugf("[state] Deploying state to [%v] on node [%s]", ClusterStateFilePath, host.Address)
|
DestinationClusterStateFilePath := path.Join(K8sBaseDir, "/", fmt.Sprintf("%s%s", snapshotName, ClusterStateExt))
|
||||||
|
// This is the location where the 1-on-1 copy from local will be placed in the container, this is later moved to DestinationClusterStateFilePath
|
||||||
|
// Example: /etc/kubernetes/cluster.rkestate
|
||||||
|
SourceClusterStateFilePath := path.Join(K8sBaseDir, stateFilePath)
|
||||||
|
logrus.Infof("[state] Deploying state file to [%v] on host [%s]", DestinationClusterStateFilePath, host.Address)
|
||||||
|
|
||||||
imageCfg := &container.Config{
|
imageCfg := &container.Config{
|
||||||
Image: stateDownloaderImage,
|
Image: stateDownloaderImage,
|
||||||
Cmd: []string{
|
Cmd: []string{
|
||||||
"sh",
|
"sh",
|
||||||
"-c",
|
"-c",
|
||||||
fmt.Sprintf("t=$(mktemp); echo \"$%s\" > $t && mv $t %s && chmod 400 %s", ClusterStateEnv, ClusterStateFilePath, ClusterStateFilePath),
|
fmt.Sprintf("for i in $(seq 1 12); do if [ -f \"%[1]s\" ]; then echo \"File [%[1]s] present in this container\"; echo \"Moving [%[1]s] to [%[2]s]\"; mv %[1]s %[2]s; echo \"State file successfully moved to [%[2]s]\"; echo \"Changing permissions to 0400\"; chmod 400 %[2]s; break; else echo \"Waiting for file [%[1]s] to be successfully copied to this container, retry count $i\"; sleep 5; fi; done", SourceClusterStateFilePath, DestinationClusterStateFilePath),
|
||||||
},
|
},
|
||||||
Env: containerEnv,
|
|
||||||
}
|
}
|
||||||
hostCfg := &container.HostConfig{
|
hostCfg := &container.HostConfig{
|
||||||
Binds: []string{
|
Binds: []string{
|
||||||
@@ -79,10 +84,23 @@ func DeployStateOnPlaneHost(ctx context.Context, host *hosts.Host, stateDownload
|
|||||||
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, StateDeployerContainerName, host.Address, "state", prsMap); err != nil {
|
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, StateDeployerContainerName, host.Address, "state", prsMap); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
tarFile, err := archive.Tar(stateFilePath, archive.Uncompressed)
|
||||||
|
if err != nil {
|
||||||
|
// Snapshot is still valid without containing the state file
|
||||||
|
logrus.Warnf("[state] Error during creating archive tar to copy for local cluster state file [%s] on host [%s]: %v", stateFilePath, host.Address, err)
|
||||||
|
}
|
||||||
|
if err := docker.DoCopyToContainer(ctx, host.DClient, "state", StateDeployerContainerName, host.Address, K8sBaseDir, tarFile); err != nil {
|
||||||
|
// Snapshot is still valid without containing the state file
|
||||||
|
logrus.Warnf("[state] Error during copying state file [%s] to node [%s]: %v", stateFilePath, host.Address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := docker.WaitForContainer(ctx, host.DClient, host.Address, StateDeployerContainerName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := docker.DoRemoveContainer(ctx, host.DClient, StateDeployerContainerName, host.Address); err != nil {
|
if err := docker.DoRemoveContainer(ctx, host.DClient, StateDeployerContainerName, host.Address); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logrus.Debugf("[state] Successfully started state deployer container on node [%s]", host.Address)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user