diff --git a/cluster/etcd.go b/cluster/etcd.go
index a955e743..d47b81b9 100644
--- a/cluster/etcd.go
+++ b/cluster/etcd.go
@@ -3,12 +3,9 @@ package cluster
 import (
 	"context"
 	"fmt"
-	"path"
 
-	"github.com/rancher/rke/docker"
-	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/services"
-	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
 func (c *Cluster) SnapshotEtcd(ctx context.Context, snapshotName string) error {
@@ -21,11 +18,8 @@ func (c *Cluster) SnapshotEtcd(ctx context.Context, snapshotName string) error {
 }
 
 func (c *Cluster) RestoreEtcdSnapshot(ctx context.Context, snapshotPath string) error {
-	// Stopping all etcd containers
-	for _, host := range c.EtcdHosts {
-		if err := tearDownOldEtcd(ctx, host, c.SystemImages.Alpine, c.PrivateRegistriesMap); err != nil {
-			return err
-		}
+	if isEqual := c.etcdSnapshotChecksum(ctx, snapshotPath); !isEqual {
+		return fmt.Errorf("etcd snapshots are not consistent")
 	}
 	// Start restore process on all etcd hosts
 	initCluster := services.GetEtcdInitialCluster(c.EtcdHosts)
@@ -34,30 +28,39 @@ func (c *Cluster) RestoreEtcdSnapshot(ctx context.Context, snapshotPath string)
 			return fmt.Errorf("[etcd] Failed to restore etcd snapshot: %v", err)
 		}
 	}
-	// Deploy Etcd Plane
-	etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
-	// Build etcd node plan map
-	for _, etcdHost := range c.EtcdHosts {
-		etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo)
-	}
-	etcdRollingSnapshots := services.EtcdSnapshot{
-		Snapshot:  c.Services.Etcd.Snapshot,
-		Creation:  c.Services.Etcd.Creation,
-		Retention: c.Services.Etcd.Retention,
-	}
-	if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, etcdRollingSnapshots); err != nil {
-		return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
-	}
 	return nil
 }
 
-func tearDownOldEtcd(ctx context.Context, host *hosts.Host, cleanupImage string, prsMap map[string]v3.PrivateRegistry) error {
-	if err := docker.DoRemoveContainer(ctx, host.DClient, services.EtcdContainerName, host.Address); err != nil {
-		return fmt.Errorf("[etcd] Failed to stop old etcd containers: %v", err)
+// etcdSnapshotChecksum reports whether the snapshot named by snapshotPath has
+// an identical, non-empty checksum on every etcd host. It returns false when
+// there are no etcd hosts or when a checksum cannot be obtained from a host;
+// the reason is logged as a warning.
+func (c *Cluster) etcdSnapshotChecksum(ctx context.Context, snapshotPath string) bool {
+	log.Infof(ctx, "[etcd] Checking if all snapshots are identical")
+	etcdChecksums := []string{}
+	for _, etcdHost := range c.EtcdHosts {
+		checksum, err := services.GetEtcdSnapshotChecksum(ctx, etcdHost, c.PrivateRegistriesMap, c.SystemImages.Alpine, snapshotPath)
+		if err != nil {
+			log.Warnf(ctx, "[etcd] Failed to get checksum of etcd snapshot on host [%s]: %v", etcdHost.Address, err)
+			return false
+		}
+		if checksum == "" {
+			log.Warnf(ctx, "[etcd] Checksum of etcd snapshot on host [%s] is empty", etcdHost.Address)
+			return false
+		}
+		etcdChecksums = append(etcdChecksums, checksum)
+		log.Infof(ctx, "[etcd] Checksum of etcd snapshot on host [%s] is [%s]", etcdHost.Address, checksum)
 	}
-	// cleanup etcd data directory
-	toCleanPaths := []string{
-		path.Join(host.PrefixPath, hosts.ToCleanEtcdDir),
+	// Guard the index below: with no etcd hosts there is nothing to compare.
+	if len(etcdChecksums) == 0 {
+		log.Warnf(ctx, "[etcd] No etcd hosts to check snapshot checksums on")
+		return false
+	}
+	hostChecksum := etcdChecksums[0]
+	for _, checksum := range etcdChecksums {
+		if checksum != hostChecksum {
+			return false
+		}
 	}
-	return host.CleanUp(ctx, toCleanPaths, cleanupImage, prsMap)
+	return true
 }
diff --git a/cluster/network.go b/cluster/network.go
index f68e9a47..17e27fb2 100644
--- a/cluster/network.go
+++ b/cluster/network.go
@@ -480,7 +480,7 @@ func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList
 		return err
 	}
 
-	containerLog, logsErr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, PortCheckContainer, "all", true)
+	containerLog, _, logsErr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, PortCheckContainer, "all", true)
 	if logsErr != nil {
 		log.Warnf(ctx, "[network] Failed to get network port check logs: %v", logsErr)
 	}
