diff --git a/cluster/cloud-provider.go b/cluster/cloud-provider.go index 70cbad79..a582eb58 100644 --- a/cluster/cloud-provider.go +++ b/cluster/cloud-provider.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "path" "github.com/docker/docker/api/types/container" "github.com/rancher/rke/docker" @@ -46,7 +47,7 @@ func doDeployConfigFile(ctx context.Context, host *hosts.Host, cloudConfig, alpi } hostCfg := &container.HostConfig{ Binds: []string{ - "/etc/kubernetes:/etc/kubernetes", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")), }, Privileged: true, } diff --git a/cluster/cluster.go b/cluster/cluster.go index f582d398..18a946c3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -64,26 +64,30 @@ const ( func (c *Cluster) DeployControlPlane(ctx context.Context) error { // Deploy Etcd Plane - etcdProcessHostMap := c.getEtcdProcessHostMap(nil) + etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan) + // Build etcd node plan map + for _, etcdHost := range c.EtcdHosts { + etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo) + } + if len(c.Services.Etcd.ExternalURLs) > 0 { log.Infof(ctx, "[etcd] External etcd connection string has been specified, skipping etcd plane") } else { - if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdProcessHostMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil { + if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil { return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err) } } // Deploy Control plane - processMap := map[string]v3.Process{ - services.SidekickContainerName: c.BuildSidecarProcess(), - services.KubeAPIContainerName: c.BuildKubeAPIProcess(), - services.KubeControllerContainerName: c.BuildKubeControllerProcess(), - services.SchedulerContainerName: c.BuildSchedulerProcess(), + cpNodePlanMap := make(map[string]v3.RKEConfigNodePlan) + // Build cp node plan map + for _, cpHost := range c.ControlPlaneHosts { + cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo) } if err := services.RunControlPlane(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.PrivateRegistriesMap, - processMap, + cpNodePlanMap, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil { return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err) @@ -93,22 +97,17 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error { } func (c *Cluster) DeployWorkerPlane(ctx context.Context) error { - // Deploy Worker Plane - processMap := map[string]v3.Process{ - services.SidekickContainerName: c.BuildSidecarProcess(), - services.KubeproxyContainerName: c.BuildKubeProxyProcess(), - services.NginxProxyContainerName: c.BuildProxyProcess(), - } - kubeletProcessHostMap := make(map[*hosts.Host]v3.Process) - for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) { - kubeletProcessHostMap[host] = c.BuildKubeletProcess(host) + // Deploy Worker plane + workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan) + // Build cp node plan map + for _, workerHost := range c.ControlPlaneHosts { + workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo) } allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) if err := services.RunWorkerPlane(ctx, allHosts, c.LocalConnDialerFactory, c.PrivateRegistriesMap, - processMap, - kubeletProcessHostMap, + workerNodePlanMap, c.Certificates, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil { @@ -350,16 +349,6 @@ func ConfigureCluster( return nil } -func (c *Cluster) getEtcdProcessHostMap(readyEtcdHosts []*hosts.Host) map[*hosts.Host]v3.Process { - etcdProcessHostMap := make(map[*hosts.Host]v3.Process) - for _, host := range c.EtcdHosts { - if !host.ToAddEtcdMember { - etcdProcessHostMap[host] = c.BuildEtcdProcess(host, readyEtcdHosts) - } - } - return etcdProcessHostMap -} - func (c *Cluster) parseCloudConfig(ctx context.Context) (string, error) { // check for azure cloud provider if c.CloudProvider.AzureCloudProvider != nil { diff --git a/cluster/defaults.go b/cluster/defaults.go index 276d105d..7398f70a 100644 --- a/cluster/defaults.go +++ b/cluster/defaults.go @@ -47,7 +47,10 @@ func (c *Cluster) setClusterDefaults(ctx context.Context) { if len(c.SSHKeyPath) == 0 { c.SSHKeyPath = DefaultClusterSSHKeyPath } - + // Default Path prefix + if len(c.PrefixPath) == 0 { + c.PrefixPath = "/" + } for i, host := range c.Nodes { if len(host.InternalAddress) == 0 { c.Nodes[i].InternalAddress = c.Nodes[i].Address diff --git a/cluster/hosts.go b/cluster/hosts.go index 2fc6df91..5637f938 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -40,6 +40,7 @@ func (c *Cluster) TunnelHosts(ctx context.Context, local bool) error { log.Warnf(ctx, "Failed to set up SSH tunneling for host [%s]: %v", uniqueHosts[i].Address, err) c.InactiveHosts = append(c.InactiveHosts, uniqueHosts[i]) } + uniqueHosts[i].PrefixPath = c.getPrefixPath(uniqueHosts[i].DockerInfo.OperatingSystem) } for _, host := range c.InactiveHosts { log.Warnf(ctx, "Removing host [%s] from node lists", host.Address) diff --git a/cluster/plan.go b/cluster/plan.go index e1c6dac3..1e3e224f 100644 --- a/cluster/plan.go +++ b/cluster/plan.go @@ -3,11 +3,13 @@ package cluster import ( "context" "fmt" + "path" "strconv" "strings" b64 "encoding/base64" + "github.com/docker/docker/api/types" "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" "github.com/rancher/rke/k8s" @@ -18,26 +20,31 @@ import ( const ( EtcdPathPrefix = "/registry" + B2DOS = "Boot2Docker" + B2DPrefixPath = "/mnt/sda1/rke" + ROS = "RancherOS" + ROSPrefixPath = "/opt/rke" ) -func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) { +func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, hostsInfoMap map[string]types.Info) (v3.RKEPlan, error) { clusterPlan := v3.RKEPlan{} myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil) // rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags. uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts) for _, host := range uniqHosts { - clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host)) + clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host, hostsInfoMap[host.Address])) } return clusterPlan, nil } -func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host) v3.RKEConfigNodePlan { +func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host, hostDockerInfo types.Info) v3.RKEConfigNodePlan { + prefixPath := myCluster.getPrefixPath(hostDockerInfo.OperatingSystem) processes := map[string]v3.Process{} portChecks := []v3.PortCheck{} // Everybody gets a sidecar and a kubelet.. processes[services.SidekickContainerName] = myCluster.BuildSidecarProcess() - processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host) - processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess() + processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host, prefixPath) + processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess(prefixPath) portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...) // Do we need an nginxProxy for this one ? @@ -45,14 +52,14 @@ func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts processes[services.NginxProxyContainerName] = myCluster.BuildProxyProcess() } if host.IsControl { - processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess() - processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess() - processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess() + processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess(prefixPath) + processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess(prefixPath) + processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess(prefixPath) portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...) } if host.IsEtcd { - processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, nil) + processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, nil, prefixPath) portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...) } @@ -73,7 +80,7 @@ func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts } } -func (c *Cluster) BuildKubeAPIProcess() v3.Process { +func (c *Cluster) BuildKubeAPIProcess(prefixPath string) v3.Process { // check if external etcd is used etcdConnectionString := services.GetEtcdConnString(c.EtcdHosts) etcdPathPrefix := EtcdPathPrefix @@ -142,7 +149,7 @@ func (c *Cluster) BuildKubeAPIProcess() v3.Process { services.SidekickContainerName, } Binds := []string{ - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } // Override args if they exist, add additional args @@ -178,7 +185,7 @@ func (c *Cluster) BuildKubeAPIProcess() v3.Process { } } -func (c *Cluster) BuildKubeControllerProcess() v3.Process { +func (c *Cluster) BuildKubeControllerProcess(prefixPath string) v3.Process { Command := []string{ "/opt/rke/entrypoint.sh", "kube-controller-manager", @@ -221,7 +228,7 @@ func (c *Cluster) BuildKubeControllerProcess() v3.Process { services.SidekickContainerName, } Binds := []string{ - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } for arg, value := range c.Services.KubeController.ExtraArgs { @@ -256,7 +263,7 @@ func (c *Cluster) BuildKubeControllerProcess() v3.Process { } } -func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process { +func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string) v3.Process { Command := []string{ "/opt/rke/entrypoint.sh", @@ -285,6 +292,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process { "anonymous-auth": "false", "volume-plugin-dir": "/var/lib/kubelet/volumeplugins", "fail-swap-on": strconv.FormatBool(c.Services.Kubelet.FailSwapOn), + "root-dir": path.Join(prefixPath, "/var/lib/kubelet"), } if host.Address != host.InternalAddress { CommandArgs["node-ip"] = host.InternalAddress @@ -305,20 +313,20 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process { services.SidekickContainerName, } Binds := []string{ - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), "/etc/cni:/etc/cni:ro,z", "/opt/cni:/opt/cni:ro,z", - "/var/lib/cni:/var/lib/cni:z", + fmt.Sprintf("%s:/var/lib/cni:z", path.Join(prefixPath, "/var/lib/cni")), "/etc/resolv.conf:/etc/resolv.conf", "/sys:/sys:rprivate", host.DockerInfo.DockerRootDir + ":" + host.DockerInfo.DockerRootDir + ":rw,rprivate,z", - "/var/lib/kubelet:/var/lib/kubelet:shared,z", + fmt.Sprintf("%s:%s:shared,z", path.Join(prefixPath, "/var/lib/kubelet"), path.Join(prefixPath, "/var/lib/kubelet")), "/var/run:/var/run:rw,rprivate", "/run:/run:rprivate", - "/etc/ceph:/etc/ceph", + fmt.Sprintf("%s:/etc/ceph", path.Join(prefixPath, "/etc/ceph")), "/dev:/host/dev:rprivate", - "/var/log/containers:/var/log/containers:z", - "/var/log/pods:/var/log/pods:z", + fmt.Sprintf("%s:/var/log/containers:z", path.Join(prefixPath, "/var/log/containers")), + fmt.Sprintf("%s:/var/log/pods:z", path.Join(prefixPath, "/var/log/pods")), } for arg, value := range c.Services.Kubelet.ExtraArgs { @@ -354,7 +362,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process { } } -func (c *Cluster) BuildKubeProxyProcess() v3.Process { +func (c *Cluster) BuildKubeProxyProcess(prefixPath string) v3.Process { Command := []string{ "/opt/rke/entrypoint.sh", "kube-proxy", @@ -378,7 +386,7 @@ func (c *Cluster) BuildKubeProxyProcess() v3.Process { services.SidekickContainerName, } Binds := []string{ - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } for arg, value := range c.Services.Kubeproxy.ExtraArgs { @@ -436,7 +444,7 @@ func (c *Cluster) BuildProxyProcess() v3.Process { } } -func (c *Cluster) BuildSchedulerProcess() v3.Process { +func (c *Cluster) BuildSchedulerProcess(prefixPath string) v3.Process { Command := []string{ "/opt/rke/entrypoint.sh", "kube-scheduler", @@ -461,7 +469,7 @@ func (c *Cluster) BuildSchedulerProcess() v3.Process { services.SidekickContainerName, } Binds := []string{ - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } for arg, value := range c.Services.Scheduler.ExtraArgs { @@ -505,7 +513,7 @@ func (c *Cluster) BuildSidecarProcess() v3.Process { } } -func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3.Process { +func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host, prefixPath string) v3.Process { nodeName := pki.GetEtcdCrtName(host.InternalAddress) initCluster := "" if len(etcdHosts) == 0 { @@ -543,8 +551,8 @@ func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3 } Binds := []string{ - "/var/lib/etcd:/var/lib/rancher/etcd:z", - "/etc/kubernetes:/etc/kubernetes:z", + fmt.Sprintf("%s:/var/lib/rancher/etcd:z", path.Join(prefixPath, "/var/lib/etcd")), + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")), } for arg, value := range c.Services.Etcd.ExtraArgs { @@ -589,3 +597,15 @@ func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto stri } return portChecks } + +func (c *Cluster) getPrefixPath(osType string) string { + var prefixPath string + if strings.Contains(osType, B2DOS) { + prefixPath = B2DPrefixPath + } else if strings.Contains(osType, ROS) { + prefixPath = ROSPrefixPath + } else { + prefixPath = c.PrefixPath + } + return prefixPath +} diff --git a/cluster/reconcile.go b/cluster/reconcile.go index f2a15032..5464db35 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -209,9 +209,13 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku } etcdHost.ToAddEtcdMember = false readyHosts := getReadyEtcdHosts(kubeCluster.EtcdHosts) - etcdProcessHostMap := kubeCluster.getEtcdProcessHostMap(readyHosts) - if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdProcessHostMap, kubeCluster.SystemImages.Alpine); err != nil { + etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan) + for _, etcdReadyHost := range readyHosts { + etcdNodePlanMap[etcdReadyHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdReadyHost, etcdReadyHost.DockerInfo) + } + + if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdNodePlanMap, kubeCluster.SystemImages.Alpine); err != nil { return err } } diff --git a/hosts/hosts.go b/hosts/hosts.go index a5c1c466..c47459b2 100644 --- a/hosts/hosts.go +++ b/hosts/hosts.go @@ -3,6 +3,7 @@ package hosts import ( "context" "fmt" + "path" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -33,6 +34,7 @@ type Host struct { ToDelTaints []string DockerInfo types.Info UpdateWorker bool + PrefixPath string } const ( @@ -49,15 +51,15 @@ const ( func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry, externalEtcd bool) error { log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address) toCleanPaths := []string{ - ToCleanSSLDir, + path.Join(h.PrefixPath, ToCleanSSLDir), ToCleanCNIConf, ToCleanCNIBin, ToCleanCalicoRun, - ToCleanTempCertPath, - ToCleanCNILib, + path.Join(h.PrefixPath, ToCleanTempCertPath), + path.Join(h.PrefixPath, ToCleanCNILib), } if !externalEtcd { - toCleanPaths = append(toCleanPaths, ToCleanEtcdDir) + toCleanPaths = append(toCleanPaths, path.Join(h.PrefixPath, ToCleanEtcdDir)) } return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap) } @@ -68,11 +70,11 @@ func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string, prsMa return nil } toCleanPaths := []string{ - ToCleanSSLDir, + path.Join(h.PrefixPath, ToCleanSSLDir), ToCleanCNIConf, ToCleanCNIBin, ToCleanCalicoRun, - ToCleanCNILib, + path.Join(h.PrefixPath, ToCleanCNILib), } return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap) } @@ -83,24 +85,24 @@ func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string, prsM return nil } toCleanPaths := []string{ - ToCleanSSLDir, + path.Join(h.PrefixPath, ToCleanSSLDir), ToCleanCNIConf, ToCleanCNIBin, ToCleanCalicoRun, - ToCleanCNILib, + path.Join(h.PrefixPath, ToCleanCNILib), } return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap) } func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error { toCleanPaths := []string{ - ToCleanEtcdDir, - ToCleanSSLDir, + path.Join(h.PrefixPath, ToCleanEtcdDir), + path.Join(h.PrefixPath, ToCleanSSLDir), } if h.IsWorker || h.IsControl { log.Infof(ctx, "[hosts] Host [%s] is already a worker or control host, skipping cleanup certs.", h.Address) toCleanPaths = []string{ - ToCleanEtcdDir, + path.Join(h.PrefixPath, ToCleanEtcdDir), } } return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap) diff --git a/pki/deploy.go b/pki/deploy.go index 1fb41486..41e256a2 100644 --- a/pki/deploy.go +++ b/pki/deploy.go @@ -50,7 +50,7 @@ func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string, } hostCfg := &container.HostConfig{ Binds: []string{ - "/etc/kubernetes:/etc/kubernetes", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")), }, Privileged: true, } @@ -178,7 +178,7 @@ func fetchFileFromHost(ctx context.Context, filePath, image string, host *hosts. } hostCfg := &container.HostConfig{ Binds: []string{ - "/etc/kubernetes:/etc/kubernetes", + fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")), }, Privileged: true, } diff --git a/services/controlplane.go b/services/controlplane.go index 52340d6b..238c14c1 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -9,7 +9,7 @@ import ( "golang.org/x/sync/errgroup" ) -func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, updateWorkersOnly bool, alpineImage string) error { +func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string) error { log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole) var errgrp errgroup.Group for _, host := range controlHosts { @@ -18,7 +18,7 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD continue } errgrp.Go(func() error { - return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, processMap, alpineImage) + return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage) }) } if err := errgrp.Wait(); err != nil { diff --git a/services/etcd.go b/services/etcd.go index 7b42ff9f..d2aa09f2 100644 --- a/services/etcd.go +++ b/services/etcd.go @@ -20,13 +20,14 @@ const ( EtcdHealthCheckURL = "https://127.0.0.1:2379/health" ) -func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdProcessHostMap map[*hosts.Host]v3.Process, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, updateWorkersOnly bool, alpineImage string) error { +func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, updateWorkersOnly bool, alpineImage string) error { log.Infof(ctx, "[%s] Building up etcd plane..", ETCDRole) for _, host := range etcdHosts { if updateWorkersOnly { continue } - imageCfg, hostCfg, _ := GetProcessConfig(etcdProcessHostMap[host]) + etcdProcess := etcdNodePlanMap[host.Address].Processes[EtcdContainerName] + imageCfg, hostCfg, _ := GetProcessConfig(etcdProcess) if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil { return err } @@ -130,20 +131,20 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*ho return nil } -func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdProcessHostMap map[*hosts.Host]v3.Process, alpineImage string) error { - for host, process := range etcdProcessHostMap { - imageCfg, hostCfg, _ := GetProcessConfig(process) - if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil { +func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, alpineImage string) error { + for _, etcdHost := range readyEtcdHosts { + imageCfg, hostCfg, _ := GetProcessConfig(etcdNodePlanMap[etcdHost.Address].Processes[EtcdContainerName]) + if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil { return err } - if err := createLogLink(ctx, host, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil { + if err := createLogLink(ctx, etcdHost, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil { return err } } time.Sleep(10 * time.Second) var healthy bool for _, host := range readyEtcdHosts { - _, _, healthCheckURL := GetProcessConfig(etcdProcessHostMap[host]) + _, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName]) if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy { break } diff --git a/services/services.go b/services/services.go index b8bf177b..869f3683 100644 --- a/services/services.go +++ b/services/services.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "path" "github.com/docker/docker/api/types/container" "github.com/rancher/rke/docker" @@ -114,7 +115,7 @@ func createLogLink(ctx context.Context, host *hosts.Host, containerName, plane, } hostCfg := &container.HostConfig{ Binds: []string{ - "/var/lib:/var/lib", + fmt.Sprintf("%s:/var/lib", path.Join(host.PrefixPath, "/var/lib")), }, Privileged: true, } diff --git a/services/workerplane.go b/services/workerplane.go index fa4705fe..de6792fd 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -14,7 +14,7 @@ const ( unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute" ) -func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, kubeletProcessHostMap map[*hosts.Host]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error { +func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error { log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole) var errgrp errgroup.Group for _, host := range allHosts { @@ -29,9 +29,8 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer } runHost := host // maps are not thread safe - hostProcessMap := copyProcessMap(processMap) + hostProcessMap := copyProcessMap(workerNodePlanMap[runHost.Address].Processes) errgrp.Go(func() error { - hostProcessMap[KubeletContainerName] = kubeletProcessHostMap[runHost] return doDeployWorkerPlane(ctx, runHost, localConnDialerFactory, prsMap, hostProcessMap, certMap, alpineImage) }) }