handle upgrade cases
backup state to kubernetes
Commit 6da35256a8 (parent 8b8870311b)
@@ -41,8 +41,13 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.CertificatePKI
 	return certificates, nil
 }
 
-func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
+func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
 	log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes")
+	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
 	certificatesNames := []string{
 		pki.CACertName,
 		pki.KubeAPICertName,
@@ -63,7 +68,7 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
 
 	certMap := make(map[string]pki.CertificatePKI)
 	for _, certName := range certificatesNames {
-		secret, err := k8s.GetSecret(kubeClient, certName)
+		secret, err := k8s.GetSecret(k8sClient, certName)
 		if err != nil && !strings.HasPrefix(certName, "kube-etcd") &&
 			!strings.Contains(certName, pki.RequestHeaderCACertName) &&
 			!strings.Contains(certName, pki.APIProxyClientCertName) &&
@@ -104,6 +109,12 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
 			ConfigPath: string(secret.Data["ConfigPath"]),
 		}
 	}
+	// Handle service account token key issue
+	kubeAPICert := certMap[pki.KubeAPICertName]
+	if certMap[pki.ServiceAccountTokenKeyName].Key == nil {
+		log.Infof(ctx, "[certificates] Creating service account token key")
+		certMap[pki.ServiceAccountTokenKeyName] = pki.ToCertObject(pki.ServiceAccountTokenKeyName, pki.ServiceAccountTokenKeyName, "", kubeAPICert.Certificate, kubeAPICert.Key)
+	}
 	log.Infof(ctx, "[certificates] Successfully fetched Cluster certificates from Kubernetes")
 	return certMap, nil
 }
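Because the exported GetClusterCertsFromKubernetes now builds its own client from a kubeconfig path and wrap transport, callers outside the cluster package can recover certificates from a running cluster. A minimal caller sketch, not part of this commit; the kubeconfig path and the way etcdHosts is populated are assumptions, and nil simply means no transport wrapping:

package main

import (
	"context"
	"fmt"

	"github.com/rancher/rke/cluster"
	"github.com/rancher/rke/hosts"
)

func main() {
	ctx := context.Background()
	var etcdHosts []*hosts.Host // assumption: resolved from the parsed cluster config elsewhere
	// nil wrap transport, mirroring how the CLI passes nil further down in this commit
	certs, err := cluster.GetClusterCertsFromKubernetes(ctx, "./kube_config_cluster.yml", nil, etcdHosts)
	if err != nil {
		fmt.Println("failed to recover certificates:", err)
		return
	}
	fmt.Println("recovered", len(certs), "certificates")
}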
|
@@ -55,6 +55,7 @@ type Cluster struct {
 const (
 	X509AuthenticationProvider = "x509"
 	StateConfigMapName = "cluster-state"
+	FullStateConfigMapName = "full-cluster-state"
 	UpdateStateTimeout = 30
 	GetStateTimeout = 30
 	KubernetesClientTimeOut = 30
@@ -204,6 +205,10 @@ func (c *Cluster) SetupDialers(ctx context.Context, dockerDialerFactory,
 	return nil
 }
 
+func RebuildKubeconfig(ctx context.Context, kubeCluster *Cluster) error {
+	return rebuildLocalAdminConfig(ctx, kubeCluster)
+}
+
 func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
 	if len(kubeCluster.ControlPlaneHosts) == 0 {
 		return nil
@@ -161,6 +161,36 @@ func (c *Cluster) GetClusterState(ctx context.Context, fullState *RKEFullState,
 
 	return currentCluster, nil
 }
 
+func SaveFullStateToKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, fullState *RKEFullState) error {
+	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
+	if err != nil {
+		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
+	log.Infof(ctx, "[state] Saving full cluster state to Kubernetes")
+	stateFile, err := json.Marshal(*fullState)
+	if err != nil {
+		return err
+	}
+	timeout := make(chan bool, 1)
+	go func() {
+		for {
+			_, err := k8s.UpdateConfigMap(k8sClient, stateFile, FullStateConfigMapName)
+			if err != nil {
+				time.Sleep(time.Second * 5)
+				continue
+			}
+			log.Infof(ctx, "[state] Successfully Saved full cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
+			timeout <- true
+			break
+		}
+	}()
+	select {
+	case <-timeout:
+		return nil
+	case <-time.After(time.Second * UpdateStateTimeout):
+		return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
+	}
+}
+
 func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
 	log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
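The new SaveFullStateToKubernetes and the existing state fetch below share the same shape: retry the API call every five seconds in a goroutine, signal success over a buffered channel, and give up after a fixed timeout via select. A self-contained sketch of just that pattern, plain standard library, illustrative only and not code from this commit:

package main

import (
	"errors"
	"fmt"
	"time"
)

// updateWithTimeout retries update() every 5 seconds in a goroutine and
// returns nil once it succeeds, or an error after timeoutSeconds have passed.
func updateWithTimeout(update func() error, timeoutSeconds int) error {
	done := make(chan bool, 1)
	go func() {
		for {
			if err := update(); err != nil {
				time.Sleep(time.Second * 5)
				continue
			}
			done <- true
			break
		}
	}()
	select {
	case <-done:
		return nil
	case <-time.After(time.Second * time.Duration(timeoutSeconds)):
		return errors.New("timeout waiting for kubernetes to be ready")
	}
}

func main() {
	attempts := 0
	// Fails once, then succeeds on the second attempt (after one 5s sleep).
	err := updateWithTimeout(func() error {
		attempts++
		if attempts < 2 {
			return errors.New("not ready yet")
		}
		return nil
	}, 30)
	fmt.Println(err, attempts)
}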
@@ -216,15 +246,18 @@ func saveStateToNodes(ctx context.Context, uniqueHosts []*hosts.Host, clusterState
 	return nil
 }
 
-func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) (*Cluster, error) {
+func GetStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
 	log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
+	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
 	var cfgMap *v1.ConfigMap
 	var currentCluster Cluster
-	var err error
 	timeout := make(chan bool, 1)
 	go func() {
 		for {
-			cfgMap, err = k8s.GetConfigMap(kubeClient, StateConfigMapName)
+			cfgMap, err = k8s.GetConfigMap(k8sClient, StateConfigMapName)
 			if err != nil {
 				time.Sleep(time.Second * 5)
 				continue
@@ -382,3 +415,11 @@ func RemoveStateFile(ctx context.Context, statePath string) {
 	}
 	log.Infof(ctx, "State file removed successfully")
 }
+
+func RemoveLegacyStateFromKubernets(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
+	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
+	if err != nil {
+		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
+	return k8s.DeleteConfigMap(k8sClient, StateConfigMapName)
+}
|
cmd/up.go (60 lines changed)
@@ -3,6 +3,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"os"
 	"strings"
 	"time"
@@ -13,6 +14,7 @@ import (
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
+	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 	"k8s.io/client-go/util/cert"
 )
@@ -64,21 +66,66 @@ func UpCommand() cli.Command {
 		Flags: upFlags,
 	}
 }
 
+func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, fullState *cluster.RKEFullState, k8sWrapTransport k8s.WrapTransport) error {
+	if _, err := os.Stat(kubeCluster.LocalKubeConfigPath); os.IsNotExist(err) {
+		// there is no kubeconfig. This is a new cluster
+		logrus.Debug("[state] local kubeconfig not found, this is a new cluster")
+		return nil
+	}
+	if fullState.CurrentState.RancherKubernetesEngineConfig != nil {
+		// this cluster has a previous state, I don't need to upgrade!
+		logrus.Debug("[state] previous state found, this is not a legacy cluster")
+		return nil
+	}
+	// We have a kubeconfig and no current state. This is a legacy cluster or a new cluster with an old kubeconfig
+	// let's try to upgrade
+	log.Infof(ctx, "[state] Possible legacy cluster detected, trying to upgrade")
+	if err := cluster.RebuildKubeconfig(ctx, kubeCluster); err != nil {
+		return err
+	}
+	recoveredCluster, err := cluster.GetStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+	if err != nil {
+		return err
+	}
+	// if we found a recovered cluster, we will need to override the current state
+	if recoveredCluster != nil {
+		recoveredCerts, err := cluster.GetClusterCertsFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, kubeCluster.EtcdHosts)
+		if err != nil {
+			return err
+		}
+		fullState.CurrentState.RancherKubernetesEngineConfig = recoveredCluster.RancherKubernetesEngineConfig.DeepCopy()
+		fullState.CurrentState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
+		// we don't want to regenerate certificates
+		fullState.DesiredState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
+		if err = fullState.WriteStateFile(ctx, kubeCluster.StateFilePath); err != nil {
+			return err
+		}
+		return cluster.RemoveLegacyStateFromKubernets(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+	}
+
+	return nil
+}
+
-func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string) error {
+func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string, k8sWrapTransport k8s.WrapTransport) error {
 	log.Infof(ctx, "Initiating Kubernetes cluster")
 	stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
 	rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
 
 	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+
+	err = doUpgradeLegacyCluster(ctx, kubeCluster, rkeFullState, k8sWrapTransport)
+	if err != nil {
+		log.Warnf(ctx, "[state] can't fetch legacy cluster state from Kubernetes")
+	}
+
 	fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
 
 	rkeState := cluster.RKEFullState{
 		DesiredState: fullState.DesiredState,
 		CurrentState: fullState.CurrentState,
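The upgrade check in doUpgradeLegacyCluster comes down to two conditions: a local kubeconfig must exist, and the full state must carry no current RKE config. A standalone distillation of that decision (a hypothetical helper, not part of the commit), useful for seeing the three cases at a glance:

package main

import "fmt"

// isLegacyCluster mirrors the checks doUpgradeLegacyCluster performs before
// attempting an upgrade: only a cluster with a local kubeconfig but no
// recorded current state is treated as a legacy candidate.
func isLegacyCluster(hasLocalKubeconfig, hasCurrentState bool) bool {
	if !hasLocalKubeconfig {
		// no kubeconfig: a brand new cluster, nothing to upgrade
		return false
	}
	if hasCurrentState {
		// a previous full state exists: not a legacy cluster
		return false
	}
	// kubeconfig present, no current state: legacy cluster, or a new cluster with an old kubeconfig
	return true
}

func main() {
	fmt.Println(isLegacyCluster(true, false))  // true: attempt the upgrade path
	fmt.Println(isLegacyCluster(false, false)) // false: new cluster
	fmt.Println(isLegacyCluster(true, true))   // false: state already migrated
}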
@@ -176,6 +223,11 @@ func ClusterUp(
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
+	err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, clusterState)
+	if err != nil {
+		return APIURL, caCrt, clientCert, clientKey, nil, err
+	}
+
 	err = kubeCluster.DeployWorkerPlane(ctx)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -241,9 +293,9 @@ func clusterUpFromCli(ctx *cli.Context) error {
 	updateOnly := ctx.Bool("update-only")
 	disablePortCheck := ctx.Bool("disable-port-check")
 	if ctx.Bool("init") {
-		return ClusterInit(context.Background(), rkeConfig, "")
+		return ClusterInit(context.Background(), rkeConfig, "", nil)
 	}
-	if err := ClusterInit(context.Background(), rkeConfig, ""); err != nil {
+	if err := ClusterInit(context.Background(), rkeConfig, "", nil); err != nil {
 		return err
 	}
 	_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "", updateOnly, disablePortCheck)
@@ -44,3 +44,7 @@ func UpdateConfigMap(k8sClient *kubernetes.Clientset, configYaml []byte, configMapName
 func GetConfigMap(k8sClient *kubernetes.Clientset, configMapName string) (*v1.ConfigMap, error) {
 	return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
 }
+
+func DeleteConfigMap(k8sClient *kubernetes.Clientset, configMapName string) error {
+	return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(configMapName, &metav1.DeleteOptions{})
+}
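With the state now stored in the full-cluster-state ConfigMap in kube-system, it can be inspected with plain client-go. A rough sketch, assuming the same client-go vintage as GetConfigMap above (Get without a context argument) and assuming UpdateConfigMap keys the payload by the ConfigMap name; both are assumptions, not confirmed by this diff:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// readFullState fetches the full-cluster-state ConfigMap from kube-system
// using the given kubeconfig and returns its serialized payload.
func readFullState(kubeconfigPath string) (string, error) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return "", err
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return "", err
	}
	cm, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get("full-cluster-state", metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	// Assumption: the state JSON is stored under a key matching the ConfigMap name.
	return cm.Data["full-cluster-state"], nil
}

func main() {
	state, err := readFullState("./kube_config_cluster.yml")
	fmt.Println(len(state), err)
}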