From 6da35256a8862b303acaf317d699335f09655291 Mon Sep 17 00:00:00 2001
From: moelsayed
Date: Wed, 7 Nov 2018 02:24:49 +0200
Subject: [PATCH] handle upgrade cases backup state to kubernetes

---
 cluster/certificates.go | 15 +++++++++--
 cluster/cluster.go      |  5 ++++
 cluster/state.go        | 47 +++++++++++++++++++++++++++++---
 cmd/up.go               | 60 ++++++++++++++++++++++++++++++++++++++---
 k8s/configmap.go        |  4 +++
 5 files changed, 122 insertions(+), 9 deletions(-)

diff --git a/cluster/certificates.go b/cluster/certificates.go
index 4902ebb3..4a067f04 100644
--- a/cluster/certificates.go
+++ b/cluster/certificates.go
@@ -41,8 +41,13 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.Certificat
 	return certificates, nil
 }
 
-func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
+func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
 	log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes")
+
+	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
 	certificatesNames := []string{
 		pki.CACertName,
 		pki.KubeAPICertName,
@@ -63,7 +68,7 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcd
 
 	certMap := make(map[string]pki.CertificatePKI)
 	for _, certName := range certificatesNames {
-		secret, err := k8s.GetSecret(kubeClient, certName)
+		secret, err := k8s.GetSecret(k8sClient, certName)
 		if err != nil && !strings.HasPrefix(certName, "kube-etcd") &&
 			!strings.Contains(certName, pki.RequestHeaderCACertName) &&
 			!strings.Contains(certName, pki.APIProxyClientCertName) &&
@@ -104,6 +109,12 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcd
 			ConfigPath: string(secret.Data["ConfigPath"]),
 		}
 	}
+	// Handle service account token key issue
+	kubeAPICert := certMap[pki.KubeAPICertName]
+	if certMap[pki.ServiceAccountTokenKeyName].Key == nil {
+		log.Infof(ctx, "[certificates] Creating service account token key")
+		certMap[pki.ServiceAccountTokenKeyName] = pki.ToCertObject(pki.ServiceAccountTokenKeyName, pki.ServiceAccountTokenKeyName, "", kubeAPICert.Certificate, kubeAPICert.Key)
+	}
 	log.Infof(ctx, "[certificates] Successfully fetched Cluster certificates from Kubernetes")
 	return certMap, nil
 }
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 73407485..bbeffcb6 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -55,6 +55,7 @@ type Cluster struct {
 const (
 	X509AuthenticationProvider = "x509"
 	StateConfigMapName         = "cluster-state"
+	FullStateConfigMapName     = "full-cluster-state"
 	UpdateStateTimeout         = 30
 	GetStateTimeout            = 30
 	KubernetesClientTimeOut    = 30
@@ -204,6 +205,10 @@ func (c *Cluster) SetupDialers(ctx context.Context, dockerDialerFactory,
 	return nil
 }
 
+func RebuildKubeconfig(ctx context.Context, kubeCluster *Cluster) error {
+	return rebuildLocalAdminConfig(ctx, kubeCluster)
+}
+
 func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
 	if len(kubeCluster.ControlPlaneHosts) == 0 {
 		return nil
diff --git a/cluster/state.go b/cluster/state.go
index b917f522..c0226432 100644
--- a/cluster/state.go
+++ b/cluster/state.go
@@ -161,6 +161,36 @@ func (c *Cluster) GetClusterState(ctx context.Context, fullState *RKEFullState,
 	return currentCluster, nil
 }
 
+func SaveFullStateToKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, fullState *RKEFullState) error {
+	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
+	if err != nil {
+		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
+	log.Infof(ctx, "[state] Saving full cluster state to Kubernetes")
+	stateFile, err := json.Marshal(*fullState)
+	if err != nil {
+		return err
+	}
+	timeout := make(chan bool, 1)
+	go func() {
+		for {
+			_, err := k8s.UpdateConfigMap(k8sClient, stateFile, FullStateConfigMapName)
+			if err != nil {
+				time.Sleep(time.Second * 5)
+				continue
+			}
+			log.Infof(ctx, "[state] Successfully saved full cluster state to Kubernetes ConfigMap: %s", FullStateConfigMapName)
+			timeout <- true
+			break
+		}
+	}()
+	select {
+	case <-timeout:
+		return nil
+	case <-time.After(time.Second * UpdateStateTimeout):
+		return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
+	}
+}
 func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
 	log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
@@ -216,15 +246,18 @@ func saveStateToNodes(ctx context.Context, uniqueHosts []*hosts.Host, clusterSta
 	return nil
 }
 
-func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) (*Cluster, error) {
+func GetStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
 	log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
+	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
 	var cfgMap *v1.ConfigMap
 	var currentCluster Cluster
-	var err error
 	timeout := make(chan bool, 1)
 	go func() {
 		for {
-			cfgMap, err = k8s.GetConfigMap(kubeClient, StateConfigMapName)
+			cfgMap, err = k8s.GetConfigMap(k8sClient, StateConfigMapName)
 			if err != nil {
 				time.Sleep(time.Second * 5)
 				continue
@@ -382,3 +415,11 @@ func RemoveStateFile(ctx context.Context, statePath string) {
 	}
 	log.Infof(ctx, "State file removed successfully")
 }
+
+func RemoveLegacyStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
+	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
+	if err != nil {
+		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
+	}
+	return k8s.DeleteConfigMap(k8sClient, StateConfigMapName)
+}
diff --git a/cmd/up.go b/cmd/up.go
index 8bf5e6f9..69eafdd2 100644
--- a/cmd/up.go
+++ b/cmd/up.go
@@ -3,6 +3,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"os"
 	"strings"
 	"time"
 
@@ -13,6 +14,7 @@ import (
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
+	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 	"k8s.io/client-go/util/cert"
 )
@@ -64,21 +66,66 @@ func UpCommand() cli.Command {
 		Flags: upFlags,
 	}
 }
+func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, fullState *cluster.RKEFullState, k8sWrapTransport k8s.WrapTransport) error {
+	if _, err := os.Stat(kubeCluster.LocalKubeConfigPath); os.IsNotExist(err) {
+		// there is no kubeconfig. This is a new cluster
+		logrus.Debug("[state] local kubeconfig not found, this is a new cluster")
+		return nil
+	}
+	if fullState.CurrentState.RancherKubernetesEngineConfig != nil {
+		// this cluster has a previous state, no need to upgrade
+		logrus.Debug("[state] previous state found, this is not a legacy cluster")
+		return nil
+	}
+	// We have a kubeconfig and no current state. This is either a legacy cluster or a new cluster with an old kubeconfig,
+	// so let's try to upgrade.
+	log.Infof(ctx, "[state] Possible legacy cluster detected, trying to upgrade")
+	if err := cluster.RebuildKubeconfig(ctx, kubeCluster); err != nil {
+		return err
+	}
+	recoveredCluster, err := cluster.GetStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+	if err != nil {
+		return err
+	}
+	// if we found a recovered cluster, we will need to override the current state
+	if recoveredCluster != nil {
+		recoveredCerts, err := cluster.GetClusterCertsFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, kubeCluster.EtcdHosts)
+		if err != nil {
+			return err
+		}
+		fullState.CurrentState.RancherKubernetesEngineConfig = recoveredCluster.RancherKubernetesEngineConfig.DeepCopy()
+		fullState.CurrentState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
 
-func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string) error {
+		// we don't want to regenerate certificates
+		fullState.DesiredState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
+		if err = fullState.WriteStateFile(ctx, kubeCluster.StateFilePath); err != nil {
+			return err
+		}
+		return cluster.RemoveLegacyStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+	}
+
+	return nil
+}
+
+func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string, k8sWrapTransport k8s.WrapTransport) error {
 	log.Infof(ctx, "Initiating Kubernetes cluster")
 	stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
 	rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
-
 	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	err = doUpgradeLegacyCluster(ctx, kubeCluster, rkeFullState, k8sWrapTransport)
+	if err != nil {
+		log.Warnf(ctx, "[state] can't fetch legacy cluster state from Kubernetes: %v", err)
+	}
+
 	fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+
 	rkeState := cluster.RKEFullState{
 		DesiredState: fullState.DesiredState,
 		CurrentState: fullState.CurrentState,
@@ -176,6 +223,11 @@ func ClusterUp(
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
+	err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, clusterState)
+	if err != nil {
+		return APIURL, caCrt, clientCert, clientKey, nil, err
+	}
+
 	err = kubeCluster.DeployWorkerPlane(ctx)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -241,9 +293,9 @@ func clusterUpFromCli(ctx *cli.Context) error {
 	updateOnly := ctx.Bool("update-only")
 	disablePortCheck := ctx.Bool("disable-port-check")
 	if ctx.Bool("init") {
-		return ClusterInit(context.Background(), rkeConfig, "")
+		return ClusterInit(context.Background(), rkeConfig, "", nil)
 	}
-	if err := ClusterInit(context.Background(), rkeConfig, ""); err != nil {
+	if err := ClusterInit(context.Background(), rkeConfig, "", nil); err != nil {
 		return err
 	}
 	_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "", updateOnly, disablePortCheck)
diff --git a/k8s/configmap.go b/k8s/configmap.go
index f513274e..df7be147 100644
--- a/k8s/configmap.go
+++ b/k8s/configmap.go
@@ -44,3 +44,7 @@ func UpdateConfigMap(k8sClient *kubernetes.Clientset, configYaml []byte, configM
 func GetConfigMap(k8sClient *kubernetes.Clientset, configMapName string) (*v1.ConfigMap, error) {
 	return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
 }
+
+func DeleteConfigMap(k8sClient *kubernetes.Clientset, configMapName string) error {
+	return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(configMapName, &metav1.DeleteOptions{})
+}
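
Usage note: ClusterInit now takes a k8s.WrapTransport as a fourth argument, so any out-of-tree caller has to be updated; passing nil keeps the default transport, exactly as clusterUpFromCli does in this patch. The sketch below is a hypothetical embedding example based only on the calls visible above; the upCluster wrapper and its assumption of an already-parsed rkeConfig are illustrative and not part of RKE.

// Hypothetical caller sketch: drive the new init/up flow from Go,
// mirroring the argument lists used by clusterUpFromCli in this patch.
package main

import (
	"context"

	"github.com/rancher/rke/cmd"
	"github.com/rancher/types/apis/management.cattle.io/v3"
)

// upCluster assumes rkeConfig was already parsed from cluster.yml by the caller.
func upCluster(rkeConfig *v3.RancherKubernetesEngineConfig) error {
	ctx := context.Background()
	// New signature: empty configDir and a nil k8s.WrapTransport, as in clusterUpFromCli.
	if err := cmd.ClusterInit(ctx, rkeConfig, "", nil); err != nil {
		return err
	}
	// Same argument list the CLI path uses; the last two booleans correspond to
	// updateOnly and disablePortCheck.
	_, _, _, _, _, err := cmd.ClusterUp(ctx, rkeConfig, nil, nil, nil, false, "", false, false)
	return err
}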