
Merge pull request #497 from galal-hussein/hostpath_prefix

Add prefix path to services and generate plan
Alena Prokharchyk 2018-04-11 16:11:12 -07:00 committed by GitHub
commit 7e44bc6283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 110 additions and 89 deletions

View File

@@ -3,6 +3,7 @@ package cluster
 import (
 	"context"
 	"fmt"
+	"path"

 	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
@@ -46,7 +47,7 @@ func doDeployConfigFile(ctx context.Context, host *hosts.Host, cloudConfig, alpi
 	}
 	hostCfg := &container.HostConfig{
 		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes",
+			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")),
 		},
 		Privileged: true,
 	}

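For illustration only (this is plain Go path.Join behaviour, not code from the commit): the host side of each bind mount is now joined with the node's prefix path, while the in-container path stays the same, and the default prefix "/" leaves the stock layout untouched.

// Illustrative sketch, not part of this commit: how the prefixed bind strings resolve.
package main

import (
	"fmt"
	"path"
)

func main() {
	for _, prefix := range []string{"/", "/opt/rke"} {
		// path.Join cleans redundant slashes, so the default prefix "/" is a no-op.
		fmt.Println(fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefix, "/etc/kubernetes")))
	}
	// Prints:
	//   /etc/kubernetes:/etc/kubernetes:z
	//   /opt/rke/etc/kubernetes:/etc/kubernetes:z
}
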
View File

@@ -64,26 +64,30 @@ const (
 func (c *Cluster) DeployControlPlane(ctx context.Context) error {
 	// Deploy Etcd Plane
-	etcdProcessHostMap := c.getEtcdProcessHostMap(nil)
+	etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
+	// Build etcd node plan map
+	for _, etcdHost := range c.EtcdHosts {
+		etcdNodePlanMap[etcdHost.Address] = BuildRKEConfigNodePlan(ctx, c, etcdHost, etcdHost.DockerInfo)
+	}
 	if len(c.Services.Etcd.ExternalURLs) > 0 {
 		log.Infof(ctx, "[etcd] External etcd connection string has been specified, skipping etcd plane")
 	} else {
-		if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdProcessHostMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil {
+		if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine); err != nil {
 			return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
 		}
 	}

 	// Deploy Control plane
-	processMap := map[string]v3.Process{
-		services.SidekickContainerName: c.BuildSidecarProcess(),
-		services.KubeAPIContainerName: c.BuildKubeAPIProcess(),
-		services.KubeControllerContainerName: c.BuildKubeControllerProcess(),
-		services.SchedulerContainerName: c.BuildSchedulerProcess(),
+	cpNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
+	// Build cp node plan map
+	for _, cpHost := range c.ControlPlaneHosts {
+		cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo)
 	}
 	if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap,
-		processMap,
+		cpNodePlanMap,
 		c.UpdateWorkersOnly,
 		c.SystemImages.Alpine); err != nil {
 		return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
@@ -93,22 +97,17 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error {
 }

 func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
-	// Deploy Worker Plane
-	processMap := map[string]v3.Process{
-		services.SidekickContainerName: c.BuildSidecarProcess(),
-		services.KubeproxyContainerName: c.BuildKubeProxyProcess(),
-		services.NginxProxyContainerName: c.BuildProxyProcess(),
-	}
-	kubeletProcessHostMap := make(map[*hosts.Host]v3.Process)
-	for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) {
-		kubeletProcessHostMap[host] = c.BuildKubeletProcess(host)
+	// Deploy Worker plane
+	workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
+	// Build cp node plan map
+	for _, workerHost := range c.ControlPlaneHosts {
+		workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo)
 	}
 	allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
 	if err := services.RunWorkerPlane(ctx, allHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap,
-		processMap,
-		kubeletProcessHostMap,
+		workerNodePlanMap,
 		c.Certificates,
 		c.UpdateWorkersOnly,
 		c.SystemImages.Alpine); err != nil {
@@ -350,16 +349,6 @@ func ConfigureCluster(
 	return nil
 }

-func (c *Cluster) getEtcdProcessHostMap(readyEtcdHosts []*hosts.Host) map[*hosts.Host]v3.Process {
-	etcdProcessHostMap := make(map[*hosts.Host]v3.Process)
-	for _, host := range c.EtcdHosts {
-		if !host.ToAddEtcdMember {
-			etcdProcessHostMap[host] = c.BuildEtcdProcess(host, readyEtcdHosts)
-		}
-	}
-	return etcdProcessHostMap
-}
-
 func (c *Cluster) parseCloudConfig(ctx context.Context) (string, error) {
 	// check for azure cloud provider
 	if c.CloudProvider.AzureCloudProvider != nil {

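For context, a minimal sketch of the new plan-map shape, using simplified stand-in types rather than the real v3.Process / v3.RKEConfigNodePlan definitions from github.com/rancher/types: plans are built once per host, keyed by host address, and each plane later reads its container processes back out of the same map.

// Minimal sketch with simplified stand-in types; the real definitions live in rancher/types.
package main

import "fmt"

type Process struct{ Image string }

type RKEConfigNodePlan struct {
	Address   string
	Processes map[string]Process // keyed by container name, e.g. "etcd", "kubelet"
}

func main() {
	hostAddresses := []string{"10.0.0.10", "10.0.0.11"} // example addresses

	// Build the plan once per host, keyed by host address ...
	planMap := make(map[string]RKEConfigNodePlan)
	for _, addr := range hostAddresses {
		planMap[addr] = RKEConfigNodePlan{
			Address:   addr,
			Processes: map[string]Process{"etcd": {Image: "rancher/coreos-etcd"}},
		}
	}

	// ... and look the processes back up by address when deploying a plane.
	for _, addr := range hostAddresses {
		fmt.Println(addr, planMap[addr].Processes["etcd"].Image)
	}
}
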
View File

@@ -47,7 +47,10 @@ func (c *Cluster) setClusterDefaults(ctx context.Context) {
 	if len(c.SSHKeyPath) == 0 {
 		c.SSHKeyPath = DefaultClusterSSHKeyPath
 	}
+	// Default Path prefix
+	if len(c.PrefixPath) == 0 {
+		c.PrefixPath = "/"
+	}
 	for i, host := range c.Nodes {
 		if len(host.InternalAddress) == 0 {
 			c.Nodes[i].InternalAddress = c.Nodes[i].Address

View File

@@ -40,6 +40,7 @@ func (c *Cluster) TunnelHosts(ctx context.Context, local bool) error {
 			log.Warnf(ctx, "Failed to set up SSH tunneling for host [%s]: %v", uniqueHosts[i].Address, err)
 			c.InactiveHosts = append(c.InactiveHosts, uniqueHosts[i])
 		}
+		uniqueHosts[i].PrefixPath = c.getPrefixPath(uniqueHosts[i].DockerInfo.OperatingSystem)
 	}
 	for _, host := range c.InactiveHosts {
 		log.Warnf(ctx, "Removing host [%s] from node lists", host.Address)

View File

@@ -3,11 +3,13 @@ package cluster
 import (
 	"context"
 	"fmt"
+	"path"
 	"strconv"
 	"strings"

 	b64 "encoding/base64"

+	"github.com/docker/docker/api/types"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/k8s"
@@ -18,26 +20,31 @@ import (
 const (
 	EtcdPathPrefix = "/registry"
+	B2DOS = "Boot2Docker"
+	B2DPrefixPath = "/mnt/sda1/rke"
+	ROS = "RancherOS"
+	ROSPrefixPath = "/opt/rke"
 )

-func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
+func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, hostsInfoMap map[string]types.Info) (v3.RKEPlan, error) {
 	clusterPlan := v3.RKEPlan{}
 	myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil)
 	// rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags.
 	uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
 	for _, host := range uniqHosts {
-		clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host))
+		clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host, hostsInfoMap[host.Address]))
 	}
 	return clusterPlan, nil
 }

-func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host) v3.RKEConfigNodePlan {
+func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host, hostDockerInfo types.Info) v3.RKEConfigNodePlan {
+	prefixPath := myCluster.getPrefixPath(hostDockerInfo.OperatingSystem)
 	processes := map[string]v3.Process{}
 	portChecks := []v3.PortCheck{}
 	// Everybody gets a sidecar and a kubelet..
 	processes[services.SidekickContainerName] = myCluster.BuildSidecarProcess()
-	processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host)
-	processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess()
+	processes[services.KubeletContainerName] = myCluster.BuildKubeletProcess(host, prefixPath)
+	processes[services.KubeproxyContainerName] = myCluster.BuildKubeProxyProcess(prefixPath)
 	portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...)

 	// Do we need an nginxProxy for this one ?
@@ -45,14 +52,14 @@ func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts
 		processes[services.NginxProxyContainerName] = myCluster.BuildProxyProcess()
 	}
 	if host.IsControl {
-		processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess()
-		processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess()
-		processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess()
+		processes[services.KubeAPIContainerName] = myCluster.BuildKubeAPIProcess(prefixPath)
+		processes[services.KubeControllerContainerName] = myCluster.BuildKubeControllerProcess(prefixPath)
+		processes[services.SchedulerContainerName] = myCluster.BuildSchedulerProcess(prefixPath)
 		portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...)
 	}
 	if host.IsEtcd {
-		processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, nil)
+		processes[services.EtcdContainerName] = myCluster.BuildEtcdProcess(host, nil, prefixPath)
 		portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...)
 	}
@@ -73,7 +80,7 @@ func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts
 	}
 }

-func (c *Cluster) BuildKubeAPIProcess() v3.Process {
+func (c *Cluster) BuildKubeAPIProcess(prefixPath string) v3.Process {
 	// check if external etcd is used
 	etcdConnectionString := services.GetEtcdConnString(c.EtcdHosts)
 	etcdPathPrefix := EtcdPathPrefix
@@ -142,7 +149,7 @@ func (c *Cluster) BuildKubeAPIProcess() v3.Process {
 		services.SidekickContainerName,
 	}
 	Binds := []string{
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 	}

 	// Override args if they exist, add additional args
@@ -178,7 +185,7 @@ func (c *Cluster) BuildKubeAPIProcess() v3.Process {
 	}
 }

-func (c *Cluster) BuildKubeControllerProcess() v3.Process {
+func (c *Cluster) BuildKubeControllerProcess(prefixPath string) v3.Process {
 	Command := []string{
 		"/opt/rke/entrypoint.sh",
 		"kube-controller-manager",
@@ -221,7 +228,7 @@ func (c *Cluster) BuildKubeControllerProcess() v3.Process {
 		services.SidekickContainerName,
 	}
 	Binds := []string{
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 	}

 	for arg, value := range c.Services.KubeController.ExtraArgs {
@@ -256,7 +263,7 @@ func (c *Cluster) BuildKubeControllerProcess() v3.Process {
 	}
 }

-func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
+func (c *Cluster) BuildKubeletProcess(host *hosts.Host, prefixPath string) v3.Process {
 	Command := []string{
 		"/opt/rke/entrypoint.sh",
@@ -285,6 +292,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
 		"anonymous-auth": "false",
 		"volume-plugin-dir": "/var/lib/kubelet/volumeplugins",
 		"fail-swap-on": strconv.FormatBool(c.Services.Kubelet.FailSwapOn),
+		"root-dir": path.Join(prefixPath, "/var/lib/kubelet"),
 	}
 	if host.Address != host.InternalAddress {
 		CommandArgs["node-ip"] = host.InternalAddress
@@ -305,20 +313,20 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
 		services.SidekickContainerName,
 	}
 	Binds := []string{
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 		"/etc/cni:/etc/cni:ro,z",
 		"/opt/cni:/opt/cni:ro,z",
-		"/var/lib/cni:/var/lib/cni:z",
+		fmt.Sprintf("%s:/var/lib/cni:z", path.Join(prefixPath, "/var/lib/cni")),
 		"/etc/resolv.conf:/etc/resolv.conf",
 		"/sys:/sys:rprivate",
 		host.DockerInfo.DockerRootDir + ":" + host.DockerInfo.DockerRootDir + ":rw,rprivate,z",
-		"/var/lib/kubelet:/var/lib/kubelet:shared,z",
+		fmt.Sprintf("%s:%s:shared,z", path.Join(prefixPath, "/var/lib/kubelet"), path.Join(prefixPath, "/var/lib/kubelet")),
 		"/var/run:/var/run:rw,rprivate",
 		"/run:/run:rprivate",
-		"/etc/ceph:/etc/ceph",
+		fmt.Sprintf("%s:/etc/ceph", path.Join(prefixPath, "/etc/ceph")),
 		"/dev:/host/dev:rprivate",
-		"/var/log/containers:/var/log/containers:z",
-		"/var/log/pods:/var/log/pods:z",
+		fmt.Sprintf("%s:/var/log/containers:z", path.Join(prefixPath, "/var/log/containers")),
+		fmt.Sprintf("%s:/var/log/pods:z", path.Join(prefixPath, "/var/log/pods")),
 	}

 	for arg, value := range c.Services.Kubelet.ExtraArgs {
@@ -354,7 +362,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
 	}
 }

-func (c *Cluster) BuildKubeProxyProcess() v3.Process {
+func (c *Cluster) BuildKubeProxyProcess(prefixPath string) v3.Process {
 	Command := []string{
 		"/opt/rke/entrypoint.sh",
 		"kube-proxy",
@@ -378,7 +386,7 @@ func (c *Cluster) BuildKubeProxyProcess() v3.Process {
 		services.SidekickContainerName,
 	}
 	Binds := []string{
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 	}

 	for arg, value := range c.Services.Kubeproxy.ExtraArgs {
@@ -436,7 +444,7 @@ func (c *Cluster) BuildProxyProcess() v3.Process {
 	}
 }

-func (c *Cluster) BuildSchedulerProcess() v3.Process {
+func (c *Cluster) BuildSchedulerProcess(prefixPath string) v3.Process {
 	Command := []string{
 		"/opt/rke/entrypoint.sh",
 		"kube-scheduler",
@@ -461,7 +469,7 @@ func (c *Cluster) BuildSchedulerProcess() v3.Process {
 		services.SidekickContainerName,
 	}
 	Binds := []string{
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 	}

 	for arg, value := range c.Services.Scheduler.ExtraArgs {
@@ -505,7 +513,7 @@ func (c *Cluster) BuildSidecarProcess() v3.Process {
 	}
 }

-func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3.Process {
+func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host, prefixPath string) v3.Process {
 	nodeName := pki.GetEtcdCrtName(host.InternalAddress)
 	initCluster := ""
 	if len(etcdHosts) == 0 {
@@ -543,8 +551,8 @@ func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3
 	}
 	Binds := []string{
-		"/var/lib/etcd:/var/lib/rancher/etcd:z",
-		"/etc/kubernetes:/etc/kubernetes:z",
+		fmt.Sprintf("%s:/var/lib/rancher/etcd:z", path.Join(prefixPath, "/var/lib/etcd")),
+		fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(prefixPath, "/etc/kubernetes")),
 	}

 	for arg, value := range c.Services.Etcd.ExtraArgs {
@@ -589,3 +597,15 @@ func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto stri
 	}
 	return portChecks
 }
+
+func (c *Cluster) getPrefixPath(osType string) string {
+	var prefixPath string
+	if strings.Contains(osType, B2DOS) {
+		prefixPath = B2DPrefixPath
+	} else if strings.Contains(osType, ROS) {
+		prefixPath = ROSPrefixPath
+	} else {
+		prefixPath = c.PrefixPath
+	}
+	return prefixPath
+}

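As a side note (the OS strings below are made-up examples of what Docker might report, not values from the commit): the prefix is chosen from the operating system Docker reports for the node, falling back to the cluster-level prefix_path, which defaults to "/".

// Illustrative sketch of the OS-based prefix selection added above.
package main

import (
	"fmt"
	"strings"
)

func getPrefixPath(osType, clusterPrefix string) string {
	switch {
	case strings.Contains(osType, "Boot2Docker"):
		return "/mnt/sda1/rke" // B2DPrefixPath: under Boot2Docker's persistent disk
	case strings.Contains(osType, "RancherOS"):
		return "/opt/rke" // ROSPrefixPath: a persistent location on RancherOS
	default:
		return clusterPrefix // whatever prefix_path was set to, "/" by default
	}
}

func main() {
	// Example OS strings only; exact values depend on the Docker daemon.
	fmt.Println(getPrefixPath("Boot2Docker 18.01.0-ce", "/"))   // /mnt/sda1/rke
	fmt.Println(getPrefixPath("RancherOS v1.1.3", "/"))         // /opt/rke
	fmt.Println(getPrefixPath("Ubuntu 16.04.3 LTS", "/custom")) // /custom
}
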
View File

@@ -209,9 +209,13 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
 		}
 		etcdHost.ToAddEtcdMember = false
 		readyHosts := getReadyEtcdHosts(kubeCluster.EtcdHosts)
-		etcdProcessHostMap := kubeCluster.getEtcdProcessHostMap(readyHosts)
-		if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdProcessHostMap, kubeCluster.SystemImages.Alpine); err != nil {
+		etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
+		for _, etcdReadyHost := range readyHosts {
+			etcdNodePlanMap[etcdReadyHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdReadyHost, etcdReadyHost.DockerInfo)
+		}
+		if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdNodePlanMap, kubeCluster.SystemImages.Alpine); err != nil {
 			return err
 		}
 	}

View File

@@ -3,6 +3,7 @@ package hosts
 import (
 	"context"
 	"fmt"
+	"path"

 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/container"
@@ -33,6 +34,7 @@ type Host struct {
 	ToDelTaints []string
 	DockerInfo types.Info
 	UpdateWorker bool
+	PrefixPath string
 }

 const (
@@ -49,15 +51,15 @@ const (
 func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry, externalEtcd bool) error {
 	log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
 	toCleanPaths := []string{
-		ToCleanSSLDir,
+		path.Join(h.PrefixPath, ToCleanSSLDir),
 		ToCleanCNIConf,
 		ToCleanCNIBin,
 		ToCleanCalicoRun,
-		ToCleanTempCertPath,
-		ToCleanCNILib,
+		path.Join(h.PrefixPath, ToCleanTempCertPath),
+		path.Join(h.PrefixPath, ToCleanCNILib),
 	}
 	if !externalEtcd {
-		toCleanPaths = append(toCleanPaths, ToCleanEtcdDir)
+		toCleanPaths = append(toCleanPaths, path.Join(h.PrefixPath, ToCleanEtcdDir))
 	}
 	return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
 }
@@ -68,11 +70,11 @@ func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string, prsMa
 		return nil
 	}
 	toCleanPaths := []string{
-		ToCleanSSLDir,
+		path.Join(h.PrefixPath, ToCleanSSLDir),
 		ToCleanCNIConf,
 		ToCleanCNIBin,
 		ToCleanCalicoRun,
-		ToCleanCNILib,
+		path.Join(h.PrefixPath, ToCleanCNILib),
 	}
 	return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
 }
@@ -83,24 +85,24 @@ func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string, prsM
 		return nil
 	}
 	toCleanPaths := []string{
-		ToCleanSSLDir,
+		path.Join(h.PrefixPath, ToCleanSSLDir),
 		ToCleanCNIConf,
 		ToCleanCNIBin,
 		ToCleanCalicoRun,
-		ToCleanCNILib,
+		path.Join(h.PrefixPath, ToCleanCNILib),
 	}
 	return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
 }

 func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
 	toCleanPaths := []string{
-		ToCleanEtcdDir,
-		ToCleanSSLDir,
+		path.Join(h.PrefixPath, ToCleanEtcdDir),
+		path.Join(h.PrefixPath, ToCleanSSLDir),
 	}
 	if h.IsWorker || h.IsControl {
 		log.Infof(ctx, "[hosts] Host [%s] is already a worker or control host, skipping cleanup certs.", h.Address)
 		toCleanPaths = []string{
-			ToCleanEtcdDir,
+			path.Join(h.PrefixPath, ToCleanEtcdDir),
 		}
 	}
 	return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)

View File

@@ -50,7 +50,7 @@ func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string,
 	}
 	hostCfg := &container.HostConfig{
 		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes",
+			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")),
 		},
 		Privileged: true,
 	}
@@ -178,7 +178,7 @@ func fetchFileFromHost(ctx context.Context, filePath, image string, host *hosts.
 	}
 	hostCfg := &container.HostConfig{
 		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes",
+			fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(host.PrefixPath, "/etc/kubernetes")),
 		},
 		Privileged: true,
 	}

View File

@@ -9,7 +9,7 @@ import (
 	"golang.org/x/sync/errgroup"
 )

-func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, updateWorkersOnly bool, alpineImage string) error {
+func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string) error {
 	log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
 	var errgrp errgroup.Group
 	for _, host := range controlHosts {
@@ -18,7 +18,7 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
 			continue
 		}
 		errgrp.Go(func() error {
-			return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, processMap, alpineImage)
+			return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {

View File

@@ -20,13 +20,14 @@ const (
 	EtcdHealthCheckURL = "https://127.0.0.1:2379/health"
 )

-func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdProcessHostMap map[*hosts.Host]v3.Process, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, updateWorkersOnly bool, alpineImage string) error {
+func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, updateWorkersOnly bool, alpineImage string) error {
 	log.Infof(ctx, "[%s] Building up etcd plane..", ETCDRole)
 	for _, host := range etcdHosts {
 		if updateWorkersOnly {
 			continue
 		}
-		imageCfg, hostCfg, _ := GetProcessConfig(etcdProcessHostMap[host])
+		etcdProcess := etcdNodePlanMap[host.Address].Processes[EtcdContainerName]
+		imageCfg, hostCfg, _ := GetProcessConfig(etcdProcess)
 		if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
 			return err
 		}
@@ -130,20 +131,20 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*ho
 	return nil
 }

-func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdProcessHostMap map[*hosts.Host]v3.Process, alpineImage string) error {
-	for host, process := range etcdProcessHostMap {
-		imageCfg, hostCfg, _ := GetProcessConfig(process)
-		if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
+func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, alpineImage string) error {
+	for _, etcdHost := range readyEtcdHosts {
+		imageCfg, hostCfg, _ := GetProcessConfig(etcdNodePlanMap[etcdHost.Address].Processes[EtcdContainerName])
+		if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
 			return err
 		}
-		if err := createLogLink(ctx, host, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
+		if err := createLogLink(ctx, etcdHost, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
 			return err
 		}
 	}
 	time.Sleep(10 * time.Second)
 	var healthy bool
 	for _, host := range readyEtcdHosts {
-		_, _, healthCheckURL := GetProcessConfig(etcdProcessHostMap[host])
+		_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName])
 		if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy {
 			break
 		}

View File

@@ -3,6 +3,7 @@ package services
 import (
 	"context"
 	"fmt"
+	"path"

 	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
@@ -114,7 +115,7 @@ func createLogLink(ctx context.Context, host *hosts.Host, containerName, plane,
 	}
 	hostCfg := &container.HostConfig{
 		Binds: []string{
-			"/var/lib:/var/lib",
+			fmt.Sprintf("%s:/var/lib", path.Join(host.PrefixPath, "/var/lib")),
 		},
 		Privileged: true,
 	}

View File

@@ -14,7 +14,7 @@ const (
 	unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
 )

-func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, kubeletProcessHostMap map[*hosts.Host]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
+func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
 	log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
 	var errgrp errgroup.Group
 	for _, host := range allHosts {
@@ -29,9 +29,8 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 		}
 		runHost := host
 		// maps are not thread safe
-		hostProcessMap := copyProcessMap(processMap)
+		hostProcessMap := copyProcessMap(workerNodePlanMap[runHost.Address].Processes)
 		errgrp.Go(func() error {
-			hostProcessMap[KubeletContainerName] = kubeletProcessHostMap[runHost]
 			return doDeployWorkerPlane(ctx, runHost, localConnDialerFactory, prsMap, hostProcessMap, certMap, alpineImage)
 		})
 	}

View File

@@ -24,4 +24,4 @@ github.com/coreos/go-semver e214231b295a8ea9479f11b70b35d5acf3556d9
 github.com/ugorji/go/codec ccfe18359b55b97855cee1d3f74e5efbda4869dc
 github.com/rancher/norman ff60298f31f081b06d198815b4c178a578664f7d
-github.com/rancher/types aa395f86d553756013766ca164fd01c91c645ac6
+github.com/rancher/types 574e26d2fb850f15b9269a54cacb9467505d44c5

View File

@@ -34,7 +34,7 @@ type RancherKubernetesEngineConfig struct {
 	// Cloud Provider options
 	CloudProvider CloudProvider `yaml:"cloud_provider" json:"cloudProvider,omitempty"`
 	// kubernetes directory path
-	KubernetesDirPath string `yaml:"kubernetes_dir_path" json:"kubernetesDirPath,omitempty"`
+	PrefixPath string `yaml:"prefix_path" json:"prefixPath,omitempty"`
 }

 type PrivateRegistry struct {