Mirror of https://github.com/rancher/rke.git (synced 2025-05-12 10:26:20 +00:00)
Final fixes and cleanup for state management
Fix dind, local, and etcd snapshots; add ExternalFlags and dialer options
This commit is contained in:
parent 6da35256a8
commit 696b61679c
cluster
cmd
hosts
pki
vendor/github.com/rancher/types/apis/management.cattle.io/v3
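The shape of this change, for orientation: entry points that previously took long positional parameter lists (dialer factories, a k8s wrap transport, and loose local/configDir/updateOnly/disablePortCheck values) now take two bundled arguments, a cluster.ExternalFlags built by cluster.GetExternalFlags and a hosts.DialersOptions. The sketch below is illustrative rather than part of the commit; it only uses calls that appear in the hunks that follow, and cluster.yml is a placeholder path.

package main

import (
	"context"
	"io/ioutil"

	"github.com/rancher/rke/cluster"
	"github.com/rancher/rke/hosts"
)

func main() {
	ctx := context.Background()

	// Read and parse the cluster file, as the cmd/ helpers in this commit do.
	buf, err := ioutil.ReadFile("cluster.yml") // placeholder path
	if err != nil {
		panic(err)
	}
	rkeConfig, err := cluster.ParseConfig(string(buf))
	if err != nil {
		panic(err)
	}

	// Old style: InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
	// New style: bundle the loose arguments into ExternalFlags.
	flags := cluster.GetExternalFlags(false, false, false, false, nil, "", "cluster.yml")
	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
	if err != nil {
		panic(err)
	}

	// Old style: SetupDialers(ctx, nil, nil, nil)
	// New style: a zero-valued DialersOptions stands in for the nil factories.
	if err := kubeCluster.SetupDialers(ctx, hosts.DialersOptions{}); err != nil {
		panic(err)
	}
}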
@@ -19,9 +19,9 @@ import (
 	"k8s.io/client-go/util/cert"
 )
 
-func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster, fullState *RKEFullState) error {
+func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster, fullState *FullState) error {
 	if kubeCluster.Authentication.Strategy == X509AuthenticationProvider {
-		kubeCluster.Certificates = TransformV3CertsToCerts(fullState.DesiredState.CertificatesBundle)
+		kubeCluster.Certificates = fullState.DesiredState.CertificatesBundle
 		return nil
 	}
 	return nil
@@ -41,10 +41,10 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.Certificat
 	return certificates, nil
 }
 
-func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
+func GetClusterCertsFromKubernetes(ctx context.Context, kubeCluster *Cluster) (map[string]pki.CertificatePKI, error) {
 	log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes")
 
-	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
+	k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
 	}
@@ -61,7 +61,7 @@ func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string,
 		pki.ServiceAccountTokenKeyName,
 	}
 
-	for _, etcdHost := range etcdHosts {
+	for _, etcdHost := range kubeCluster.EtcdHosts {
 		etcdName := pki.GetEtcdCrtName(etcdHost.InternalAddress)
 		certificatesNames = append(certificatesNames, etcdName)
 	}
@@ -97,16 +97,21 @@ func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string,
 		if len(secretCert) == 0 || secretKey == nil {
 			return nil, fmt.Errorf("certificate or key of %s is not found", certName)
 		}
+		certificatePEM := string(cert.EncodeCertPEM(secretCert[0]))
+		keyPEM := string(cert.EncodePrivateKeyPEM(secretKey.(*rsa.PrivateKey)))
+
 		certMap[certName] = pki.CertificatePKI{
-			Certificate:   secretCert[0],
-			Key:           secretKey.(*rsa.PrivateKey),
-			Config:        secretConfig,
-			EnvName:       string(secret.Data["EnvName"]),
-			ConfigEnvName: string(secret.Data["ConfigEnvName"]),
-			KeyEnvName:    string(secret.Data["KeyEnvName"]),
-			Path:          string(secret.Data["Path"]),
-			KeyPath:       string(secret.Data["KeyPath"]),
-			ConfigPath:    string(secret.Data["ConfigPath"]),
+			Certificate:    secretCert[0],
+			Key:            secretKey.(*rsa.PrivateKey),
+			CertificatePEM: certificatePEM,
+			KeyPEM:         keyPEM,
+			Config:         secretConfig,
+			EnvName:        string(secret.Data["EnvName"]),
+			ConfigEnvName:  string(secret.Data["ConfigEnvName"]),
+			KeyEnvName:     string(secret.Data["KeyEnvName"]),
+			Path:           string(secret.Data["Path"]),
+			KeyPath:        string(secret.Data["KeyPath"]),
+			ConfigPath:     string(secret.Data["ConfigPath"]),
 		}
 	}
 	// Handle service account token key issue
@@ -195,37 +200,6 @@ func deployBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, ku
 	return errgrp.Wait()
 }
 
-func fetchBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, kubeCluster *Cluster) (map[string]pki.CertificatePKI, error) {
-	var err error
-	certificates := map[string]pki.CertificatePKI{}
-	for _, host := range backupHosts {
-		certificates, err = pki.FetchCertificatesFromHost(ctx, kubeCluster.EtcdHosts, host, kubeCluster.SystemImages.Alpine, kubeCluster.LocalKubeConfigPath, kubeCluster.PrivateRegistriesMap)
-		if certificates != nil {
-			return certificates, nil
-		}
-	}
-	// reporting the last error only.
-	return nil, err
-}
-
-func fetchCertificatesFromEtcd(ctx context.Context, kubeCluster *Cluster) ([]byte, []byte, error) {
-	// Get kubernetes certificates from the etcd hosts
-	certificates := map[string]pki.CertificatePKI{}
-	var err error
-	for _, host := range kubeCluster.EtcdHosts {
-		certificates, err = pki.FetchCertificatesFromHost(ctx, kubeCluster.EtcdHosts, host, kubeCluster.SystemImages.Alpine, kubeCluster.LocalKubeConfigPath, kubeCluster.PrivateRegistriesMap)
-		if certificates != nil {
-			break
-		}
-	}
-	if err != nil || certificates == nil {
-		return nil, nil, fmt.Errorf("Failed to fetch certificates from etcd hosts: %v", err)
-	}
-	clientCert := cert.EncodeCertPEM(certificates[pki.KubeNodeCertName].Certificate)
-	clientkey := cert.EncodePrivateKeyPEM(certificates[pki.KubeNodeCertName].Key)
-	return clientCert, clientkey, nil
-}
-
 func (c *Cluster) SaveBackupCertificateBundle(ctx context.Context) error {
 	var errgrp errgroup.Group
 
@@ -300,7 +274,7 @@ func regenerateAPIAggregationCerts(c *Cluster, certificates map[string]pki.Certi
 	return certificates, nil
 }
 
-func RotateRKECertificates(ctx context.Context, c *Cluster, configPath, configDir string, components []string, rotateCACerts bool) error {
+func RotateRKECertificates(ctx context.Context, c *Cluster, flags ExternalFlags) error {
 	var (
 		serviceAccountTokenKey string
 	)
@@ -312,27 +286,27 @@ func RotateRKECertificates(ctx context.Context, c *Cluster, configPath, configDi
 		services.KubeletContainerName: pki.GenerateKubeNodeCertificate,
 		services.EtcdContainerName:    pki.GenerateEtcdCertificates,
 	}
-	if rotateCACerts {
+	if flags.RotateCACerts {
 		// rotate CA cert and RequestHeader CA cert
-		if err := pki.GenerateRKECACerts(ctx, c.Certificates, configPath, configDir); err != nil {
+		if err := pki.GenerateRKECACerts(ctx, c.Certificates, flags.ClusterFilePath, flags.ConfigDir); err != nil {
 			return err
 		}
-		components = nil
+		flags.RotateComponents = nil
 	}
-	for _, k8sComponent := range components {
+	for _, k8sComponent := range flags.RotateComponents {
 		genFunc := componentsCertsFuncMap[k8sComponent]
 		if genFunc != nil {
-			if err := genFunc(ctx, c.Certificates, c.RancherKubernetesEngineConfig, configPath, configDir); err != nil {
+			if err := genFunc(ctx, c.Certificates, c.RancherKubernetesEngineConfig, flags.ClusterFilePath, flags.ConfigDir); err != nil {
 				return err
 			}
 		}
 	}
-	if len(components) == 0 {
+	if len(flags.RotateComponents) == 0 {
 		// do not rotate service account token
 		if c.Certificates[pki.ServiceAccountTokenKeyName].Key != nil {
 			serviceAccountTokenKey = string(cert.EncodePrivateKeyPEM(c.Certificates[pki.ServiceAccountTokenKeyName].Key))
 		}
-		if err := pki.GenerateRKEServicesCerts(ctx, c.Certificates, c.RancherKubernetesEngineConfig, configPath, configDir); err != nil {
+		if err := pki.GenerateRKEServicesCerts(ctx, c.Certificates, c.RancherKubernetesEngineConfig, flags.ClusterFilePath, flags.ConfigDir); err != nil {
 			return err
 		}
 		if serviceAccountTokenKey != "" {
@@ -28,28 +28,29 @@ import (
 )
 
 type Cluster struct {
-	v3.RancherKubernetesEngineConfig `yaml:",inline"`
 	ConfigPath               string
-	LocalKubeConfigPath      string
-	StateFilePath            string
-	EtcdHosts                []*hosts.Host
-	WorkerHosts              []*hosts.Host
+	ConfigDir                string
+	CloudConfigFile          string
 	ControlPlaneHosts        []*hosts.Host
-	InactiveHosts            []*hosts.Host
-	EtcdReadyHosts           []*hosts.Host
-	KubeClient               *kubernetes.Clientset
-	KubernetesServiceIP      net.IP
 	Certificates             map[string]pki.CertificatePKI
 	ClusterDomain            string
 	ClusterCIDR              string
 	ClusterDNSServer         string
 	DockerDialerFactory      hosts.DialerFactory
+	EtcdHosts                []*hosts.Host
+	EtcdReadyHosts           []*hosts.Host
+	InactiveHosts            []*hosts.Host
+	K8sWrapTransport         k8s.WrapTransport
+	KubeClient               *kubernetes.Clientset
+	KubernetesServiceIP      net.IP
+	LocalKubeConfigPath      string
 	LocalConnDialerFactory   hosts.DialerFactory
 	PrivateRegistriesMap     map[string]v3.PrivateRegistry
-	K8sWrapTransport         k8s.WrapTransport
+	StateFilePath            string
 	UseKubectlDeploy         bool
 	UpdateWorkersOnly        bool
-	CloudConfigFile          string
+	v3.RancherKubernetesEngineConfig `yaml:",inline"`
+	WorkerHosts              []*hosts.Host
 }
 
 const (
@@ -145,25 +146,30 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
 	return &rkeConfig, nil
 }
 
-func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath, configDir string) (*Cluster, error) {
+func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, flags ExternalFlags) (*Cluster, error) {
 	// basic cluster object from rkeConfig
 	c := &Cluster{
 		RancherKubernetesEngineConfig: *rkeConfig,
-		ConfigPath:                    clusterFilePath,
-		StateFilePath:                 GetStateFilePath(clusterFilePath, configDir),
+		ConfigPath:                    flags.ClusterFilePath,
+		ConfigDir:                     flags.ConfigDir,
+		StateFilePath:                 GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir),
 		PrivateRegistriesMap:          make(map[string]v3.PrivateRegistry),
 	}
 	if len(c.ConfigPath) == 0 {
 		c.ConfigPath = pki.ClusterConfig
 	}
-	c.LocalKubeConfigPath = pki.GetLocalKubeConfig(c.ConfigPath, configDir)
-	c.StateFilePath = GetStateFilePath(c.ConfigPath, configDir)
+	// set kube_config and state file
+	c.LocalKubeConfigPath = pki.GetLocalKubeConfig(c.ConfigPath, c.ConfigDir)
+	c.StateFilePath = GetStateFilePath(c.ConfigPath, c.ConfigDir)
 
 	// Setting cluster Defaults
 	c.setClusterDefaults(ctx)
 	// extract cluster network configuration
 	c.setNetworkOptions()
 	// Register cloud provider
 	if err := c.setCloudProvider(); err != nil {
 		return nil, fmt.Errorf("Failed to register cloud provider: %v", err)
 	}
 	// set hosts groups
 	if err := c.InvertIndexHosts(); err != nil {
 		return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err)
@@ -187,13 +193,10 @@ func (c *Cluster) setNetworkOptions() error {
 	return nil
 }
 
-func (c *Cluster) SetupDialers(ctx context.Context, dockerDialerFactory,
-	localConnDialerFactory hosts.DialerFactory,
-	k8sWrapTransport k8s.WrapTransport) error {
-
-	c.DockerDialerFactory = dockerDialerFactory
-	c.LocalConnDialerFactory = localConnDialerFactory
-	c.K8sWrapTransport = k8sWrapTransport
+func (c *Cluster) SetupDialers(ctx context.Context, dialersOptions hosts.DialersOptions) error {
+	c.DockerDialerFactory = dialersOptions.DockerDialerFactory
+	c.LocalConnDialerFactory = dialersOptions.LocalConnDialerFactory
+	c.K8sWrapTransport = dialersOptions.K8sWrapTransport
 	// Create k8s wrap transport for bastion host
 	if len(c.BastionHost.Address) > 0 {
 		var err error
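hosts.DialersOptions is defined in the hosts package, which this excerpt does not include. From the three assignments above and the hosts.GetDialerOptions call in cluster/state.go further down, it is presumably a plain carrier struct along these lines; the field names come from this diff, but the exact definition is an assumption:

package hosts

import "github.com/rancher/rke/k8s"

// Presumed definition: inferred from the SetupDialers and GetDialerOptions
// call sites in this diff, not shown in the diff itself.
type DialersOptions struct {
	DockerDialerFactory    DialerFactory
	LocalConnDialerFactory DialerFactory
	K8sWrapTransport       k8s.WrapTransport
}

// Presumed helper matching the hosts.GetDialerOptions call in cluster/state.go.
func GetDialerOptions(docker, localConn DialerFactory, wrapTransport k8s.WrapTransport) DialersOptions {
	return DialersOptions{
		DockerDialerFactory:    docker,
		LocalConnDialerFactory: localConn,
		K8sWrapTransport:       wrapTransport,
	}
}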
@@ -275,13 +278,13 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string, cluste
 		string(config.KeyData))
 }
 
-func ApplyAuthzResources(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, clusterFilePath, configDir string, k8sWrapTransport k8s.WrapTransport) error {
+func ApplyAuthzResources(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, flags ExternalFlags, dialersOptions hosts.DialersOptions) error {
 	// dialer factories are not needed here since we are not using docker, only k8s jobs
-	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, nil, nil, k8sWrapTransport); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 	if len(kubeCluster.ControlPlaneHosts) == 0 {
@@ -436,15 +439,15 @@ func ConfigureCluster(
 	ctx context.Context,
 	rkeConfig v3.RancherKubernetesEngineConfig,
 	crtBundle map[string]pki.CertificatePKI,
-	clusterFilePath, configDir string,
-	k8sWrapTransport k8s.WrapTransport,
+	flags ExternalFlags,
+	dialersOptions hosts.DialersOptions,
 	useKubectl bool) error {
 	// dialer factories are not needed here since we are not using docker, only k8s jobs
-	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := InitClusterObject(ctx, &rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, nil, nil, k8sWrapTransport); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 	kubeCluster.UseKubectlDeploy = useKubectl
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/rancher/rke/cloudprovider"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
@@ -43,6 +44,16 @@ const (
 	DefaultEtcdElectionTimeoutValue = "5000"
 )
 
+type ExternalFlags struct {
+	ConfigDir        string
+	ClusterFilePath  string
+	DisablePortCheck bool
+	Local            bool
+	RotateCACerts    bool
+	RotateComponents []string
+	UpdateOnly       bool
+}
+
 func setDefaultIfEmptyMapValue(configMap map[string]string, key string, value string) {
 	if _, ok := configMap[key]; !ok {
 		configMap[key] = value
@@ -260,3 +271,33 @@ func d(image, defaultRegistryURL string) string {
 	}
 	return fmt.Sprintf("%s/%s", defaultRegistryURL, image)
 }
+
+func (c *Cluster) setCloudProvider() error {
+	p, err := cloudprovider.InitCloudProvider(c.CloudProvider)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize cloud provider: %v", err)
+	}
+	if p != nil {
+		c.CloudConfigFile, err = p.GenerateCloudConfigFile()
+		if err != nil {
+			return fmt.Errorf("Failed to parse cloud config file: %v", err)
+		}
+		c.CloudProvider.Name = p.GetName()
+		if c.CloudProvider.Name == "" {
+			return fmt.Errorf("Name of the cloud provider is not defined for custom provider")
+		}
+	}
+	return nil
+}
+
+func GetExternalFlags(local, rotateca, updateOnly, disablePortCheck bool, rotateComponents []string, configDir, clusterFilePath string) ExternalFlags {
+	return ExternalFlags{
+		Local:            local,
+		UpdateOnly:       updateOnly,
+		DisablePortCheck: disablePortCheck,
+		ConfigDir:        configDir,
+		ClusterFilePath:  clusterFilePath,
+		RotateCACerts:    rotateca,
+		RotateComponents: rotateComponents,
+	}
+}
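GetExternalFlags takes four positional booleans in a row, so call sites are easy to misread. Every call in this commit uses the order (local, rotateca, updateOnly, disablePortCheck, rotateComponents, configDir, clusterFilePath). A quick equivalence sketch, with cluster.yml as a placeholder path:

package main

import (
	"fmt"

	"github.com/rancher/rke/cluster"
)

func main() {
	// Positional form, as used by clusterRemoveLocal below (local = true).
	flags := cluster.GetExternalFlags(true, false, false, false, nil, "", "cluster.yml")

	// The same value spelled out as a struct literal.
	same := cluster.ExternalFlags{
		Local:           true,
		ClusterFilePath: "cluster.yml",
	}
	fmt.Printf("%+v\n%+v\n", flags, same) // identical field values
}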
@@ -23,8 +23,8 @@ const (
 	workerRoleLabel = "node-role.kubernetes.io/worker"
 )
 
-func (c *Cluster) TunnelHosts(ctx context.Context, local bool) error {
-	if local {
+func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error {
+	if flags.Local {
 		if err := c.ControlPlaneHosts[0].TunnelUpLocal(ctx, c.Version); err != nil {
 			return fmt.Errorf("Failed to connect to docker for local host [%s]: %v", c.EtcdHosts[0].Address, err)
 		}
@@ -41,7 +41,7 @@ var admissionControlOptionNames = []string{"enable-admission-plugins", "admissio
 
 func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, hostsInfoMap map[string]types.Info) (v3.RKEPlan, error) {
 	clusterPlan := v3.RKEPlan{}
-	myCluster, err := InitClusterObject(ctx, rkeConfig, "", "")
+	myCluster, err := InitClusterObject(ctx, rkeConfig, ExternalFlags{})
 	if err != nil {
 		return clusterPlan, err
 	}
@@ -236,16 +236,16 @@ func (c *Cluster) BuildKubeControllerProcess(prefixPath string) v3.Process {
 	}
 
 	CommandArgs := map[string]string{
-		"address":                     "0.0.0.0",
-		"cloud-provider":              c.CloudProvider.Name,
-		"allow-untagged-cloud":        "true",
-		"configure-cloud-routes":      "false",
-		"leader-elect":                "true",
-		"kubeconfig":                  pki.GetConfigPath(pki.KubeControllerCertName),
-		"enable-hostpath-provisioner": "false",
-		"node-monitor-grace-period":   "40s",
-		"pod-eviction-timeout":        "5m0s",
-		"v": "2",
+		"address":                     "0.0.0.0",
+		"cloud-provider":              c.CloudProvider.Name,
+		"allow-untagged-cloud":        "true",
+		"configure-cloud-routes":      "false",
+		"leader-elect":                "true",
+		"kubeconfig":                  pki.GetConfigPath(pki.KubeControllerCertName),
+		"enable-hostpath-provisioner": "false",
+		"node-monitor-grace-period":   "40s",
+		"pod-eviction-timeout":        "5m0s",
+		"v":                           "2",
 		"allocate-node-cidrs":         "true",
 		"cluster-cidr":                c.ClusterCIDR,
 		"service-cluster-ip-range":    c.Services.KubeController.ServiceClusterIPRange,
@@ -462,8 +462,8 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, prefixPath string) v3.
 	}
 
 	CommandArgs := map[string]string{
-		"cluster-cidr": c.ClusterCIDR,
-		"v":            "2",
+		"cluster-cidr":         c.ClusterCIDR,
+		"v":                    "2",
 		"healthz-bind-address": "0.0.0.0",
 		"hostname-override":    host.HostnameOverride,
 		"kubeconfig":           pki.GetConfigPath(pki.KubeProxyCertName),
@@ -507,17 +507,17 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, prefixPath string) v3.
 	}
 	registryAuthConfig, _, _ := docker.GetImageRegistryConfig(c.Services.Kubeproxy.Image, c.PrivateRegistriesMap)
 	return v3.Process{
-		Name:          services.KubeproxyContainerName,
-		Command:       Command,
-		VolumesFrom:   VolumesFrom,
-		Binds:         getUniqStringList(Binds),
-		Env:           c.Services.Kubeproxy.ExtraEnv,
-		NetworkMode:   "host",
-		RestartPolicy: "always",
-		PidMode:       "host",
-		Privileged:    true,
-		HealthCheck:   healthCheck,
-		Image:         c.Services.Kubeproxy.Image,
+		Name:                    services.KubeproxyContainerName,
+		Command:                 Command,
+		VolumesFrom:             VolumesFrom,
+		Binds:                   getUniqStringList(Binds),
+		Env:                     c.Services.Kubeproxy.ExtraEnv,
+		NetworkMode:             "host",
+		RestartPolicy:           "always",
+		PidMode:                 "host",
+		Privileged:              true,
+		HealthCheck:             healthCheck,
+		Image:                   c.Services.Kubeproxy.Image,
 		ImageRegistryAuthConfig: registryAuthConfig,
 		Labels: map[string]string{
 			ContainerNameLabel: services.KubeproxyContainerName,
@@ -540,12 +540,12 @@ func (c *Cluster) BuildProxyProcess() v3.Process {
 		Name: services.NginxProxyContainerName,
 		Env:  Env,
 		// we do this to force container update when CP hosts change.
-		Args:          Env,
-		Command:       []string{"nginx-proxy"},
-		NetworkMode:   "host",
-		RestartPolicy: "always",
-		HealthCheck:   v3.HealthCheck{},
-		Image:         c.SystemImages.NginxProxy,
+		Args:                    Env,
+		Command:                 []string{"nginx-proxy"},
+		NetworkMode:             "host",
+		RestartPolicy:           "always",
+		HealthCheck:             v3.HealthCheck{},
+		Image:                   c.SystemImages.NginxProxy,
 		ImageRegistryAuthConfig: registryAuthConfig,
 		Labels: map[string]string{
 			ContainerNameLabel: services.NginxProxyContainerName,
@@ -20,9 +20,9 @@ const (
 	unschedulableControlTaint = "node-role.kubernetes.io/controlplane=true:NoSchedule"
 )
 
-func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster, updateOnly bool) error {
+func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster, flags ExternalFlags) error {
 	log.Infof(ctx, "[reconcile] Reconciling cluster state")
-	kubeCluster.UpdateWorkersOnly = updateOnly
+	kubeCluster.UpdateWorkersOnly = flags.UpdateOnly
 	if currentCluster == nil {
 		log.Infof(ctx, "[reconcile] This is newly generated cluster")
 		kubeCluster.UpdateWorkersOnly = false
@@ -38,7 +38,7 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
 	}
 
 	pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath)
-	RemoveStateFile(ctx, c.StateFilePath)
+	removeStateFile(ctx, c.StateFilePath)
 	return nil
 }
 
cluster/state.go (254 changes)
@@ -2,12 +2,10 @@ package cluster
 
 import (
 	"context"
-	"crypto/rsa"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"os"
-	"path"
 	"path/filepath"
 	"strings"
 	"time"
@@ -16,153 +14,56 @@ import (
 	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
-	"github.com/rancher/rke/util"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/sirupsen/logrus"
-	"golang.org/x/sync/errgroup"
 	"gopkg.in/yaml.v2"
 	"k8s.io/api/core/v1"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/util/cert"
 )
 
 const (
 	stateFileExt = ".rkestate"
 )
 
-type RKEFullState struct {
-	DesiredState RKEState `json:"desiredState,omitempty"`
-	CurrentState RKEState `json:"currentState,omitempty"`
+type FullState struct {
+	DesiredState State `json:"desiredState,omitempty"`
+	CurrentState State `json:"currentState,omitempty"`
 }
 
-type RKEState struct {
+type State struct {
 	RancherKubernetesEngineConfig *v3.RancherKubernetesEngineConfig `json:"rkeConfig,omitempty"`
-	CertificatesBundle            map[string]v3.CertificatePKI      `json:"certificatesBundle,omitempty"`
+	CertificatesBundle            map[string]pki.CertificatePKI     `json:"certificatesBundle,omitempty"`
 }
 
 // UpdateClusterState will update the cluster's current state
-func (c *Cluster) UpdateClusterState(ctx context.Context, fullState *RKEFullState) error {
+func (c *Cluster) UpdateClusterCurrentState(ctx context.Context, fullState *FullState) error {
 	fullState.CurrentState.RancherKubernetesEngineConfig = c.RancherKubernetesEngineConfig.DeepCopy()
-	fullState.CurrentState.CertificatesBundle = TransformCertsToV3Certs(c.Certificates)
+	fullState.CurrentState.CertificatesBundle = c.Certificates
 	return fullState.WriteStateFile(ctx, c.StateFilePath)
 }
 
-func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) error {
-	if len(c.ControlPlaneHosts) > 0 {
-		// Reinitialize kubernetes Client
-		var err error
-		c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
-		if err != nil {
-			return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
-		}
-		err = saveClusterCerts(ctx, c.KubeClient, c.Certificates)
-		if err != nil {
-			return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
-		}
-		err = saveStateToKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath, rkeConfig)
-		if err != nil {
-			return fmt.Errorf("[state] Failed to save configuration state to k8s: %v", err)
-		}
-	}
-	// save state to the cluster nodes as a backup
-	uniqueHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
-	if err := saveStateToNodes(ctx, uniqueHosts, rkeConfig, c.SystemImages.Alpine, c.PrivateRegistriesMap); err != nil {
-		return fmt.Errorf("[state] Failed to save configuration state to nodes: %v", err)
-	}
-	return nil
-}
-
-func TransformV3CertsToCerts(in map[string]v3.CertificatePKI) map[string]pki.CertificatePKI {
-	out := map[string]pki.CertificatePKI{}
-	for k, v := range in {
-		certs, _ := cert.ParseCertsPEM([]byte(v.Certificate))
-		key, _ := cert.ParsePrivateKeyPEM([]byte(v.Key))
-		o := pki.CertificatePKI{
-			ConfigEnvName: v.ConfigEnvName,
-			Name:          v.Name,
-			Config:        v.Config,
-			CommonName:    v.CommonName,
-			OUName:        v.OUName,
-			EnvName:       v.EnvName,
-			Path:          v.Path,
-			KeyEnvName:    v.KeyEnvName,
-			KeyPath:       v.KeyPath,
-			ConfigPath:    v.ConfigPath,
-			Certificate:   certs[0],
-			Key:           key.(*rsa.PrivateKey),
-		}
-		out[k] = o
-	}
-	return out
-}
-
-func TransformCertsToV3Certs(in map[string]pki.CertificatePKI) map[string]v3.CertificatePKI {
-	out := map[string]v3.CertificatePKI{}
-	for k, v := range in {
-		certificate := string(cert.EncodeCertPEM(v.Certificate))
-		key := string(cert.EncodePrivateKeyPEM(v.Key))
-		o := v3.CertificatePKI{
-			Name:          v.Name,
-			Config:        v.Config,
-			Certificate:   certificate,
-			Key:           key,
-			EnvName:       v.EnvName,
-			KeyEnvName:    v.KeyEnvName,
-			ConfigEnvName: v.ConfigEnvName,
-			Path:          v.Path,
-			KeyPath:       v.KeyPath,
-			ConfigPath:    v.ConfigPath,
-			CommonName:    v.CommonName,
-			OUName:        v.OUName,
-		}
-		out[k] = o
-	}
-	return out
-}
-func (c *Cluster) GetClusterState(ctx context.Context, fullState *RKEFullState, configDir string) (*Cluster, error) {
+func (c *Cluster) GetClusterState(ctx context.Context, fullState *FullState) (*Cluster, error) {
 	var err error
 	if fullState.CurrentState.RancherKubernetesEngineConfig == nil {
 		return nil, nil
 	}
 
-	currentCluster, err := InitClusterObject(ctx, fullState.CurrentState.RancherKubernetesEngineConfig, c.ConfigPath, configDir)
+	// resetup external flags
+	flags := GetExternalFlags(false, false, false, false, nil, c.ConfigDir, c.ConfigPath)
+	currentCluster, err := InitClusterObject(ctx, fullState.CurrentState.RancherKubernetesEngineConfig, flags)
 	if err != nil {
 		return nil, err
 	}
-
-	currentCluster.Certificates = TransformV3CertsToCerts(fullState.CurrentState.CertificatesBundle)
-
-	currentCluster.SetupDialers(ctx, c.DockerDialerFactory, c.LocalConnDialerFactory, c.K8sWrapTransport)
-
+	currentCluster.Certificates = fullState.CurrentState.CertificatesBundle
+	// resetup dialers
+	dialerOptions := hosts.GetDialerOptions(c.DockerDialerFactory, c.LocalConnDialerFactory, c.K8sWrapTransport)
+	if err := currentCluster.SetupDialers(ctx, dialerOptions); err != nil {
+		return nil, err
+	}
 	activeEtcdHosts := currentCluster.EtcdHosts
 	for _, inactiveHost := range c.InactiveHosts {
 		activeEtcdHosts = removeFromHosts(inactiveHost, activeEtcdHosts)
 	}
 	// make sure I have all the etcd certs; we need to handle dialer failure for etcd nodes https://github.com/rancher/rancher/issues/12898
 	for _, host := range activeEtcdHosts {
 		certName := pki.GetEtcdCrtName(host.InternalAddress)
 		if (currentCluster.Certificates[certName] == pki.CertificatePKI{}) {
 			if currentCluster.Certificates, err = pki.RegenerateEtcdCertificate(ctx,
 				currentCluster.Certificates,
 				host,
 				activeEtcdHosts,
 				currentCluster.ClusterDomain,
 				currentCluster.KubernetesServiceIP); err != nil {
 				return nil, err
 			}
 		}
 	}
 	currentCluster.Certificates, err = regenerateAPICertificate(c, currentCluster.Certificates)
 	if err != nil {
 		return nil, fmt.Errorf("Failed to regenerate KubeAPI certificate %v", err)
 	}
 
 	currentCluster.setClusterDefaults(ctx)
 
 	return currentCluster, nil
 }
-func SaveFullStateToKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, fullState *RKEFullState) error {
-	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
-
+
+func SaveFullStateToKubernetes(ctx context.Context, kubeCluster *Cluster, fullState *FullState) error {
+	k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
 	if err != nil {
 		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
 	}
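With the v3-to-pki transform functions deleted, State now stores pki.CertificatePKI directly, and ReadStateFile re-inflates the PEM text via pki.TransformPEMToObject (added in a later hunk). A sketch of loading and inspecting a state file through the functions in this diff, assuming a cluster.rkestate written by an earlier rke up:

package main

import (
	"context"
	"fmt"

	"github.com/rancher/rke/cluster"
)

func main() {
	ctx := context.Background()

	// GetStateFilePath("cluster.yml", "") resolves to "cluster.rkestate"
	// (the configured path with stateFileExt substituted for the extension).
	statePath := cluster.GetStateFilePath("cluster.yml", "")

	fullState, err := cluster.ReadStateFile(ctx, statePath)
	if err != nil {
		panic(err)
	}

	// FullState splits what the user asked for from what was last applied.
	fmt.Println("has desired config:", fullState.DesiredState.RancherKubernetesEngineConfig != nil)
	fmt.Println("current certs:", len(fullState.CurrentState.CertificatesBundle))
}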
@@ -192,63 +93,9 @@ func SaveFullStateToKubernetes(ctx context.Context, localConfigPath string, k8sW
 		}
 	}
 
-func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
-	log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
-	clusterFile, err := yaml.Marshal(*rkeConfig)
-	if err != nil {
-		return err
-	}
-	timeout := make(chan bool, 1)
-	go func() {
-		for {
-			_, err := k8s.UpdateConfigMap(kubeClient, clusterFile, StateConfigMapName)
-			if err != nil {
-				time.Sleep(time.Second * 5)
-				continue
-			}
-			log.Infof(ctx, "[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
-			timeout <- true
-			break
-		}
-	}()
-	select {
-	case <-timeout:
-		return nil
-	case <-time.After(time.Second * UpdateStateTimeout):
-		return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
-	}
-}
-
-func saveStateToNodes(ctx context.Context, uniqueHosts []*hosts.Host, clusterState *v3.RancherKubernetesEngineConfig, alpineImage string, prsMap map[string]v3.PrivateRegistry) error {
-	log.Infof(ctx, "[state] Saving cluster state to cluster nodes")
-	clusterFile, err := yaml.Marshal(*clusterState)
-	if err != nil {
-		return err
-	}
-	var errgrp errgroup.Group
-
-	hostsQueue := util.GetObjectQueue(uniqueHosts)
-	for w := 0; w < WorkerThreads; w++ {
-		errgrp.Go(func() error {
-			var errList []error
-			for host := range hostsQueue {
-				err := pki.DeployStateOnPlaneHost(ctx, host.(*hosts.Host), alpineImage, prsMap, string(clusterFile))
-				if err != nil {
-					errList = append(errList, err)
-				}
-			}
-			return util.ErrList(errList)
-		})
-	}
-	if err := errgrp.Wait(); err != nil {
-		return err
-	}
-	return nil
-}
-
-func GetStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
+func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster, error) {
 	log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
-	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
+	k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
 	}
@@ -281,32 +128,6 @@ func GetStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapT
 		}
 	}
 
-func getStateFromNodes(ctx context.Context, uniqueHosts []*hosts.Host, alpineImage string, prsMap map[string]v3.PrivateRegistry) *Cluster {
-	log.Infof(ctx, "[state] Fetching cluster state from Nodes")
-	var currentCluster Cluster
-	var clusterFile string
-	var err error
-
-	for _, host := range uniqueHosts {
-		filePath := path.Join(host.PrefixPath, pki.TempCertPath, pki.ClusterStateFile)
-		clusterFile, err = pki.FetchFileFromHost(ctx, filePath, alpineImage, host, prsMap, pki.StateDeployerContainerName, "state")
-		if err == nil {
-			break
-		}
-	}
-	if len(clusterFile) == 0 {
-		return nil
-	}
-	err = yaml.Unmarshal([]byte(clusterFile), &currentCluster)
-	if err != nil {
-		logrus.Debugf("[state] Failed to unmarshal the cluster file fetched from nodes: %v", err)
-		return nil
-	}
-	log.Infof(ctx, "[state] Successfully fetched cluster state from Nodes")
-	return &currentCluster
-
-}
-
 func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (string, error) {
 	logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
 	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
@@ -322,9 +143,9 @@ func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (
 	return fmt.Sprintf("%#v", *serverVersion), nil
 }
 
-func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, oldState *RKEFullState, configPath, configDir string) (*RKEFullState, error) {
-	newState := &RKEFullState{
-		DesiredState: RKEState{
+func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, oldState *FullState, flags ExternalFlags) (*FullState, error) {
+	newState := &FullState{
+		DesiredState: State{
 			RancherKubernetesEngineConfig: rkeConfig.DeepCopy(),
 		},
 	}
@@ -336,11 +157,10 @@ func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConf
 		if err != nil {
 			return nil, fmt.Errorf("Failed to generate certificate bundle: %v", err)
 		}
-		// Convert rke certs to v3.certs
-		newState.DesiredState.CertificatesBundle = TransformCertsToV3Certs(certBundle)
+		newState.DesiredState.CertificatesBundle = certBundle
 	} else {
 		// Regenerating etcd certificates for any new etcd nodes
-		pkiCertBundle := TransformV3CertsToCerts(oldState.DesiredState.CertificatesBundle)
+		pkiCertBundle := oldState.DesiredState.CertificatesBundle
 		if err := pki.GenerateEtcdCertificates(ctx, pkiCertBundle, *rkeConfig, "", ""); err != nil {
 			return nil, err
 		}
@@ -349,16 +169,16 @@ func RebuildState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConf
 			return nil, err
 		}
 		// Regenerating kubeadmin certificates/config
-		if err := pki.GenerateKubeAdminCertificate(ctx, pkiCertBundle, *rkeConfig, configPath, configDir); err != nil {
+		if err := pki.GenerateKubeAdminCertificate(ctx, pkiCertBundle, *rkeConfig, flags.ClusterFilePath, flags.ConfigDir); err != nil {
 			return nil, err
 		}
-		newState.DesiredState.CertificatesBundle = TransformCertsToV3Certs(pkiCertBundle)
+		newState.DesiredState.CertificatesBundle = pkiCertBundle
 	}
 	newState.CurrentState = oldState.CurrentState
 	return newState, nil
 }
 
-func (s *RKEFullState) WriteStateFile(ctx context.Context, statePath string) error {
+func (s *FullState) WriteStateFile(ctx context.Context, statePath string) error {
 	stateFile, err := json.MarshalIndent(s, "", "  ")
 	if err != nil {
 		return fmt.Errorf("Failed to Marshal state object: %v", err)
@@ -386,8 +206,8 @@ func GetStateFilePath(configPath, configDir string) string {
 	return trimmedName + stateFileExt
 }
 
-func ReadStateFile(ctx context.Context, statePath string) (*RKEFullState, error) {
-	rkeFullState := &RKEFullState{}
+func ReadStateFile(ctx context.Context, statePath string) (*FullState, error) {
+	rkeFullState := &FullState{}
 	fp, err := filepath.Abs(statePath)
 	if err != nil {
 		return rkeFullState, fmt.Errorf("failed to lookup current directory name: %v", err)
@@ -404,10 +224,12 @@ func ReadStateFile(ctx context.Context, statePath string) (*RKEFullState, error)
 	if err := json.Unmarshal(buf, rkeFullState); err != nil {
 		return rkeFullState, fmt.Errorf("failed to unmarshal the state file: %v", err)
 	}
+	rkeFullState.DesiredState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.DesiredState.CertificatesBundle)
+	rkeFullState.CurrentState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.CurrentState.CertificatesBundle)
 	return rkeFullState, nil
 }
 
-func RemoveStateFile(ctx context.Context, statePath string) {
+func removeStateFile(ctx context.Context, statePath string) {
 	log.Infof(ctx, "Removing state file: %s", statePath)
 	if err := os.Remove(statePath); err != nil {
 		logrus.Warningf("Failed to remove state file: %v", err)
@@ -415,11 +237,3 @@ func RemoveStateFile(ctx context.Context, statePath string) {
 	}
 	log.Infof(ctx, "State file removed successfully")
 }
-
-func RemoveLegacyStateFromKubernets(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
-	k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
-	if err != nil {
-		return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
-	}
-	return k8s.DeleteConfigMap(k8sClient, StateConfigMapName)
-}
cmd/cert.go (35 changes)
@@ -6,7 +6,6 @@ import (
 
 	"github.com/rancher/rke/cluster"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
@@ -58,7 +57,6 @@ func rotateRKECertificatesFromCli(ctx *cli.Context) error {
 	if err != nil {
 		return fmt.Errorf("Failed to resolve cluster file: %v", err)
 	}
-	clusterFilePath = filePath
 
 	rkeConfig, err := cluster.ParseConfig(clusterFile)
 	if err != nil {
@@ -68,40 +66,37 @@ func rotateRKECertificatesFromCli(ctx *cli.Context) error {
 	if err != nil {
 		return err
 	}
+	// setting up the flags
+	flags := cluster.GetExternalFlags(false, rotateCACert, false, false, k8sComponent, "", filePath)
 
-	return RotateRKECertificates(context.Background(), rkeConfig, nil, nil, nil, false, "", k8sComponent, rotateCACert)
+	return RotateRKECertificates(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
 }
 
 func showRKECertificatesFromCli(ctx *cli.Context) error {
 	return nil
 }
 
-func RotateRKECertificates(
-	ctx context.Context,
-	rkeConfig *v3.RancherKubernetesEngineConfig,
-	dockerDialerFactory, localConnDialerFactory hosts.DialerFactory,
-	k8sWrapTransport k8s.WrapTransport,
-	local bool, configDir string, components []string, rotateCACerts bool) error {
+func RotateRKECertificates(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) error {
 
 	log.Infof(ctx, "Rotating Kubernetes cluster certificates")
-	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(clusterFilePath, configDir))
+	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
 	if err != nil {
 		return err
 	}
 
-	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 
-	if err := kubeCluster.TunnelHosts(ctx, local); err != nil {
+	if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
 		return err
 	}
 
-	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState, configDir)
+	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
 	if err != nil {
 		return err
 	}
@@ -110,7 +105,7 @@ func RotateRKECertificates(
 		return err
 	}
 
-	if err := cluster.RotateRKECertificates(ctx, kubeCluster, clusterFilePath, configDir, components, rotateCACerts); err != nil {
+	if err := cluster.RotateRKECertificates(ctx, kubeCluster, flags); err != nil {
 		return err
 	}
 
@@ -119,11 +114,11 @@ func RotateRKECertificates(
 	}
 	// Restarting Kubernetes components
 	servicesMap := make(map[string]bool)
-	for _, component := range components {
+	for _, component := range flags.RotateComponents {
 		servicesMap[component] = true
 	}
 
-	if len(components) == 0 || rotateCACerts || servicesMap[services.EtcdContainerName] {
+	if len(flags.RotateComponents) == 0 || flags.RotateCACerts || servicesMap[services.EtcdContainerName] {
 		if err := services.RestartEtcdPlane(ctx, kubeCluster.EtcdHosts); err != nil {
 			return err
 		}
@@ -138,11 +133,7 @@ func RotateRKECertificates(
 		return err
 	}
 
-	if err := kubeCluster.SaveClusterState(ctx, &kubeCluster.RancherKubernetesEngineConfig); err != nil {
-		return err
-	}
-
-	if rotateCACerts {
+	if flags.RotateCACerts {
 		return cluster.RestartClusterPods(ctx, kubeCluster)
 	}
 	return nil
cmd/etcd.go (32 changes)
@@ -53,19 +53,19 @@ func EtcdCommand() cli.Command {
 func SnapshotSaveEtcdHosts(
 	ctx context.Context,
 	rkeConfig *v3.RancherKubernetesEngineConfig,
-	dockerDialerFactory hosts.DialerFactory,
-	configDir, snapshotName string) error {
+	dialersOptions hosts.DialersOptions,
+	flags cluster.ExternalFlags, snapshotName string) error {
 
 	log.Infof(ctx, "Starting saving snapshot on etcd hosts")
-	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, nil, nil); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 
-	if err := kubeCluster.TunnelHosts(ctx, false); err != nil {
+	if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
 		return err
 	}
 	if err := kubeCluster.SnapshotEtcd(ctx, snapshotName); err != nil {
@@ -83,19 +83,19 @@ func SnapshotSaveEtcdHosts(
 func RestoreEtcdSnapshot(
 	ctx context.Context,
 	rkeConfig *v3.RancherKubernetesEngineConfig,
-	dockerDialerFactory hosts.DialerFactory,
-	configDir, snapshotName string) error {
+	dialersOptions hosts.DialersOptions,
+	flags cluster.ExternalFlags, snapshotName string) error {
 
 	log.Infof(ctx, "Starting restoring snapshot on etcd hosts")
-	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, dockerDialerFactory, nil, nil); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 
-	if err := kubeCluster.TunnelHosts(ctx, false); err != nil {
+	if err := kubeCluster.TunnelHosts(ctx, flags); err != nil {
 		return err
 	}
 	if err := kubeCluster.RestoreEtcdSnapshot(ctx, snapshotName); err != nil {
@@ -113,7 +113,6 @@ func SnapshotSaveEtcdHostsFromCli(ctx *cli.Context) error {
 	if err != nil {
 		return fmt.Errorf("Failed to resolve cluster file: %v", err)
 	}
-	clusterFilePath = filePath
 
 	rkeConfig, err := cluster.ParseConfig(clusterFile)
 	if err != nil {
@@ -130,7 +129,10 @@ func SnapshotSaveEtcdHostsFromCli(ctx *cli.Context) error {
 		etcdSnapshotName = fmt.Sprintf("rke_etcd_snapshot_%s", time.Now().Format(time.RFC3339))
 		logrus.Warnf("Name of the snapshot is not specified using [%s]", etcdSnapshotName)
 	}
-	return SnapshotSaveEtcdHosts(context.Background(), rkeConfig, nil, "", etcdSnapshotName)
+	// setting up the flags
+	flags := cluster.GetExternalFlags(false, false, false, false, nil, "", filePath)
+
+	return SnapshotSaveEtcdHosts(context.Background(), rkeConfig, hosts.DialersOptions{}, flags, etcdSnapshotName)
 }
 
 func RestoreEtcdSnapshotFromCli(ctx *cli.Context) error {
@@ -138,7 +140,6 @@ func RestoreEtcdSnapshotFromCli(ctx *cli.Context) error {
 	if err != nil {
 		return fmt.Errorf("Failed to resolve cluster file: %v", err)
 	}
-	clusterFilePath = filePath
 
 	rkeConfig, err := cluster.ParseConfig(clusterFile)
 	if err != nil {
@@ -153,6 +154,9 @@ func RestoreEtcdSnapshotFromCli(ctx *cli.Context) error {
 	if etcdSnapshotName == "" {
 		return fmt.Errorf("You must specify the snapshot name to restore")
 	}
-	return RestoreEtcdSnapshot(context.Background(), rkeConfig, nil, "", etcdSnapshotName)
+	// setting up the flags
+	flags := cluster.GetExternalFlags(false, false, false, false, nil, "", filePath)
+
+	return RestoreEtcdSnapshot(context.Background(), rkeConfig, hosts.DialersOptions{}, flags, etcdSnapshotName)
 
 }
@@ -10,7 +10,6 @@ import (
 	"github.com/rancher/rke/cluster"
 	"github.com/rancher/rke/dind"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -53,20 +52,19 @@ func RemoveCommand() cli.Command {
 func ClusterRemove(
 	ctx context.Context,
 	rkeConfig *v3.RancherKubernetesEngineConfig,
-	dialerFactory hosts.DialerFactory,
-	k8sWrapTransport k8s.WrapTransport,
-	local bool, configDir string) error {
+	dialersOptions hosts.DialersOptions,
+	flags cluster.ExternalFlags) error {
 
 	log.Infof(ctx, "Tearing down Kubernetes cluster")
-	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
 	}
-	if err := kubeCluster.SetupDialers(ctx, dialerFactory, nil, k8sWrapTransport); err != nil {
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
 		return err
 	}
 
-	err = kubeCluster.TunnelHosts(ctx, local)
+	err = kubeCluster.TunnelHosts(ctx, flags)
 	if err != nil {
 		return err
 	}
@@ -105,7 +103,6 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
 	if ctx.Bool("dind") {
 		return clusterRemoveDind(ctx)
 	}
-	clusterFilePath = filePath
 	rkeConfig, err := cluster.ParseConfig(clusterFile)
 	if err != nil {
 		return fmt.Errorf("Failed to parse cluster file: %v", err)
@@ -116,7 +113,10 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
 		return err
 	}
 
-	return ClusterRemove(context.Background(), rkeConfig, nil, nil, false, "")
+	// setting up the flags
+	flags := cluster.GetExternalFlags(false, false, false, false, nil, "", filePath)
+
+	return ClusterRemove(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
 }
 
 func clusterRemoveLocal(ctx *cli.Context) error {
@@ -126,7 +126,6 @@ func clusterRemoveLocal(ctx *cli.Context) error {
 		log.Warnf(context.Background(), "Failed to resolve cluster file, using default cluster instead")
 		rkeConfig = cluster.GetLocalRKEConfig()
 	} else {
-		clusterFilePath = filePath
 		rkeConfig, err = cluster.ParseConfig(clusterFile)
 		if err != nil {
 			return fmt.Errorf("Failed to parse cluster file: %v", err)
@@ -138,8 +137,10 @@ func clusterRemoveLocal(ctx *cli.Context) error {
 	if err != nil {
 		return err
 	}
+	// setting up the flags
+	flags := cluster.GetExternalFlags(true, false, false, false, nil, "", filePath)
 
-	return ClusterRemove(context.Background(), rkeConfig, nil, nil, true, "")
+	return ClusterRemove(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
 }
 
 func clusterRemoveDind(ctx *cli.Context) error {
cmd/up.go (123 changes)
@@ -10,7 +10,6 @@ import (
 	"github.com/rancher/rke/cluster"
 	"github.com/rancher/rke/dind"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -19,8 +18,6 @@ import (
 	"k8s.io/client-go/util/cert"
 )
 
-var clusterFilePath string
-
 const DINDWaitTime = 3
 
 func UpCommand() cli.Command {
@@ -53,7 +50,7 @@ func UpCommand() cli.Command {
 		},
 		cli.BoolFlag{
 			Name:  "init",
-			Usage: "test init",
+			Usage: "Initiate RKE cluster",
 		},
 	}
 
@@ -66,13 +63,14 @@ func UpCommand() cli.Command {
 		Flags: upFlags,
 	}
 }
-func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, fullState *cluster.RKEFullState, k8sWrapTransport k8s.WrapTransport) error {
+
+func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, fullState *cluster.FullState) error {
 	if _, err := os.Stat(kubeCluster.LocalKubeConfigPath); os.IsNotExist(err) {
 		// there is no kubeconfig. This is a new cluster
 		logrus.Debug("[state] local kubeconfig not found, this is a new cluster")
 		return nil
 	}
-	if _, err := os.Stat(kubeCluster.StateFilePath); err == nil {
+	if fullState.CurrentState.RancherKubernetesEngineConfig != nil {
 		// this cluster has a previous state, I don't need to upgrade!
 		logrus.Debug("[state] previous state found, this is not a legacy cluster")
 		return nil
@@ -83,95 +81,86 @@ func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, f
 	if err := cluster.RebuildKubeconfig(ctx, kubeCluster); err != nil {
 		return err
 	}
-	recoveredCluster, err := cluster.GetStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+	recoveredCluster, err := cluster.GetStateFromKubernetes(ctx, kubeCluster)
 	if err != nil {
 		return err
 	}
 	// if we found a recovered cluster, we will need override the current state
 	if recoveredCluster != nil {
-		recoveredCerts, err := cluster.GetClusterCertsFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, kubeCluster.EtcdHosts)
+		recoveredCerts, err := cluster.GetClusterCertsFromKubernetes(ctx, kubeCluster)
 		if err != nil {
 			return err
 		}
 		fullState.CurrentState.RancherKubernetesEngineConfig = recoveredCluster.RancherKubernetesEngineConfig.DeepCopy()
-		fullState.CurrentState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
+		fullState.CurrentState.CertificatesBundle = recoveredCerts
 
 		// we don't want to regenerate certificates
-		fullState.DesiredState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
-		if err = fullState.WriteStateFile(ctx, kubeCluster.StateFilePath); err != nil {
-			return err
-		}
-		return cluster.RemoveLegacyStateFromKubernets(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
+		fullState.DesiredState.CertificatesBundle = recoveredCerts
+		return fullState.WriteStateFile(ctx, kubeCluster.StateFilePath)
 	}
 
 	return nil
 }
 
-func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string, k8sWrapTransport k8s.WrapTransport) error {
+func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) error {
 	log.Infof(ctx, "Initiating Kubernetes cluster")
-	stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
+	stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir)
 	rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)
-	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags)
 	if err != nil {
 		return err
 	}
+	if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil {
+		return err
+	}
 
-	err = doUpgradeLegacyCluster(ctx, kubeCluster, rkeFullState, k8sWrapTransport)
+	err = doUpgradeLegacyCluster(ctx, kubeCluster, rkeFullState)
 	if err != nil {
 		log.Warnf(ctx, "[state] can't fetch legacy cluster state from Kubernetes")
 	}
 
-	fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, clusterFilePath, configDir)
+	fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, flags)
 	if err != nil {
 		return err
 	}
 
-	rkeState := cluster.RKEFullState{
+	rkeState := cluster.FullState{
 		DesiredState: fullState.DesiredState,
 		CurrentState: fullState.CurrentState,
 	}
 	return rkeState.WriteStateFile(ctx, stateFilePath)
 }
 
-func ClusterUp(
-	ctx context.Context,
-	rkeConfig *v3.RancherKubernetesEngineConfig,
-	dockerDialerFactory, localConnDialerFactory hosts.DialerFactory,
-	k8sWrapTransport k8s.WrapTransport,
-	local bool, configDir string, updateOnly, disablePortCheck bool) (string, string, string, string, map[string]pki.CertificatePKI, error) {
-
+func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) (string, string, string, string, map[string]pki.CertificatePKI, error) {
 	log.Infof(ctx, "Building Kubernetes cluster")
 	var APIURL, caCrt, clientCert, clientKey string
 
 	// is there any chance we can store the cluster object here instead of the rke config?
 	// I can change the function signature, should be simpler
 	// No, I would still have to parse the cluster
-	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(clusterFilePath, configDir))
+	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	kubeCluster, err := cluster.InitClusterObject(ctx, clusterState.DesiredState.RancherKubernetesEngineConfig.DeepCopy(), clusterFilePath, configDir)
+	kubeCluster, err := cluster.InitClusterObject(ctx, clusterState.DesiredState.RancherKubernetesEngineConfig.DeepCopy(), flags)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	err = kubeCluster.SetupDialers(ctx, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
+	err = kubeCluster.SetupDialers(ctx, dialersOptions)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	err = kubeCluster.TunnelHosts(ctx, local)
+	err = kubeCluster.TunnelHosts(ctx, flags)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState, configDir)
+	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	if !disablePortCheck {
+	if !flags.DisablePortCheck {
 		if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
 			return APIURL, caCrt, clientCert, clientKey, nil, err
 		}
@@ -194,7 +183,7 @@ func ClusterUp(
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	err = cluster.ReconcileCluster(ctx, kubeCluster, currentCluster, updateOnly)
+	err = cluster.ReconcileCluster(ctx, kubeCluster, currentCluster, flags)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
@ -213,17 +202,17 @@ func ClusterUp(
|
||||
}
|
||||
|
||||
// Apply Authz configuration after deploying controlplane
|
||||
err = cluster.ApplyAuthzResources(ctx, kubeCluster.RancherKubernetesEngineConfig, clusterFilePath, configDir, k8sWrapTransport)
|
||||
err = cluster.ApplyAuthzResources(ctx, kubeCluster.RancherKubernetesEngineConfig, flags, dialersOptions)
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
|
||||
err = kubeCluster.UpdateClusterState(ctx, clusterState)
|
||||
err = kubeCluster.UpdateClusterCurrentState(ctx, clusterState)
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
|
||||
err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, clusterState)
|
||||
err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster, clusterState)
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
@ -242,7 +231,7 @@ func ClusterUp(
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
|
||||
err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir, k8sWrapTransport, false)
|
||||
err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, flags, dialersOptions, false)
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, nil, err
|
||||
}
|
||||
@ -279,7 +268,6 @@ func clusterUpFromCli(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("Failed to resolve cluster file: %v", err)
}
clusterFilePath = filePath

rkeConfig, err := cluster.ParseConfig(clusterFile)
if err != nil {
@ -292,13 +280,17 @@ func clusterUpFromCli(ctx *cli.Context) error {
}
updateOnly := ctx.Bool("update-only")
disablePortCheck := ctx.Bool("disable-port-check")
// setting up the flags
flags := cluster.GetExternalFlags(false, false, updateOnly, disablePortCheck, nil, "", filePath)

if ctx.Bool("init") {
return ClusterInit(context.Background(), rkeConfig, "", nil)
return ClusterInit(context.Background(), rkeConfig, hosts.DialersOptions{}, flags)
}
if err := ClusterInit(context.Background(), rkeConfig, "", nil); err != nil {
if err := ClusterInit(context.Background(), rkeConfig, hosts.DialersOptions{}, flags); err != nil {
return err
}
_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "", updateOnly, disablePortCheck)

_, _, _, _, _, err = ClusterUp(context.Background(), hosts.DialersOptions{}, flags)
return err
}
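
The new flags object threads the former positional parameters through in one value. A plausible shape for cluster.ExternalFlags, reconstructed only from the fields this diff dereferences (flags.ClusterFilePath, flags.ConfigDir, flags.DisablePortCheck) and from the boolean arguments at the GetExternalFlags call sites; the two arguments before filePath (the nil and the empty string) are not identifiable from these call sites, so this is explicitly a guess.

// Hypothetical sketch; the committed struct likely carries more fields.
type ExternalFlags struct {
	Local            bool // first argument: true only in the local path
	UpdateOnly       bool
	DisablePortCheck bool
	ConfigDir        string
	ClusterFilePath  string
}
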
@ -309,7 +301,6 @@ func clusterUpLocal(ctx *cli.Context) error {
log.Infof(context.Background(), "Failed to resolve cluster file, using default cluster instead")
rkeConfig = cluster.GetLocalRKEConfig()
} else {
clusterFilePath = filePath
rkeConfig, err = cluster.ParseConfig(clusterFile)
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
@ -319,13 +310,24 @@ func clusterUpLocal(ctx *cli.Context) error {

rkeConfig.IgnoreDockerVersion = ctx.Bool("ignore-docker-version")

_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, nil, true, "", false, false)
// setting up the dialers
dialers := hosts.GetDialerOptions(nil, hosts.LocalHealthcheckFactory, nil)
// setting up the flags
flags := cluster.GetExternalFlags(true, false, false, false, nil, "", filePath)

if ctx.Bool("init") {
return ClusterInit(context.Background(), rkeConfig, dialers, flags)
}
if err := ClusterInit(context.Background(), rkeConfig, dialers, flags); err != nil {
return err
}
_, _, _, _, _, err = ClusterUp(context.Background(), dialers, flags)
return err
}
func clusterUpDind(ctx *cli.Context) error {
// get dind config
rkeConfig, disablePortCheck, dindStorageDriver, err := getDindConfig(ctx)
rkeConfig, disablePortCheck, dindStorageDriver, filePath, err := getDindConfig(ctx)
if err != nil {
return err
}
@ -333,29 +335,40 @@ func clusterUpDind(ctx *cli.Context) error {
if err = createDINDEnv(context.Background(), rkeConfig, dindStorageDriver); err != nil {
return err
}

// setting up the dialers
dialers := hosts.GetDialerOptions(hosts.DindConnFactory, hosts.DindHealthcheckConnFactory, nil)
// setting up flags
flags := cluster.GetExternalFlags(false, false, false, disablePortCheck, nil, "", filePath)

if ctx.Bool("init") {
return ClusterInit(context.Background(), rkeConfig, dialers, flags)
}
if err := ClusterInit(context.Background(), rkeConfig, dialers, flags); err != nil {
return err
}
// start cluster
_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, hosts.DindConnFactory, hosts.DindHealthcheckConnFactory, nil, false, "", false, disablePortCheck)
_, _, _, _, _, err = ClusterUp(context.Background(), dialers, flags)
return err
}
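
All three CLI entry points (remote, local, dind) now reduce to the same init-then-up sequence; a hypothetical helper capturing that shared shape, not part of the commit:

func initThenUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig,
	dialers hosts.DialersOptions, flags cluster.ExternalFlags) error {
	// ClusterInit writes the full state file; ClusterUp then consumes it.
	if err := ClusterInit(ctx, rkeConfig, dialers, flags); err != nil {
		return err
	}
	_, _, _, _, _, err := ClusterUp(ctx, dialers, flags)
	return err
}
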
func getDindConfig(ctx *cli.Context) (*v3.RancherKubernetesEngineConfig, bool, string, error) {
func getDindConfig(ctx *cli.Context) (*v3.RancherKubernetesEngineConfig, bool, string, string, error) {
disablePortCheck := ctx.Bool("disable-port-check")
dindStorageDriver := ctx.String("dind-storage-driver")

clusterFile, filePath, err := resolveClusterFile(ctx)
if err != nil {
return nil, disablePortCheck, "", fmt.Errorf("Failed to resolve cluster file: %v", err)
return nil, disablePortCheck, "", "", fmt.Errorf("Failed to resolve cluster file: %v", err)
}
clusterFilePath = filePath

rkeConfig, err := cluster.ParseConfig(clusterFile)
if err != nil {
return nil, disablePortCheck, "", fmt.Errorf("Failed to parse cluster file: %v", err)
return nil, disablePortCheck, "", "", fmt.Errorf("Failed to parse cluster file: %v", err)
}

rkeConfig, err = setOptionsFromCLI(ctx, rkeConfig)
if err != nil {
return nil, disablePortCheck, "", err
return nil, disablePortCheck, "", "", err
}
// Setting conntrack max for kubeproxy to 0
if rkeConfig.Services.Kubeproxy.ExtraArgs == nil {
@ -363,7 +376,7 @@ func getDindConfig(ctx *cli.Context) (*v3.RancherKubernetesEngineConfig, bool, s
}
rkeConfig.Services.Kubeproxy.ExtraArgs["conntrack-max-per-core"] = "0"

return rkeConfig, disablePortCheck, dindStorageDriver, nil
return rkeConfig, disablePortCheck, dindStorageDriver, filePath, nil
}

func createDINDEnv(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dindStorageDriver string) error {
@ -29,6 +29,20 @@ type dialer struct {
bastionDialer *dialer
}

type DialersOptions struct {
DockerDialerFactory DialerFactory
LocalConnDialerFactory DialerFactory
K8sWrapTransport k8s.WrapTransport
}

func GetDialerOptions(d, l DialerFactory, w k8s.WrapTransport) DialersOptions {
return DialersOptions{
DockerDialerFactory: d,
LocalConnDialerFactory: l,
K8sWrapTransport: w,
}
}

func newDialer(h *Host, kind string) (*dialer, error) {
// Check for Bastion host connection
var bastionDialer *dialer
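
DialersOptions simply bundles the three optional factories that used to travel as separate parameters; nil members presumably fall back to the default SSH-based dialers. Both construction styles below are equivalent:

// Via the constructor, as the dind path does:
dialers := hosts.GetDialerOptions(hosts.DindConnFactory, hosts.DindHealthcheckConnFactory, nil)

// Or as a literal, as the remote path does with hosts.DialersOptions{}:
dialers = hosts.DialersOptions{
	DockerDialerFactory:    hosts.DindConnFactory,
	LocalConnDialerFactory: hosts.DindHealthcheckConnFactory,
}
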
106 pki/deploy.go
@ -2,12 +2,10 @@ package pki

import (
"context"
"crypto/rsa"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"time"

"github.com/docker/docker/api/types"
@ -17,7 +15,6 @@ import (
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"k8s.io/client-go/util/cert"
)

const (
@ -150,106 +147,3 @@ func DeployCertificatesOnHost(ctx context.Context, host *hosts.Host, crtMap map[
}
return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
}
func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, host *hosts.Host, image, localConfigPath string, prsMap map[string]v3.PrivateRegistry) (map[string]CertificatePKI, error) {
// rebuilding the certificates. This should look better after refactoring pki
tmpCerts := make(map[string]CertificatePKI)

crtList := map[string]bool{
CACertName: false,
KubeAPICertName: false,
KubeControllerCertName: true,
KubeSchedulerCertName: true,
KubeProxyCertName: true,
KubeNodeCertName: true,
KubeAdminCertName: false,
RequestHeaderCACertName: false,
APIProxyClientCertName: false,
ServiceAccountTokenKeyName: false,
}

for _, etcdHost := range extraHosts {
// Fetch etcd certificates
crtList[GetEtcdCrtName(etcdHost.InternalAddress)] = false
}

for certName, config := range crtList {
certificate := CertificatePKI{}
crt, err := FetchFileFromHost(ctx, GetCertTempPath(certName), image, host, prsMap, CertFetcherContainer, "certificates")
// I will only exit with an error if it's not a not-found-error and this is not an etcd certificate
if err != nil && (!strings.HasPrefix(certName, "kube-etcd") &&
!strings.Contains(certName, APIProxyClientCertName) &&
!strings.Contains(certName, RequestHeaderCACertName) &&
!strings.Contains(certName, ServiceAccountTokenKeyName)) {
// IsErrNotFound doesn't catch this because it's a custom error
if isFileNotFoundErr(err) {
return nil, nil
}
return nil, err
}
// If I can't find an etcd or api aggregator cert, I will not fail and will create it later.
if crt == "" && (strings.HasPrefix(certName, "kube-etcd") ||
strings.Contains(certName, APIProxyClientCertName) ||
strings.Contains(certName, RequestHeaderCACertName) ||
strings.Contains(certName, ServiceAccountTokenKeyName)) {
tmpCerts[certName] = CertificatePKI{}
continue
}
key, err := FetchFileFromHost(ctx, GetKeyTempPath(certName), image, host, prsMap, CertFetcherContainer, "certificate")

if config {
config, err := FetchFileFromHost(ctx, GetConfigTempPath(certName), image, host, prsMap, CertFetcherContainer, "certificate")
if err != nil {
return nil, err
}
certificate.Config = config
}
parsedCert, err := cert.ParseCertsPEM([]byte(crt))
if err != nil {
return nil, err
}
parsedKey, err := cert.ParsePrivateKeyPEM([]byte(key))
if err != nil {
return nil, err
}
certificate.Certificate = parsedCert[0]
certificate.Key = parsedKey.(*rsa.PrivateKey)
tmpCerts[certName] = certificate
logrus.Debugf("[certificates] Recovered certificate: %s", certName)
}

if err := docker.RemoveContainer(ctx, host.DClient, host.Address, CertFetcherContainer); err != nil {
return nil, err
}
return populateCertMap(tmpCerts, localConfigPath, extraHosts), nil

}

func FetchFileFromHost(ctx context.Context, filePath, image string, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, containerName, state string) (string, error) {

imageCfg := &container.Config{
Image: image,
}
hostCfg := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")),
},
Privileged: true,
}
isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, containerName, true)
if err != nil {
return "", err
}
if !isRunning {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, containerName, host.Address, state, prsMap); err != nil {
return "", err
}
}
file, err := docker.ReadFileFromContainer(ctx, host.DClient, host.Address, containerName, filePath)
if err != nil {
return "", err
}

return file, nil
}
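
Before its removal here, FetchFileFromHost was the generic primitive underneath FetchCertificatesFromHost: it runs (or reuses) a privileged container with /etc/kubernetes bind-mounted and reads one file out of it. A hypothetical one-off caller matching the signature above, shown only to illustrate the call shape (ctx, host, image, and prsMap are assumed to come from the cluster object):

func fetchCACert(ctx context.Context, host *hosts.Host, image string,
	prsMap map[string]v3.PrivateRegistry) (string, error) {
	// Reads the CA certificate from the cert-fetcher container's temp path.
	return FetchFileFromHost(ctx, GetCertTempPath(CACertName), image, host,
		prsMap, CertFetcherContainer, "certificates")
}
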
26 pki/pki.go
@ -18,18 +18,20 @@ import (
)

type CertificatePKI struct {
Certificate *x509.Certificate
Key *rsa.PrivateKey
Config string
Name string
CommonName string
OUName string
EnvName string
Path string
KeyEnvName string
KeyPath string
ConfigEnvName string
ConfigPath string
Certificate *x509.Certificate `json:"-"`
Key *rsa.PrivateKey `json:"-"`
CertificatePEM string `json:"certificatePEM"`
KeyPEM string `json:"keyPEM"`
Config string `json:"config"`
Name string `json:"name"`
CommonName string `json:"commonName"`
OUName string `json:"ouName"`
EnvName string `json:"envName"`
Path string `json:"path"`
KeyEnvName string `json:"keyEnvName"`
KeyPath string `json:"keyPath"`
ConfigEnvName string `json:"configEnvName"`
ConfigPath string `json:"configPath"`
}

type GenFunc func(context.Context, map[string]CertificatePKI, v3.RancherKubernetesEngineConfig, string, string) error
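
The `json:"-"` tags are what make the struct safe to persist: encoding/json drops the parsed crypto objects and keeps only the PEM strings. A minimal, self-contained illustration of that behavior (toy struct, not the real one):

package main

import (
	"encoding/json"
	"fmt"
)

type toyPKI struct {
	CertificatePEM string      `json:"certificatePEM"`
	Key            interface{} `json:"-"` // stands in for *rsa.PrivateKey
}

func main() {
	b, _ := json.Marshal(toyPKI{CertificatePEM: "-----BEGIN CERTIFICATE-----...", Key: "secret"})
	fmt.Println(string(b)) // {"certificatePEM":"-----BEGIN CERTIFICATE-----..."} with no key material
}
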
@ -38,7 +38,7 @@ func GenerateKubeAPICertificate(ctx context.Context, certs map[string]Certificat
// handle service account tokens in old clusters
apiCert := certs[KubeAPICertName]
if certs[ServiceAccountTokenKeyName].Key == nil {
log.Infof(ctx, "[certificates] Creating service account token key")
log.Infof(ctx, "[certificates] Generating Service account token key")
certs[ServiceAccountTokenKeyName] = ToCertObject(ServiceAccountTokenKeyName, ServiceAccountTokenKeyName, "", apiCert.Certificate, apiCert.Key)
}
return nil
56 pki/util.go
@ -197,7 +197,7 @@ func GetConfigTempPath(name string) string {
return fmt.Sprintf("%skubecfg-%s.yaml", TempCertPath, name)
}

func ToCertObject(componentName, commonName, ouName string, cert *x509.Certificate, key *rsa.PrivateKey) CertificatePKI {
func ToCertObject(componentName, commonName, ouName string, certificate *x509.Certificate, key *rsa.PrivateKey) CertificatePKI {
var config, configPath, configEnvName string
if len(commonName) == 0 {
commonName = getDefaultCN(componentName)
@ -208,6 +208,8 @@ func ToCertObject(componentName, commonName, ouName string, cert *x509.Certifica
caCertPath := GetCertPath(CACertName)
path := GetCertPath(componentName)
keyPath := GetKeyPath(componentName)
certificatePEM := string(cert.EncodeCertPEM(certificate))
keyPEM := string(cert.EncodePrivateKeyPEM(key))

if componentName != CACertName && componentName != KubeAPICertName && !strings.Contains(componentName, EtcdCertName) && componentName != ServiceAccountTokenKeyName {
config = getKubeConfigX509("https://127.0.0.1:6443", "local", componentName, caCertPath, path, keyPath)
@ -216,18 +218,20 @@ func ToCertObject(componentName, commonName, ouName string, cert *x509.Certifica
}

return CertificatePKI{
Certificate: cert,
Key: key,
Config: config,
Name: componentName,
CommonName: commonName,
OUName: ouName,
EnvName: envName,
KeyEnvName: keyEnvName,
ConfigEnvName: configEnvName,
Path: path,
KeyPath: keyPath,
ConfigPath: configPath,
Certificate: certificate,
Key: key,
CertificatePEM: certificatePEM,
KeyPEM: keyPEM,
Config: config,
Name: componentName,
CommonName: commonName,
OUName: ouName,
EnvName: envName,
KeyEnvName: keyEnvName,
ConfigEnvName: configEnvName,
Path: path,
KeyPath: keyPath,
ConfigPath: configPath,
}
}
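
Note the parameter rename from cert to certificate: it stops shadowing the imported client-go cert package, which ToCertObject now needs for the eager PEM encoding. A small sketch of the round trip these PEM fields enable, using the same helpers this file imports (assumes a client-go vintage that still exposes them in util/cert):

package main

import (
	"crypto/rand"
	"crypto/rsa"
	"fmt"

	"k8s.io/client-go/util/cert"
)

func main() {
	key, _ := rsa.GenerateKey(rand.Reader, 2048)
	keyPEM := string(cert.EncodePrivateKeyPEM(key)) // what ToCertObject stores
	back, _ := cert.ParsePrivateKeyPEM([]byte(keyPEM))
	fmt.Println(back.(*rsa.PrivateKey).N.Cmp(key.N) == 0) // true: lossless round trip
}
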
@ -394,3 +398,29 @@ func deepEqualIPsAltNames(oldIPs, newIPs []net.IP) bool {
}
return reflect.DeepEqual(oldIPsStrings, newIPsStrings)
}

func TransformPEMToObject(in map[string]CertificatePKI) map[string]CertificatePKI {
out := map[string]CertificatePKI{}
for k, v := range in {
certs, _ := cert.ParseCertsPEM([]byte(v.CertificatePEM))
key, _ := cert.ParsePrivateKeyPEM([]byte(v.KeyPEM))
o := CertificatePKI{
ConfigEnvName: v.ConfigEnvName,
Name: v.Name,
Config: v.Config,
CommonName: v.CommonName,
OUName: v.OUName,
EnvName: v.EnvName,
Path: v.Path,
KeyEnvName: v.KeyEnvName,
KeyPath: v.KeyPath,
ConfigPath: v.ConfigPath,
Certificate: certs[0],
Key: key.(*rsa.PrivateKey),
CertificatePEM: v.CertificatePEM,
KeyPEM: v.KeyPEM,
}
out[k] = o
}
return out
}
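
TransformPEMToObject is the decode-side counterpart: after a certificate bundle comes back from JSON, it repopulates Certificate and Key from the PEM fields. One caution visible in the code above: both parse errors are discarded, so an entry with an empty or malformed CertificatePEM would panic at certs[0]; callers presumably guarantee a complete bundle. A sketch of the intended use, where fullState is assumed to be a cluster.FullState unmarshalled from the state file:

// loadCerts rehydrates a decoded bundle into usable crypto objects.
func loadCerts(fullState *cluster.FullState) map[string]pki.CertificatePKI {
	return pki.TransformPEMToObject(fullState.DesiredState.CertificatesBundle)
}
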
29 vendor/github.com/rancher/types/apis/management.cattle.io/v3/rke_types.go (generated, vendored)
@ -282,8 +282,6 @@ type IngressConfig struct {
type RKEPlan struct {
// List of node Plans
Nodes []RKEConfigNodePlan `json:"nodes,omitempty"`
// Certificates Key Pair
CertificatesBundle map[string]CertificatePKI `json:"certificatesBundle,omitempty"`
}

type RKEConfigNodePlan struct {
@ -569,30 +567,3 @@ type MonitoringConfig struct {
// Metrics server options
Options map[string]string `yaml:"options" json:"options,omitempty"`
}

type CertificatePKI struct {
// Name of the certificate pki
Name string
// Certificate in PEM format
Certificate string
// Key in PEM Format
Key string
// Kubeconfig file
Config string
// CommonName in the certificate
CommonName string
// Organizational Name in the certificate
OUName string
// Environment name of the certificate
EnvName string
// Path of the certificate on disk
Path string
// Environment name of the key
KeyEnvName string
// Path of the key on disk
KeyPath string
// Environment name of the kubeconfig
ConfigEnvName string
// Path of the kubeconfig on disk
ConfigPath string
}