1
0
mirror of https://github.com/rancher/rke.git synced 2025-08-01 23:33:39 +00:00

Update etcd save/restore to work with new state managemnet

This commit is contained in:
galal-hussein 2018-11-22 02:20:24 +02:00 committed by Alena Prokharchyk
parent 11aa0caabc
commit f3bbd81c52
2 changed files with 3 additions and 132 deletions

View File

@ -5,17 +5,13 @@ import (
"crypto/rsa"
"fmt"
"strings"
"time"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/rke/util"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/cert"
)
@ -129,127 +125,6 @@ func GetClusterCertsFromKubernetes(ctx context.Context, kubeCluster *Cluster) (m
return certMap, nil
}
func saveClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
log.Infof(ctx, "[certificates] Save kubernetes certificates as secrets")
var errgrp errgroup.Group
for crtName, crt := range crts {
name := crtName
certificate := crt
errgrp.Go(func() error {
return saveCertToKubernetes(kubeClient, name, certificate)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
log.Infof(ctx, "[certificates] Successfully saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName)
return nil
}
func saveCertToKubernetes(kubeClient *kubernetes.Clientset, crtName string, crt pki.CertificatePKI) error {
logrus.Debugf("[certificates] Saving certificate [%s] to kubernetes", crtName)
timeout := make(chan bool, 1)
// build secret Data
secretData := make(map[string][]byte)
if crt.Certificate != nil {
secretData["Certificate"] = cert.EncodeCertPEM(crt.Certificate)
secretData["EnvName"] = []byte(crt.EnvName)
secretData["Path"] = []byte(crt.Path)
}
if crt.Key != nil {
secretData["Key"] = cert.EncodePrivateKeyPEM(crt.Key)
secretData["KeyEnvName"] = []byte(crt.KeyEnvName)
secretData["KeyPath"] = []byte(crt.KeyPath)
}
if len(crt.Config) > 0 {
secretData["ConfigEnvName"] = []byte(crt.ConfigEnvName)
secretData["Config"] = []byte(crt.Config)
secretData["ConfigPath"] = []byte(crt.ConfigPath)
}
go func() {
for {
err := k8s.UpdateSecret(kubeClient, secretData, crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
timeout <- true
break
}
}()
select {
case <-timeout:
return nil
case <-time.After(time.Second * KubernetesClientTimeOut):
return fmt.Errorf("[certificates] Timeout waiting for kubernetes to be ready")
}
}
func deployBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, kubeCluster *Cluster) error {
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(backupHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
err := pki.DeployCertificatesOnHost(ctx, host.(*hosts.Host), kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
return errgrp.Wait()
}
func (c *Cluster) SaveBackupCertificateBundle(ctx context.Context) error {
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(c.getBackupHosts())
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
err := pki.SaveBackupBundleOnHost(ctx, host.(*hosts.Host), c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
return errgrp.Wait()
}
func (c *Cluster) ExtractBackupCertificateBundle(ctx context.Context) error {
backupHosts := c.getBackupHosts()
var errgrp errgroup.Group
errList := []string{}
hostsQueue := util.GetObjectQueue(backupHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
for host := range hostsQueue {
if err := pki.ExtractBackupBundleOnHost(ctx, host.(*hosts.Host), c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap); err != nil {
errList = append(errList, fmt.Errorf(
"Failed to extract certificate bundle on host [%s], please make sure etcd bundle exist in /opt/rke/etcd-snapshots/pki.bundle.tar.gz: %v", host.(*hosts.Host).Address, err).Error())
}
}
return nil
})
}
errgrp.Wait()
if len(errList) == len(backupHosts) {
return fmt.Errorf(strings.Join(errList, ","))
}
return nil
}
func (c *Cluster) getBackupHosts() []*hosts.Host {
var backupHosts []*hosts.Host
if len(c.Services.Etcd.ExternalURLs) > 0 {

View File

@ -68,11 +68,8 @@ func SnapshotSaveEtcdHosts(
if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
return err
}
if err := kubeCluster.SnapshotEtcd(ctx, snapshotName); err != nil {
return err
}
if err := kubeCluster.SaveBackupCertificateBundle(ctx); err != nil {
if err := kubeCluster.SnapshotEtcd(ctx, snapshotName); err != nil {
return err
}
@ -98,12 +95,11 @@ func RestoreEtcdSnapshot(
if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
return err
}
if err := kubeCluster.RestoreEtcdSnapshot(ctx, snapshotName); err != nil {
return err
}
if err := kubeCluster.ExtractBackupCertificateBundle(ctx); err != nil {
return err
}
log.Infof(ctx, "Finished restoring snapshot [%s] on all etcd hosts", snapshotName)
return nil
}