diff --git a/cluster/remove.go b/cluster/remove.go
index b98349db..9afcaf10 100644
--- a/cluster/remove.go
+++ b/cluster/remove.go
@@ -4,6 +4,8 @@ import (
 	"context"
 
 	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/k8s"
+	"github.com/rancher/rke/log"
"github.com/rancher/rke/pki" "github.com/rancher/rke/services" "github.com/rancher/rke/util" @@ -12,33 +14,10 @@ import ( ) func (c *Cluster) ClusterRemove(ctx context.Context) error { - externalEtcd := false - if len(c.Services.Etcd.ExternalURLs) > 0 { - externalEtcd = true - } - // Remove Worker Plane - if err := services.RemoveWorkerPlane(ctx, c.WorkerHosts, true); err != nil { + if err := c.CleanupNodes(ctx); err != nil { return err } - // Remove Contol Plane - if err := services.RemoveControlPlane(ctx, c.ControlPlaneHosts, true); err != nil { - return err - } - - // Remove Etcd Plane - if !externalEtcd { - if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts, true); err != nil { - return err - } - } - - // Clean up all hosts - if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, externalEtcd); err != nil { - return err - } - - pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath) - removeStateFile(ctx, c.StateFilePath) + c.CleanupFiles(ctx) return nil } @@ -63,3 +42,56 @@ func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts. 
return errgrp.Wait() } + +func (c *Cluster) CleanupNodes(ctx context.Context) error { + externalEtcd := false + if len(c.Services.Etcd.ExternalURLs) > 0 { + externalEtcd = true + } + // Remove Worker Plane + if err := services.RemoveWorkerPlane(ctx, c.WorkerHosts, true); err != nil { + return err + } + // Remove Contol Plane + if err := services.RemoveControlPlane(ctx, c.ControlPlaneHosts, true); err != nil { + return err + } + + // Remove Etcd Plane + if !externalEtcd { + if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts, true); err != nil { + return err + } + } + + // Clean up all hosts + return cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, externalEtcd) +} + +func (c *Cluster) CleanupFiles(ctx context.Context) error { + pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath) + removeStateFile(ctx, c.StateFilePath) + return nil +} + +func (c *Cluster) RemoveOldNodes(ctx context.Context) error { + kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) + if err != nil { + return err + } + nodeList, err := k8s.GetNodeList(kubeClient) + if err != nil { + return err + } + uniqueHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) + for _, node := range nodeList.Items { + host := &hosts.Host{} + host.HostnameOverride = node.Name + if !hosts.IsNodeInList(host, uniqueHosts) { + if err := k8s.DeleteNode(kubeClient, node.Name, c.CloudProvider.Name); err != nil { + log.Warnf(ctx, "Failed to delete old node [%s] from kubernetes") + } + } + } + return nil +} diff --git a/cmd/etcd.go b/cmd/etcd.go index 8e74f0d1..61c82578 100644 --- a/cmd/etcd.go +++ b/cmd/etcd.go @@ -83,7 +83,17 @@ func RestoreEtcdSnapshot( dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags, snapshotName string) error { - log.Infof(ctx, "Starting restoring snapshot on etcd hosts") + log.Infof(ctx, "Restoring etcd snapshot %s", snapshotName) + stateFilePath := 
cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
+	rkeFullState, err := cluster.ReadStateFile(ctx, stateFilePath)
+	if err != nil {
+		return err
+	}
+
+	rkeFullState.CurrentState = cluster.State{}
+	if err := rkeFullState.WriteStateFile(ctx, stateFilePath); err != nil {
+		return err
+	}
 	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
@@ -91,15 +101,30 @@ func RestoreEtcdSnapshot(
 	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
-
 	if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
 		return err
 	}
+	log.Infof(ctx, "Cleaning old kubernetes cluster")
+	if err := kubeCluster.CleanupNodes(ctx); err != nil {
+		return err
+	}
 
 	if err := kubeCluster.RestoreEtcdSnapshot(ctx, snapshotName); err != nil {
 		return err
 	}
 
+	if err := ClusterInit(ctx, rkeConfig, dialersOptions, flags); err != nil {
+		return err
+	}
+	if _, _, _, _, _, err := ClusterUp(ctx, dialersOptions, flags); err != nil {
+		return err
+	}
+	if err := cluster.RestartClusterPods(ctx, kubeCluster); err != nil {
+		return err
+	}
+	if err := kubeCluster.RemoveOldNodes(ctx); err != nil {
+		return err
+	}
 	log.Infof(ctx, "Finished restoring snapshot [%s] on all etcd hosts", snapshotName)
 	return nil
 }
diff --git a/docker/docker.go b/docker/docker.go
index 7d39cc26..850bfc86 100644
--- a/docker/docker.go
+++ b/docker/docker.go
@@ -388,19 +388,23 @@ func ReadContainerLogs(ctx context.Context, dClient *client.Client, containerNam
 	return dClient.ContainerLogs(ctx, containerName, types.ContainerLogsOptions{Follow: follow, ShowStdout: true, ShowStderr: true, Timestamps: false, Tail: tail})
 }
 
-func GetContainerLogsStdoutStderr(ctx context.Context, dClient *client.Client, containerName, tail string, follow bool) (string, error) {
+// GetContainerLogsStdoutStderr reads the logs of containerName and splits
+// them into the two streams. Despite the name, the first result is the
+// container's stderr and the second is its stdout.
+func GetContainerLogsStdoutStderr(ctx context.Context, dClient *client.Client, containerName, tail string, follow bool) (string, string, error) {
 	var containerStderr bytes.Buffer
 	var containerStdout bytes.Buffer
-	var containerLog string
+	var containerErrLog, containerStdLog string
 	clogs, logserr := ReadContainerLogs(ctx, dClient, containerName, follow, tail)
 	if logserr != nil {
 		logrus.Debugf("logserr: %v", logserr)
-		return containerLog, fmt.Errorf("Failed to get gather logs from container [%s]: %v", containerName, logserr)
+		return containerErrLog, containerStdLog, fmt.Errorf("Failed to get gather logs from container [%s]: %v", containerName, logserr)
 	}
 	defer clogs.Close()
 	stdcopy.StdCopy(&containerStdout, &containerStderr, clogs)
-	containerLog = containerStderr.String()
-	return containerLog, nil
+	containerErrLog = containerStderr.String()
+	containerStdLog = containerStdout.String()
+	return containerErrLog, containerStdLog, nil
 }
 
 func tryRegistryAuth(pr v3.PrivateRegistry) types.RequestPrivilegeFunc {
diff --git a/pki/pki.go b/pki/pki.go
index 12c8ccd5..55ade785 100644
--- a/pki/pki.go
+++ b/pki/pki.go
@@ -177,7 +177,7 @@ func ExtractBackupBundleOnHost(ctx context.Context, host *hosts.Host, alpineSyst
 		return err
 	}
 	if status != 0 {
-		containerLog, err := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, BundleCertContainer, "5", false)
+		containerErrLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, BundleCertContainer, "5", false)
 		if err != nil {
 			return err
 		}
@@ -185,7 +185,7 @@ func ExtractBackupBundleOnHost(ctx context.Context, host *hosts.Host, alpineSyst
 		if err := docker.RemoveContainer(ctx, host.DClient, host.Address, BundleCertContainer); err != nil {
 			return err
 		}
-		return fmt.Errorf("Failed to run certificate bundle extract, exit status is: %d, container logs: %s", status, containerLog)
+		return fmt.Errorf("Failed to run certificate bundle extract, exit status is: %d, container logs: %s", status, containerErrLog)
 	}
 	log.Infof(ctx, "[certificates] successfully extracted certificate bundle on host [%s] to backup path [%s]", host.Address, TempCertPath)
 	return docker.RemoveContainer(ctx, host.DClient, host.Address, 
BundleCertContainer)
diff --git a/services/etcd.go b/services/etcd.go
index eab11c0d..d97e082d 100644
--- a/services/etcd.go
+++ b/services/etcd.go
@@ -361,7 +361,7 @@ func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[s
 		return err
 	}
 	if status != 0 {
-		containerLog, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
+		containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
 		if err != nil {
 			return err
 		}
@@ -373,3 +373,45 @@ func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[s
 	}
 	return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
 }
+
+// GetEtcdSnapshotChecksum runs a container from alpineImage on etcdHost that
+// prints the md5sum of the snapshot snapshotName under EtcdSnapshotPath, and
+// returns that checksum as read from the container's stdout.
+//
+// An empty checksum is returned as an error: the command is a shell pipeline,
+// so a failing md5sum (e.g. on a missing snapshot) only shows up as no output.
+func GetEtcdSnapshotChecksum(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, alpineImage, snapshotName string) (string, error) {
+	snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)
+	imageCfg := &container.Config{
+		Cmd: []string{
+			"sh", "-c", strings.Join([]string{
+				"md5sum", snapshotPath,
+				"|", "cut", "-f1", "-d' '", "|", "tr", "-d", "'\n'"}, " "),
+		},
+		Image: alpineImage,
+	}
+	hostCfg := &container.HostConfig{
+		Binds: []string{
+			"/opt/rke/:/opt/rke/:z",
+		}}
+
+	if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdChecksumContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
+		return "", err
+	}
+	if _, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
+		return "", err
+	}
+	_, checksum, logsErr := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdChecksumContainerName, "1", false)
+	// Remove the container whether or not its logs could be read, so a
+	// failed read does not leave it behind.
+	if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
+		return "", err
+	}
+	if logsErr != nil {
+		return "", logsErr
+	}
+	if checksum == "" {
+		return "", fmt.Errorf("[etcd] Failed to get checksum of snapshot [%s] on host [%s]: md5sum returned no output", snapshotName, etcdHost.Address)
+	}
+	return checksum, nil
+}
diff --git 
a/services/healthcheck.go b/services/healthcheck.go index 065ef79c..257169d5 100644 --- a/services/healthcheck.go +++ b/services/healthcheck.go @@ -63,7 +63,7 @@ func runHealthcheck(ctx context.Context, host *hosts.Host, serviceName string, l return nil } logrus.Debug("Checking container logs") - containerLog, logserr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, serviceName, "1", false) + containerLog, _, logserr := docker.GetContainerLogsStdoutStderr(ctx, host.DClient, serviceName, "1", false) containerLog = strings.TrimSuffix(containerLog, "\n") if logserr != nil { return fmt.Errorf("Failed to verify healthcheck for service [%s]: %v", serviceName, logserr) diff --git a/services/services.go b/services/services.go index 589176f4..4fbf9c46 100644 --- a/services/services.go +++ b/services/services.go @@ -31,6 +31,7 @@ const ( EtcdSnapshotContainerName = "etcd-rolling-snapshots" EtcdSnapshotOnceContainerName = "etcd-snapshot-once" EtcdRestoreContainerName = "etcd-restore" + EtcdChecksumContainerName = "etcd-checksum-checker" NginxProxyContainerName = "nginx-proxy" SidekickContainerName = "service-sidekick" LogLinkContainerName = "rke-log-linker"