mirror of https://github.com/rancher/rke.git — synced 2025-09-09 11:01:40 +00:00

refactor the build state

- remove extra cert generation for etcd in reconcile
- fix reconcile and etcd add and remove
- cluster state with rke remove
- fix add/remove issues
- Fix the up command
- Fix default paths for kubeconfig and rkestate

committed by Alena Prokharchyk
parent 90fd13db65
commit 8b8870311b
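The central change is that the old one-shot cluster.ParseCluster helper is removed and every caller migrates to two explicit steps: cluster.InitClusterObject, which builds the Cluster from the RKE config and resolves the kubeconfig and state file paths, and Cluster.SetupDialers, which wires the dialer factories and the Kubernetes transport. A minimal sketch of the new call pattern, assuming the signatures shown in the hunks below (the dialer arguments are placeholders; callers that only need k8s jobs pass nil factories, as ApplyAuthzResources does):

// Replaces: kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, ...)
kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
if err != nil {
    return err
}
if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport); err != nil {
    return err
}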
@@ -19,102 +19,14 @@ import (
 	"k8s.io/client-go/util/cert"
 )
 
-func NewSetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster, fullState *RKEFullState) error {
+func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster, fullState *RKEFullState) error {
 	if kubeCluster.Authentication.Strategy == X509AuthenticationProvider {
 		if currentCluster != nil {
 			kubeCluster.Certificates = currentCluster.Certificates
 			return nil
 		}
 		kubeCluster.Certificates = TransformV3CertsToCerts(fullState.DesiredState.CertificatesBundle)
 		return nil
 	}
 	return nil
 }
 
-func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
-	if kubeCluster.Authentication.Strategy == X509AuthenticationProvider {
-		var err error
-		if currentCluster != nil {
-			kubeCluster.Certificates = currentCluster.Certificates
-			// this is the case of handling upgrades for the API server aggregation layer CA cert and the API server proxy client key and cert
-			if kubeCluster.Certificates[pki.RequestHeaderCACertName].Certificate == nil {
-				kubeCluster.Certificates, err = regenerateAPIAggregationCerts(kubeCluster, kubeCluster.Certificates)
-				if err != nil {
-					return fmt.Errorf("Failed to regenerate Aggregation layer certificates %v", err)
-				}
-			}
-		} else {
-			var backupPlane string
-			var backupHosts []*hosts.Host
-			if len(kubeCluster.Services.Etcd.ExternalURLs) > 0 {
-				backupPlane = ControlPlane
-				backupHosts = kubeCluster.ControlPlaneHosts
-			} else {
-				// Save certificates on etcd and controlplane hosts
-				backupPlane = fmt.Sprintf("%s,%s", EtcdPlane, ControlPlane)
-				backupHosts = hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, nil)
-			}
-			log.Infof(ctx, "[certificates] Attempting to recover certificates from backup on [%s] hosts", backupPlane)
-
-			kubeCluster.Certificates, err = fetchBackupCertificates(ctx, backupHosts, kubeCluster)
-			if err != nil {
-				return err
-			}
-			if kubeCluster.Certificates != nil {
-				log.Infof(ctx, "[certificates] Certificate backup found on [%s] hosts", backupPlane)
-
-				// make sure we have all the etcd certs; we need to handle dialer failure for etcd nodes https://github.com/rancher/rancher/issues/12898
-				for _, host := range kubeCluster.EtcdHosts {
-					certName := pki.GetEtcdCrtName(host.InternalAddress)
-					if kubeCluster.Certificates[certName].Certificate == nil {
-						if kubeCluster.Certificates, err = pki.RegenerateEtcdCertificate(ctx,
-							kubeCluster.Certificates,
-							host,
-							kubeCluster.EtcdHosts,
-							kubeCluster.ClusterDomain,
-							kubeCluster.KubernetesServiceIP); err != nil {
-							return err
-						}
-					}
-				}
-				// this is the case of adding a controlplane node to an empty cluster with only etcd nodes
-				if kubeCluster.Certificates[pki.KubeAdminCertName].Config == "" && len(kubeCluster.ControlPlaneHosts) > 0 {
-					if err := rebuildLocalAdminConfig(ctx, kubeCluster); err != nil {
-						return err
-					}
-					err = pki.GenerateKubeAPICertificate(ctx, kubeCluster.Certificates, kubeCluster.RancherKubernetesEngineConfig, "", "")
-					if err != nil {
-						return fmt.Errorf("Failed to regenerate KubeAPI certificate %v", err)
-					}
-				}
-				// this is the case of handling upgrades for the API server aggregation layer CA cert and the API server proxy client key and cert
-				if kubeCluster.Certificates[pki.RequestHeaderCACertName].Certificate == nil {
-					kubeCluster.Certificates, err = regenerateAPIAggregationCerts(kubeCluster, kubeCluster.Certificates)
-					if err != nil {
-						return fmt.Errorf("Failed to regenerate Aggregation layer certificates %v", err)
-					}
-				}
-				return nil
-			}
-			log.Infof(ctx, "[certificates] No Certificate backup found on [%s] hosts", backupPlane)
-
-			kubeCluster.Certificates, err = pki.GenerateRKECerts(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.LocalKubeConfigPath, "")
-			if err != nil {
-				return fmt.Errorf("Failed to generate Kubernetes certificates: %v", err)
-			}
-
-			log.Infof(ctx, "[certificates] Temporarily saving certs to [%s] hosts", backupPlane)
-			if err := deployBackupCertificates(ctx, backupHosts, kubeCluster); err != nil {
-				return err
-			}
-			log.Infof(ctx, "[certificates] Saved certs to [%s] hosts", backupPlane)
-		}
-	}
-	return nil
-}
-
 func regenerateAPICertificate(c *Cluster, certificates map[string]pki.CertificatePKI) (map[string]pki.CertificatePKI, error) {
 	logrus.Debugf("[certificates] Regenerating kubeAPI certificate")
 	kubeAPIAltNames := pki.GetAltNames(c.ControlPlaneHosts, c.ClusterDomain, c.KubernetesServiceIP, c.Authentication.SANs)
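Certificate setup is now state-driven: with a *RKEFullState available, SetUpAuthentication either copies certificates from the fetched current cluster or materializes them from the desired-state bundle, instead of probing hosts for backups. A sketch of how a command wires this together, mirroring the cmd/cert.go and cmd/up.go hunks further down:

// Hypothetical command body; clusterFilePath and configDir as usual.
fullState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(clusterFilePath, configDir))
if err != nil {
    return err
}
currentCluster, err := kubeCluster.GetClusterState(ctx, fullState, configDir)
if err != nil {
    return err
}
if err := cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster, fullState); err != nil {
    return err
}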
@@ -150,12 +150,15 @@ func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngin
 		RancherKubernetesEngineConfig: *rkeConfig,
 		ConfigPath:                    clusterFilePath,
-		StateFilePath:                 GetStateFilePath(clusterFilePath, configDir),
-		LocalKubeConfigPath:           pki.GetLocalKubeConfig(clusterFilePath, configDir),
 		PrivateRegistriesMap:          make(map[string]v3.PrivateRegistry),
 	}
+	if len(c.ConfigPath) == 0 {
+		c.ConfigPath = pki.ClusterConfig
+	}
+	// set kube_config and state file
+	c.LocalKubeConfigPath = pki.GetLocalKubeConfig(c.ConfigPath, configDir)
+	c.StateFilePath = GetStateFilePath(c.ConfigPath, configDir)
+
 	// Setting cluster Defaults
 	c.setClusterDefaults(ctx)
 	// extract cluster network configuration
@@ -201,24 +204,6 @@ func (c *Cluster) SetupDialers(ctx context.Context, dockerDialerFactory,
 	return nil
 }
 
-func ParseCluster(
-	ctx context.Context,
-	rkeConfig *v3.RancherKubernetesEngineConfig,
-	clusterFilePath, configDir string,
-	dockerDialerFactory,
-	localConnDialerFactory hosts.DialerFactory,
-	k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
-	var err error
-	// get state filepath
-	c, err := InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
-	if err != nil {
-		return nil, err
-	}
-	c.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
-
-	return c, nil
-}
-
 func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
 	if len(kubeCluster.ControlPlaneHosts) == 0 {
 		return nil
@@ -287,10 +272,13 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string, cluste
 
 func ApplyAuthzResources(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, clusterFilePath, configDir string, k8sWrapTransport k8s.WrapTransport) error {
 	// dialer factories are not needed here since we are not using docker, only k8s jobs
-	kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil, k8sWrapTransport)
+	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, nil, nil, k8sWrapTransport); err != nil {
+		return err
+	}
 	if len(kubeCluster.ControlPlaneHosts) == 0 {
 		return nil
 	}
@@ -447,10 +435,13 @@ func ConfigureCluster(
 	k8sWrapTransport k8s.WrapTransport,
 	useKubectl bool) error {
 	// dialer factories are not needed here since we are not using docker, only k8s jobs
-	kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil, k8sWrapTransport)
+	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, nil, nil, k8sWrapTransport); err != nil {
+		return err
+	}
 	kubeCluster.UseKubectlDeploy = useKubectl
 	if len(kubeCluster.ControlPlaneHosts) > 0 {
 		kubeCluster.Certificates = crtBundle
@@ -139,7 +139,7 @@ func (c *Cluster) SetUpHosts(ctx context.Context, rotateCerts bool) error {
 		return err
 	}
 
-	if err := pki.DeployAdminConfig(ctx, c.Certificates[pki.KubeAdminCertName].Config, c.LocalKubeConfigPath); err != nil {
+	if err := rebuildLocalAdminConfig(ctx, c); err != nil {
 		return err
 	}
 	log.Infof(ctx, "[certificates] Successfully deployed kubernetes certificates to Cluster nodes")
@@ -41,7 +41,7 @@ var admissionControlOptionNames = []string{"enable-admission-plugins", "admissio
 
 func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, hostsInfoMap map[string]types.Info) (v3.RKEPlan, error) {
 	clusterPlan := v3.RKEPlan{}
-	myCluster, err := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil)
+	myCluster, err := InitClusterObject(ctx, rkeConfig, "", "")
 	if err != nil {
 		return clusterPlan, err
 	}
@@ -47,12 +47,6 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster,
 	if err := reconcileControl(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
 		return err
 	}
-	// Handle service account token key issue
-	kubeAPICert := currentCluster.Certificates[pki.KubeAPICertName]
-	if currentCluster.Certificates[pki.ServiceAccountTokenKeyName].Key == nil {
-		log.Infof(ctx, "[certificates] Creating service account token key")
-		currentCluster.Certificates[pki.ServiceAccountTokenKeyName] = pki.ToCertObject(pki.ServiceAccountTokenKeyName, pki.ServiceAccountTokenKeyName, "", kubeAPICert.Certificate, kubeAPICert.Key)
-	}
 	log.Infof(ctx, "[reconcile] Reconciled cluster state successfully")
 	return nil
 }
@@ -173,30 +167,11 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
 	}
 	log.Infof(ctx, "[reconcile] Check etcd hosts to be added")
 	etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
-	crtMap := currentCluster.Certificates
-	var err error
 	for _, etcdHost := range etcdToAdd {
 		kubeCluster.UpdateWorkersOnly = false
 		etcdHost.ToAddEtcdMember = true
-		// Generate new certificate for the new etcd member
-		crtMap, err = pki.RegenerateEtcdCertificate(
-			ctx,
-			crtMap,
-			etcdHost,
-			kubeCluster.EtcdHosts,
-			kubeCluster.ClusterDomain,
-			kubeCluster.KubernetesServiceIP)
-		if err != nil {
-			return err
-		}
 	}
-	currentCluster.Certificates = crtMap
 	for _, etcdHost := range etcdToAdd {
 		// deploy certificates on the new etcd host
 		if err := pki.DeployCertificatesOnHost(ctx, etcdHost, currentCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.CertPathPrefix, kubeCluster.PrivateRegistriesMap); err != nil {
 			return err
 		}
 
 		// Check if the host is already part of the cluster -- this covers a cluster with lost quorum
 		isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey)
 		if err != nil {
@@ -38,6 +38,7 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
 	}
 
 	pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath)
+	RemoveStateFile(ctx, c.StateFilePath)
 	return nil
 }
143 cluster/state.go
@@ -40,13 +40,10 @@ type RKEState struct
 	CertificatesBundle map[string]v3.CertificatePKI `json:"certificatesBundle,omitempty"`
 }
 
-func (c *Cluster) UpdateClusterSate(ctx context.Context, fullState *RKEFullState) error {
-	currentState, err := RebuildState(ctx, &c.RancherKubernetesEngineConfig, fullState.CurrentState)
-	if err != nil {
-		return err
-	}
-	currentState.CertificatesBundle = TransformCertsToV3Certs(c.Certificates)
-	fullState.CurrentState = currentState
+// UpdateClusterState will update the cluster's current state
+func (c *Cluster) UpdateClusterState(ctx context.Context, fullState *RKEFullState) error {
+	fullState.CurrentState.RancherKubernetesEngineConfig = c.RancherKubernetesEngineConfig.DeepCopy()
+	fullState.CurrentState.CertificatesBundle = TransformCertsToV3Certs(c.Certificates)
 	return fullState.WriteStateFile(ctx, c.StateFilePath)
 }
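UpdateClusterState (fixing the old UpdateClusterSate typo along the way) no longer rebuilds state via RebuildState; it simply stamps the running config and certificates into CurrentState and persists the file. A minimal sketch of the intended call site at the tail of a successful run, mirroring cmd/up.go below and assuming clusterState was read earlier with ReadStateFile:

// kubeCluster is the provisioned cluster; clusterState is the *RKEFullState
// read at startup. Persist the now-current config and certificates.
if err := kubeCluster.UpdateClusterState(ctx, clusterState); err != nil {
    return err
}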
@@ -122,13 +119,11 @@ func TransformCertsToV3Certs(in map[string]pki.CertificatePKI) map[string]v3.Cer
 	}
 	return out
 }
-func (c *Cluster) NewGetClusterState(ctx context.Context, fullState *RKEFullState, configDir string) (*Cluster, error) {
+func (c *Cluster) GetClusterState(ctx context.Context, fullState *RKEFullState, configDir string) (*Cluster, error) {
 	var err error
 	// no current state, take me home
 	if fullState.CurrentState.RancherKubernetesEngineConfig == nil {
 		return nil, nil
 	}
-	// Do I still need to check and fix kube config ?
-
 	currentCluster, err := InitClusterObject(ctx, fullState.CurrentState.RancherKubernetesEngineConfig, c.ConfigPath, configDir)
 	if err != nil {
@@ -167,84 +162,6 @@ func (c *Cluster) NewGetClusterState(ctx context.Context, fullState *RKEFullStat
 	return currentCluster, nil
 }
 
-func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) {
-	var err error
-	var currentCluster *Cluster
-
-	// check if the local kubeconfig file exists
-	if _, err = os.Stat(c.LocalKubeConfigPath); !os.IsNotExist(err) {
-		log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster")
-
-		// to handle if the current local admin is down and we need to use a new cp from the list
-		if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport) {
-			if err := rebuildLocalAdminConfig(ctx, c); err != nil {
-				return nil, err
-			}
-		}
-
-		// initiate kubernetes client
-		c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
-		if err != nil {
-			log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err)
-			return nil, nil
-		}
-		// Get previous kubernetes state
-		currentCluster, err = getStateFromKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath)
-		if err != nil {
-			// attempting to fetch state from nodes
-			uniqueHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
-			currentCluster = getStateFromNodes(ctx, uniqueHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
-		}
-		// Get previous kubernetes certificates
-		if currentCluster != nil {
-			if err := currentCluster.InvertIndexHosts(); err != nil {
-				return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
-			}
-			activeEtcdHosts := currentCluster.EtcdHosts
-			for _, inactiveHost := range c.InactiveHosts {
-				activeEtcdHosts = removeFromHosts(inactiveHost, activeEtcdHosts)
-			}
-			currentCluster.Certificates, err = getClusterCerts(ctx, c.KubeClient, activeEtcdHosts)
-			// if getting certificates from k8s failed, attempt to fetch the backup certs
-			if err != nil {
-				backupHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, nil)
-				currentCluster.Certificates, err = fetchBackupCertificates(ctx, backupHosts, c)
-				if err != nil {
-					return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
-				}
-				if currentCluster.Certificates != nil {
-					log.Infof(ctx, "[certificates] Certificate backup found on backup hosts")
-				}
-			}
-			currentCluster.DockerDialerFactory = c.DockerDialerFactory
-			currentCluster.LocalConnDialerFactory = c.LocalConnDialerFactory
-
-			// make sure we have all the etcd certs; we need to handle dialer failure for etcd nodes https://github.com/rancher/rancher/issues/12898
-			for _, host := range activeEtcdHosts {
-				certName := pki.GetEtcdCrtName(host.InternalAddress)
-				if (currentCluster.Certificates[certName] == pki.CertificatePKI{}) {
-					if currentCluster.Certificates, err = pki.RegenerateEtcdCertificate(ctx,
-						currentCluster.Certificates,
-						host,
-						activeEtcdHosts,
-						currentCluster.ClusterDomain,
-						currentCluster.KubernetesServiceIP); err != nil {
-						return nil, err
-					}
-				}
-			}
-			// setting cluster defaults for the fetched cluster as well
-			currentCluster.setClusterDefaults(ctx)
-
-			currentCluster.Certificates, err = regenerateAPICertificate(c, currentCluster.Certificates)
-			if err != nil {
-				return nil, fmt.Errorf("Failed to regenerate KubeAPI certificate %v", err)
-			}
-		}
-	}
-	return currentCluster, nil
-}
-
 func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
 	log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
 	clusterFile, err := yaml.Marshal(*rkeConfig)
@@ -372,21 +289,39 @@ func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (
 	return fmt.Sprintf("%#v", *serverVersion), nil
 }
 
-func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, oldState RKEState) (RKEState, error) {
-	var newState RKEState
-	if oldState.CertificatesBundle == nil {
+func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, oldState *RKEFullState, configPath, configDir string) (*RKEFullState, error) {
+	newState := &RKEFullState{
+		DesiredState: RKEState{
+			RancherKubernetesEngineConfig: rkeConfig.DeepCopy(),
+		},
+	}
+
+	// Rebuilding the certificates of the desired state
+	if oldState.DesiredState.CertificatesBundle == nil {
 		// Get the certificate Bundle
 		certBundle, err := pki.GenerateRKECerts(ctx, *rkeConfig, "", "")
 		if err != nil {
-			return newState, fmt.Errorf("Failed to generate certificate bundle: %v", err)
+			return nil, fmt.Errorf("Failed to generate certificate bundle: %v", err)
 		}
 		// Convert rke certs to v3.certs
-		newState.CertificatesBundle = TransformCertsToV3Certs(certBundle)
+		newState.DesiredState.CertificatesBundle = TransformCertsToV3Certs(certBundle)
 	} else {
-		newState.CertificatesBundle = oldState.CertificatesBundle
-		newState.RancherKubernetesEngineConfig = rkeConfig
+		// Regenerating etcd certificates for any new etcd nodes
+		pkiCertBundle := TransformV3CertsToCerts(oldState.DesiredState.CertificatesBundle)
+		if err := pki.GenerateEtcdCertificates(ctx, pkiCertBundle, *rkeConfig, "", ""); err != nil {
+			return nil, err
+		}
+		// Regenerating kubeapi certificates for any new kubeapi nodes
+		if err := pki.GenerateKubeAPICertificate(ctx, pkiCertBundle, *rkeConfig, "", ""); err != nil {
+			return nil, err
+		}
+		// Regenerating kubeadmin certificates/config
+		if err := pki.GenerateKubeAdminCertificate(ctx, pkiCertBundle, *rkeConfig, configPath, configDir); err != nil {
+			return nil, err
+		}
+		newState.DesiredState.CertificatesBundle = TransformCertsToV3Certs(pkiCertBundle)
 	}
+	newState.CurrentState = oldState.CurrentState
 	return newState, nil
 }
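RebuildState now owns the whole desired-state computation: on a first run it generates a fresh certificate bundle, and on later runs it extends the existing bundle for new etcd and kube-api nodes and regenerates the kubeadmin config. A sketch of the init flow it supports, condensed from the ClusterInit hunk in cmd/up.go below:

// Condensed from ClusterInit; a missing state file on first run is tolerated.
stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
fullState, err := cluster.RebuildState(ctx, rkeConfig, rkeFullState, clusterFilePath, configDir)
if err != nil {
    return err
}
return fullState.WriteStateFile(ctx, stateFilePath)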
@@ -396,7 +331,7 @@ func (s *RKEFullState) WriteStateFile(ctx context.Context, statePath string) err
 		return fmt.Errorf("Failed to Marshal state object: %v", err)
 	}
 	logrus.Debugf("Writing state file: %s", stateFile)
-	if err := ioutil.WriteFile(statePath, []byte(stateFile), 0640); err != nil {
+	if err := ioutil.WriteFile(statePath, stateFile, 0640); err != nil {
 		return fmt.Errorf("Failed to write state file: %v", err)
 	}
 	log.Infof(ctx, "Successfully Deployed state file at [%s]", statePath)
@@ -404,6 +339,9 @@ func (s *RKEFullState) WriteStateFile(ctx context.Context, statePath string) err
 }
 
 func GetStateFilePath(configPath, configDir string) string {
+	if configPath == "" {
+		configPath = pki.ClusterConfig
+	}
 	baseDir := filepath.Dir(configPath)
 	if len(configDir) > 0 {
 		baseDir = filepath.Dir(configDir)
@@ -428,10 +366,19 @@ func ReadStateFile(ctx context.Context, statePath string) (*RKEFullState, error)
 	defer file.Close()
 	buf, err := ioutil.ReadAll(file)
 	if err != nil {
-		return rkeFullState, fmt.Errorf("failed to read file: %v", err)
+		return rkeFullState, fmt.Errorf("failed to read state file: %v", err)
 	}
 	if err := json.Unmarshal(buf, rkeFullState); err != nil {
 		return rkeFullState, fmt.Errorf("failed to unmarshal the state file: %v", err)
 	}
 	return rkeFullState, nil
 }
+
+func RemoveStateFile(ctx context.Context, statePath string) {
+	log.Infof(ctx, "Removing state file: %s", statePath)
+	if err := os.Remove(statePath); err != nil {
+		logrus.Warningf("Failed to remove state file: %v", err)
+		return
+	}
+	log.Infof(ctx, "State file removed successfully")
+}
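With RemoveStateFile added, the state file has a full lifecycle: written by init/up, read by every command, and removed on teardown (ClusterRemove above now calls it). A short usage sketch, assuming the default cluster.yml config path:

statePath := cluster.GetStateFilePath("cluster.yml", "")
fullState, err := cluster.ReadStateFile(ctx, statePath)
if err != nil {
    return err // missing or unparsable state file
}
_ = fullState.DesiredState // config plus certificate bundle to converge on
// ...
cluster.RemoveStateFile(ctx, statePath) // called from ClusterRemove on `rke remove`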
@@ -9,11 +9,6 @@ import (
 )
 
 func (c *Cluster) ValidateCluster() error {
-	// make sure cluster has at least one controlplane/etcd host
-	if err := ValidateHostCount(c); err != nil {
-		return err
-	}
-
 	// validate duplicate nodes
 	if err := validateDuplicateNodes(c); err != nil {
 		return err
14 cmd/cert.go
@@ -84,21 +84,29 @@ func RotateRKECertificates(
 	local bool, configDir string, components []string, rotateCACerts bool) error {
 
 	log.Infof(ctx, "Rotating Kubernetes cluster certificates")
-	kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
+	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(clusterFilePath, configDir))
 	if err != nil {
 		return err
 	}
 
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	if err != nil {
+		return err
+	}
+	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport); err != nil {
+		return err
+	}
 
 	if err := kubeCluster.TunnelHosts(ctx, local); err != nil {
 		return err
 	}
 
-	currentCluster, err := kubeCluster.GetClusterState(ctx)
+	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState, configDir)
 	if err != nil {
 		return err
 	}
 
-	if err := cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster); err != nil {
+	if err := cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster, clusterState); err != nil {
 		return err
 	}
10 cmd/etcd.go
@@ -57,10 +57,13 @@ func SnapshotSaveEtcdHosts(
 	configDir, snapshotName string) error {
 
 	log.Infof(ctx, "Starting saving snapshot on etcd hosts")
-	kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, nil, nil)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, nil, nil); err != nil {
+		return err
+	}
 
 	if err := kubeCluster.TunnelHosts(ctx, false); err != nil {
 		return err
@@ -84,10 +87,13 @@ func RestoreEtcdSnapshot(
 	configDir, snapshotName string) error {
 
 	log.Infof(ctx, "Starting restoring snapshot on etcd hosts")
-	kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, nil, nil)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, nil, nil); err != nil {
+		return err
+	}
 
 	if err := kubeCluster.TunnelHosts(ctx, false); err != nil {
 		return err
@@ -58,10 +58,13 @@ func ClusterRemove(
 	local bool, configDir string) error {
 
 	log.Infof(ctx, "Tearing down Kubernetes cluster")
-	kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil, k8sWrapTransport)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, dialerFactory, nil, k8sWrapTransport); err != nil {
+		return err
+	}
 
 	err = kubeCluster.TunnelHosts(ctx, local)
 	if err != nil {
62 cmd/up.go
@@ -14,6 +14,7 @@ import (
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/urfave/cli"
+	"k8s.io/client-go/util/cert"
 )
 
 var clusterFilePath string
@@ -69,18 +70,18 @@ func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfi
 	stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
 	rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
 
-	kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, nil, nil, nil)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
 
-	desiredState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState.DesiredState)
+	fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, clusterFilePath, configDir)
 	if err != nil {
 		return err
 	}
 	rkeState := cluster.RKEFullState{
-		DesiredState: desiredState,
-		CurrentState: rkeFullState.CurrentState,
+		DesiredState: fullState.DesiredState,
+		CurrentState: fullState.CurrentState,
 	}
 	return rkeState.WriteStateFile(ctx, stateFilePath)
 }
@@ -102,12 +103,13 @@ func ClusterUp(
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
-	kubeCluster, err := cluster.InitClusterObject(ctx, clusterState.DesiredState.RancherKubernetesEngineConfig, clusterFilePath, configDir)
+
+	kubeCluster, err := cluster.InitClusterObject(ctx, clusterState.DesiredState.RancherKubernetesEngineConfig.DeepCopy(), clusterFilePath, configDir)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
 	err = kubeCluster.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
 	// kubeCluster, err := cluster.ParseCluster(ctx, clusterState.DesiredState.RancherKubernetesEngineConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
@@ -117,43 +119,33 @@ func ClusterUp(
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	// 1. fix the kube config if it's broken
-	// 2. connect to k8s
-	// 3. get the state from k8s
-	// 4. if not on k8s, get it from the nodes
-	// 5. get cluster certificates
-	// 6. update etcd hosts certs
-	// 7. set cluster defaults
-	// 8. regenerate api certificates
-	currentCluster, err := kubeCluster.NewGetClusterState(ctx, clusterState, configDir)
+	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState, configDir)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
 	if !disablePortCheck {
 		if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
 			return APIURL, caCrt, clientCert, clientKey, nil, err
 		}
 	}
 
-	// 0. check on the auth strategy
-	// 1. if current cluster != nil, copy over certs to kubeCluster
-	// 1.1. if there is no pki.RequestHeaderCACertName, generate it
-	// 2. if there is no current_cluster, try to fetch backup
-	// 2.1 if a backup is found, handle the edge cases
-	// 3. if no backup is found, generate new certs
-	// 4. deploy backups
-	// This looks very weird now..
-	err = cluster.NewSetUpAuthentication(ctx, kubeCluster, currentCluster, clusterState)
+	err = cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster, clusterState)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
+	if len(kubeCluster.ControlPlaneHosts) > 0 {
+		APIURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].Address + ":6443")
+	}
+	clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate))
+	clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key))
+	caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
 
 	// if len(kubeCluster.ControlPlaneHosts) > 0 {
 	// 	APIURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].Address + ":6443")
 	// }
 	// clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate))
 	// clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key))
 	// caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
+	// moved deploying certs before reconcile to remove all unneeded cert generation from reconcile
+	err = kubeCluster.SetUpHosts(ctx, false)
+	if err != nil {
+		return APIURL, caCrt, clientCert, clientKey, nil, err
+	}
 
 	err = cluster.ReconcileCluster(ctx, kubeCluster, currentCluster, updateOnly)
 	if err != nil {
@@ -164,11 +156,6 @@ func ClusterUp(
 		APIURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].Address + ":6443")
 	}
 
-	err = kubeCluster.SetUpHosts(ctx, false)
-	if err != nil {
-		return APIURL, caCrt, clientCert, clientKey, nil, err
-	}
-
 	if err := kubeCluster.PrePullK8sImages(ctx); err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
@@ -184,10 +171,7 @@ func ClusterUp(
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	// 1. save cluster certificates
-	// 2. save cluster state
-	//err = kubeCluster.SaveClusterState(ctx, &kubeCluster.RancherKubernetesEngineConfig)
-	err = kubeCluster.UpdateClusterSate(ctx, clusterState)
+	err = kubeCluster.UpdateClusterState(ctx, clusterState)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/rsa"
 	"fmt"
+	"reflect"
 
 	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/log"
@@ -13,8 +14,6 @@ import (
 
 func GenerateKubeAPICertificate(ctx context.Context, certs map[string]CertificatePKI, rkeConfig v3.RancherKubernetesEngineConfig, configPath, configDir string) error {
 	// generate API certificate and key
-	log.Infof(ctx, "[certificates] Generating Kubernetes API server certificates")
-	var privateAPIKey *rsa.PrivateKey
 	caCrt := certs[CACertName].Certificate
 	caKey := certs[CACertName].Key
 	kubernetesServiceIP, err := GetKubernetesServiceIP(rkeConfig.Services.KubeAPI.ServiceClusterIPRange)
@@ -24,15 +23,24 @@ func GenerateKubeAPICertificate(ctx context.Context, certs map[string]Certificat
 	clusterDomain := rkeConfig.Services.Kubelet.ClusterDomain
 	cpHosts := hosts.NodesToHosts(rkeConfig.Nodes, controlRole)
 	kubeAPIAltNames := GetAltNames(cpHosts, clusterDomain, kubernetesServiceIP, rkeConfig.Authentication.SANs)
-	// handle rotation on old clusters
-	if certs[ServiceAccountTokenKeyName].Key == nil {
-		privateAPIKey = certs[KubeAPICertName].Key
+	kubeAPICert := certs[KubeAPICertName].Certificate
+	if kubeAPICert != nil &&
+		reflect.DeepEqual(kubeAPIAltNames.DNSNames, kubeAPICert.DNSNames) &&
+		deepEqualIPsAltNames(kubeAPIAltNames.IPs, kubeAPICert.IPAddresses) {
+		return nil
 	}
-	kubeAPICrt, kubeAPIKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, KubeAPICertName, kubeAPIAltNames, privateAPIKey, nil)
+	log.Infof(ctx, "[certificates] Generating Kubernetes API server certificates")
+	kubeAPICrt, kubeAPIKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, KubeAPICertName, kubeAPIAltNames, certs[KubeAPICertName].Key, nil)
 	if err != nil {
 		return err
 	}
 	certs[KubeAPICertName] = ToCertObject(KubeAPICertName, "", "", kubeAPICrt, kubeAPIKey)
+	// handle service account tokens in old clusters
+	apiCert := certs[KubeAPICertName]
+	if certs[ServiceAccountTokenKeyName].Key == nil {
+		log.Infof(ctx, "[certificates] Creating service account token key")
+		certs[ServiceAccountTokenKeyName] = ToCertObject(ServiceAccountTokenKeyName, ServiceAccountTokenKeyName, "", apiCert.Certificate, apiCert.Key)
+	}
 	return nil
 }
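GenerateKubeAPICertificate is now effectively idempotent: when the existing kube-api certificate already covers exactly the computed SANs (DNS names compared in order, IPs via deepEqualIPsAltNames), it returns early instead of re-signing, which is what lets reconcile drop its own cert generation. A compact sketch of that check as a standalone helper (needsRegen is hypothetical; the comparison mirrors the hunk above):

import (
    "crypto/x509"
    "net"
    "reflect"
)

// needsRegen reports whether a cert must be reissued to cover the wanted
// alt names; a nil cert means none exists yet.
func needsRegen(crt *x509.Certificate, wantDNS []string, wantIPs []net.IP) bool {
    if crt == nil {
        return true
    }
    return !reflect.DeepEqual(wantDNS, crt.DNSNames) ||
        !deepEqualIPsAltNames(wantIPs, crt.IPAddresses)
}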
@@ -163,8 +171,11 @@ func GenerateEtcdCertificates(ctx context.Context, certs map[string]CertificateP
 	etcdHosts := hosts.NodesToHosts(rkeConfig.Nodes, etcdRole)
 	etcdAltNames := GetAltNames(etcdHosts, clusterDomain, kubernetesServiceIP, []string{})
 	for _, host := range etcdHosts {
-		log.Infof(ctx, "[certificates] Generating etcd-%s certificate and key", host.InternalAddress)
 		etcdName := GetEtcdCrtName(host.InternalAddress)
+		if _, ok := certs[etcdName]; ok {
+			continue
+		}
+		log.Infof(ctx, "[certificates] Generating etcd-%s certificate and key", host.InternalAddress)
 		etcdCrt, etcdKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, EtcdCertName, etcdAltNames, nil, nil)
 		if err != nil {
 			return err
14 pki/util.go
@@ -12,6 +12,7 @@ import (
 	"net"
 	"path"
 	"path/filepath"
+	"reflect"
 	"strings"
 	"time"
 
@@ -380,3 +381,16 @@ func isFileNotFoundErr(e error) bool {
 	}
 	return false
 }
+
+func deepEqualIPsAltNames(oldIPs, newIPs []net.IP) bool {
+	if len(oldIPs) != len(newIPs) {
+		return false
+	}
+	oldIPsStrings := make([]string, 0, len(oldIPs))
+	newIPsStrings := make([]string, 0, len(newIPs))
+	for i := range oldIPs {
+		oldIPsStrings = append(oldIPsStrings, oldIPs[i].String())
+		newIPsStrings = append(newIPsStrings, newIPs[i].String())
+	}
+	return reflect.DeepEqual(oldIPsStrings, newIPsStrings)
+}
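deepEqualIPsAltNames compares the two IP lists by their string forms, so equal addresses with different byte representations (a 4-byte net.IP versus the 16-byte IPv4-in-IPv6 form) still compare equal; length is checked first and order matters. A small usage sketch, assuming the function above is in scope:

a := []net.IP{net.ParseIP("10.0.0.1"), net.ParseIP("192.168.1.5")}
b := []net.IP{net.ParseIP("10.0.0.1").To4(), net.ParseIP("192.168.1.5")}
// Different underlying byte lengths, same String() output:
fmt.Println(deepEqualIPsAltNames(a, b))     // true
fmt.Println(deepEqualIPsAltNames(a, b[:1])) // false: lengths differ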