1
0
mirror of https://github.com/rancher/rke.git synced 2025-08-02 15:48:03 +00:00

Add GenetatePlan() and use it internally

This commit is contained in:
moelsayed 2018-02-13 02:47:56 +02:00
parent 6ea9ff01ad
commit 169ac106a5
17 changed files with 575 additions and 427 deletions

View File

@ -54,17 +54,22 @@ const (
func (c *Cluster) DeployControlPlane(ctx context.Context) error {
// Deploy Etcd Plane
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
etcdProcessHostMap := c.getEtcdProcessHostMap(nil)
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdProcessHostMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
// Deploy Control plane
processMap := map[string]v3.Process{
services.SidekickContainerName: c.BuildSidecarProcess(),
services.KubeAPIContainerName: c.BuildKubeAPIProcess(),
services.KubeControllerContainerName: c.BuildKubeControllerProcess(),
services.SchedulerContainerName: c.BuildSchedulerProcess(),
}
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
c.EtcdHosts,
c.Services,
c.SystemImages.KubernetesServicesSidecar,
c.Authorization.Mode,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap); err != nil {
c.PrivateRegistriesMap,
processMap); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
// Apply Authz configuration after deploying controlplane
@ -76,14 +81,22 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
// Deploy Worker Plane
if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts,
c.WorkerHosts,
c.EtcdHosts,
c.Services,
c.SystemImages.NginxProxy,
c.SystemImages.KubernetesServicesSidecar,
processMap := map[string]v3.Process{
services.SidekickContainerName: c.BuildSidecarProcess(),
services.KubeproxyContainerName: c.BuildKubeProxyProcess(),
services.NginxProxyContainerName: c.BuildProxyProcess(),
}
kubeletProcessHostMap := make(map[*hosts.Host]v3.Process)
for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) {
kubeletProcessHostMap[host] = c.BuildKubeletProcess(host)
}
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
if err := services.RunWorkerPlane(ctx, allHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap); err != nil {
c.PrivateRegistriesMap,
processMap,
kubeletProcessHostMap,
); err != nil {
return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
}
return nil
@ -284,3 +297,11 @@ func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineC
return kubeCluster.deployAddons(ctx)
}
func (c *Cluster) getEtcdProcessHostMap(readyEtcdHosts []*hosts.Host) map[*hosts.Host]v3.Process {
etcdProcessHostMap := make(map[*hosts.Host]v3.Process)
for _, host := range c.EtcdHosts {
etcdProcessHostMap[host] = c.BuildEtcdProcess(host, readyEtcdHosts)
}
return etcdProcessHostMap
}

View File

@ -41,6 +41,9 @@ const (
KubeProxyPort = "10256"
FlannetVXLANPortUDP = "8472"
ProtocolTCP = "TCP"
ProtocolUDP = "UDP"
FlannelNetworkPlugin = "flannel"
FlannelImage = "flannel_image"
FlannelCNIImage = "flannel_cni_image"
@ -96,6 +99,19 @@ const (
RBACConfig = "RBACConfig"
)
var EtcdPortList = []string{
EtcdPort1,
EtcdPort2,
}
var ControlPlanePortList = []string{
KubeAPIPort,
}
var WorkerPortList = []string{
KubeletPort,
}
func (c *Cluster) deployNetworkPlugin(ctx context.Context) error {
log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin)
switch c.Network.Plugin {
@ -262,27 +278,17 @@ func (c *Cluster) deployTCPPortListeners(ctx context.Context, currentCluster *Cl
workerHosts = c.WorkerHosts
}
// deploy ectd listeners
etcdPortList := []string{
EtcdPort1,
EtcdPort2,
}
if err := c.deployListenerOnPlane(ctx, etcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, EtcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
return err
}
// deploy controlplane listeners
controlPlanePortList := []string{
KubeAPIPort,
}
if err := c.deployListenerOnPlane(ctx, controlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, ControlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
return err
}
// deploy worker listeners
workerPortList := []string{
KubeletPort,
}
if err := c.deployListenerOnPlane(ctx, workerPortList, workerHosts, WorkerPortListenContainer); err != nil {
if err := c.deployListenerOnPlane(ctx, WorkerPortList, workerHosts, WorkerPortListenContainer); err != nil {
return err
}
log.Infof(ctx, "[network] Port listener containers deployed successfully")
@ -360,17 +366,13 @@ func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, conta
func (c *Cluster) runServicePortChecks(ctx context.Context) error {
var errgrp errgroup.Group
// check etcd <-> etcd
etcdPortList := []string{
EtcdPort1,
EtcdPort2,
}
// one etcd host is a pass
if len(c.EtcdHosts) > 1 {
log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
for _, host := range c.EtcdHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -382,7 +384,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.ControlPlaneHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -392,7 +394,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.WorkerHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -400,13 +402,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check controle plane -> Workers
log.Infof(ctx, "[network] Running control plane -> etcd port checks")
workerPortList := []string{
KubeletPort,
}
for _, host := range c.ControlPlaneHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -414,13 +413,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check workers -> control plane
log.Infof(ctx, "[network] Running workers -> control plane port checks")
controlPlanePortList := []string{
KubeAPIPort,
}
for _, host := range c.WorkerHosts {
runHost := host
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
return checkPlaneTCPPortsFromHost(ctx, runHost, ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
})
}
return errgrp.Wait()

387
cluster/plan.go Normal file
View File

@ -0,0 +1,387 @@
package cluster
import (
"context"
"fmt"
"strconv"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
clusterPlan := v3.RKEPlan{}
myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil)
// rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags.
uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
for _, host := range uniqHosts {
clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host))
}
return clusterPlan, nil
}
func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host) v3.RKEConfigNodePlan {
processes := []v3.Process{}
portChecks := []v3.PortCheck{}
// Everybody gets a sidecar and a kubelet..
processes = append(processes, myCluster.BuildSidecarProcess())
processes = append(processes, myCluster.BuildKubeletProcess(host))
processes = append(processes, myCluster.BuildKubeProxyProcess())
portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...)
// Do we need an nginxProxy for this one ?
if host.IsWorker && !host.IsControl {
processes = append(processes, myCluster.BuildProxyProcess())
}
if host.IsControl {
processes = append(processes, myCluster.BuildKubeAPIProcess())
processes = append(processes, myCluster.BuildKubeControllerProcess())
processes = append(processes, myCluster.BuildSchedulerProcess())
portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...)
}
if host.IsEtcd {
processes = append(processes, myCluster.BuildEtcdProcess(host, nil))
portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...)
}
return v3.RKEConfigNodePlan{
Address: host.Address,
Processes: processes,
PortChecks: portChecks,
}
}
func (c *Cluster) BuildKubeAPIProcess() v3.Process {
etcdConnString := services.GetEtcdConnString(c.EtcdHosts)
args := []string{}
Command := []string{
"/opt/rke/entrypoint.sh",
"kube-apiserver",
"--insecure-bind-address=127.0.0.1",
"--bind-address=0.0.0.0",
"--insecure-port=0",
"--secure-port=6443",
"--cloud-provider=",
"--allow_privileged=true",
"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
"--service-cluster-ip-range=" + c.Services.KubeAPI.ServiceClusterIPRange,
"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
"--runtime-config=batch/v2alpha1",
"--runtime-config=authentication.k8s.io/v1beta1=true",
"--storage-backend=etcd3",
"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName),
}
args = append(args, "--etcd-servers="+etcdConnString)
if c.Authorization.Mode == services.RBACAuthorizationMode {
args = append(args, "--authorization-mode=RBAC")
}
if c.Services.KubeAPI.PodSecurityPolicy {
args = append(args, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.KubeAPI.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(true, services.KubeAPIPort),
}
return v3.Process{
Command: Command,
Args: args,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.KubeAPI.Image,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildKubeControllerProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-controller-manager",
"--address=0.0.0.0",
"--cloud-provider=",
"--leader-elect=true",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
"--enable-hostpath-provisioner=false",
"--node-monitor-grace-period=40s",
"--pod-eviction-timeout=5m0s",
"--v=2",
"--allocate-node-cidrs=true",
"--cluster-cidr=" + c.ClusterCIDR,
"--service-cluster-ip-range=" + c.Services.KubeController.ServiceClusterIPRange,
"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
}
args := []string{}
if c.Authorization.Mode == services.RBACAuthorizationMode {
args = append(args, "--use-service-account-credentials=true")
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.KubeController.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.KubeControllerPort),
}
return v3.Process{
Command: Command,
Args: args,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.KubeController.Image,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kubelet",
"--v=2",
"--address=0.0.0.0",
"--cluster-domain=" + c.ClusterDomain,
"--pod-infra-container-image=" + c.Services.Kubelet.InfraContainerImage,
"--cgroups-per-qos=True",
"--enforce-node-allocatable=",
"--hostname-override=" + host.HostnameOverride,
"--cluster-dns=" + c.ClusterDNSServer,
"--network-plugin=cni",
"--cni-conf-dir=/etc/cni/net.d",
"--cni-bin-dir=/opt/cni/bin",
"--resolv-conf=/etc/resolv.conf",
"--allow-privileged=true",
"--cloud-provider=",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
"--require-kubeconfig=True",
"--fail-swap-on=" + strconv.FormatBool(c.Services.Kubelet.FailSwapOn),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
"/usr/libexec/kubernetes/kubelet-plugins:/usr/libexec/kubernetes/kubelet-plugins",
"/etc/cni:/etc/cni:ro",
"/opt/cni:/opt/cni:ro",
"/etc/resolv.conf:/etc/resolv.conf",
"/sys:/sys",
"/var/lib/docker:/var/lib/docker:rw",
"/var/lib/kubelet:/var/lib/kubelet:shared",
"/var/run:/var/run:rw",
"/run:/run",
"/etc/ceph:/etc/ceph",
"/dev:/host/dev",
"/var/log/containers:/var/log/containers",
"/var/log/pods:/var/log/pods",
}
for arg, value := range c.Services.Kubelet.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(true, services.KubeletPort),
}
return v3.Process{
Command: Command,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.Kubelet.Image,
PidMode: "host",
Privileged: true,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildKubeProxyProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-proxy",
"--v=2",
"--healthz-bind-address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.Kubeproxy.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.KubeproxyPort),
}
return v3.Process{
Command: Command,
VolumesFrom: VolumesFrom,
Binds: Binds,
NetworkMode: "host",
RestartPolicy: "always",
PidMode: "host",
Privileged: true,
HealthCheck: healthCheck,
Image: c.Services.Kubeproxy.Image,
}
}
func (c *Cluster) BuildProxyProcess() v3.Process {
nginxProxyEnv := ""
for i, host := range c.ControlPlaneHosts {
nginxProxyEnv += fmt.Sprintf("%s", host.InternalAddress)
if i < (len(c.ControlPlaneHosts) - 1) {
nginxProxyEnv += ","
}
}
Env := []string{fmt.Sprintf("%s=%s", services.NginxProxyEnvName, nginxProxyEnv)}
return v3.Process{
Env: Env,
NetworkMode: "host",
RestartPolicy: "always",
HealthCheck: v3.HealthCheck{},
Image: c.SystemImages.NginxProxy,
}
}
func (c *Cluster) BuildSchedulerProcess() v3.Process {
Command := []string{"/opt/rke/entrypoint.sh",
"kube-scheduler",
"--leader-elect=true",
"--v=2",
"--address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
}
VolumesFrom := []string{
services.SidekickContainerName,
}
Binds := []string{
"/etc/kubernetes:/etc/kubernetes",
}
for arg, value := range c.Services.Scheduler.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
Command = append(Command, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.GetHealthCheckURL(false, services.SchedulerPort),
}
return v3.Process{
Command: Command,
Binds: Binds,
VolumesFrom: VolumesFrom,
NetworkMode: "host",
RestartPolicy: "always",
Image: c.Services.Scheduler.Image,
HealthCheck: healthCheck,
}
}
func (c *Cluster) BuildSidecarProcess() v3.Process {
return v3.Process{
NetworkMode: "none",
Image: c.SystemImages.KubernetesServicesSidecar,
HealthCheck: v3.HealthCheck{},
}
}
func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3.Process {
nodeName := pki.GetEtcdCrtName(host.InternalAddress)
initCluster := ""
if len(etcdHosts) == 0 {
initCluster = services.GetEtcdInitialCluster(c.EtcdHosts)
} else {
initCluster = services.GetEtcdInitialCluster(etcdHosts)
}
clusterState := "new"
if host.ExistingEtcdCluster {
clusterState = "existing"
}
args := []string{"/usr/local/bin/etcd",
"--name=etcd-" + host.HostnameOverride,
"--data-dir=/etcd-data",
"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
"--listen-client-urls=https://0.0.0.0:2379",
"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
"--listen-peer-urls=https://0.0.0.0:2380",
"--initial-cluster-token=etcd-cluster-1",
"--initial-cluster=" + initCluster,
"--initial-cluster-state=" + clusterState,
"--peer-client-cert-auth",
"--client-cert-auth",
"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--cert-file=" + pki.GetCertPath(nodeName),
"--key-file=" + pki.GetKeyPath(nodeName),
"--peer-cert-file=" + pki.GetCertPath(nodeName),
"--peer-key-file=" + pki.GetKeyPath(nodeName),
}
Binds := []string{
"/var/lib/etcd:/etcd-data:z",
"/etc/kubernetes:/etc/kubernetes:z",
}
for arg, value := range c.Services.Etcd.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
args = append(args, cmd)
}
healthCheck := v3.HealthCheck{
URL: services.EtcdHealthCheckURL,
}
return v3.Process{
Args: args,
Binds: Binds,
NetworkMode: "host",
Image: c.Services.Etcd.Image,
HealthCheck: healthCheck,
}
}
func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto string) []v3.PortCheck {
portChecks := []v3.PortCheck{}
for _, port := range portList {
intPort, _ := strconv.Atoi(port)
portChecks = append(portChecks, v3.PortCheck{
Address: host.Address,
Port: intPort,
Protocol: proto,
})
}
return portChecks
}

View File

@ -197,7 +197,10 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
return err
}
etcdHost.ToAddEtcdMember = false
if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap); err != nil {
readyHosts := getReadyEtcdHosts(kubeCluster.EtcdHosts)
etcdProcessHostMap := kubeCluster.getEtcdProcessHostMap(readyHosts)
if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdProcessHostMap); err != nil {
return err
}
}
@ -220,3 +223,14 @@ func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) {
}
}
}
func getReadyEtcdHosts(etcdHosts []*hosts.Host) []*hosts.Host {
readyEtcdHosts := []*hosts.Host{}
for _, host := range etcdHosts {
if !host.ToAddEtcdMember {
readyEtcdHosts = append(readyEtcdHosts, host)
host.ExistingEtcdCluster = true
}
}
return readyEtcdHosts
}

View File

@ -9,13 +9,13 @@ import (
"golang.org/x/sync/errgroup"
)
func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
var errgrp errgroup.Group
for _, host := range controlHosts {
runHost := host
errgrp.Go(func() error {
return doDeployControlHost(ctx, runHost, etcdHosts, controlServices, sidekickImage, authorizationMode, localConnDialerFactory, prsMap)
return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, processMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -66,24 +66,24 @@ func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force b
return nil
}
func doDeployControlHost(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
func doDeployControlHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
if host.IsWorker {
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
if err := runSidekick(ctx, host, prsMap, processMap[SidekickContainerName]); err != nil {
return err
}
// run kubeapi
if err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory, prsMap); err != nil {
if err := runKubeAPI(ctx, host, localConnDialerFactory, prsMap, processMap[KubeAPIContainerName]); err != nil {
return err
}
// run kubecontroller
if err := runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory, prsMap); err != nil {
if err := runKubeController(ctx, host, localConnDialerFactory, prsMap, processMap[KubeControllerContainerName]); err != nil {
return err
}
// run scheduler
return runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory, prsMap)
return runScheduler(ctx, host, localConnDialerFactory, prsMap, processMap[SchedulerContainerName])
}

View File

@ -7,22 +7,21 @@ import (
"context"
etcdclient "github.com/coreos/etcd/client"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
)
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
initCluster := getEtcdInitialCluster(etcdHosts)
for _, host := range etcdHosts {
const (
EtcdHealthCheckURL = "https://127.0.0.1:2379/health"
)
nodeName := pki.GetEtcdCrtName(host.InternalAddress)
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, nodeName)
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdProcessHostMap map[*hosts.Host]v3.Process, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
for _, host := range etcdHosts {
imageCfg, hostCfg, _ := getProcessConfig(etcdProcessHostMap[host])
err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap)
if err != nil {
return err
@ -60,49 +59,6 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) e
return nil
}
func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster, nodeName string) (*container.Config, *container.HostConfig) {
clusterState := "new"
if host.ExistingEtcdCluster {
clusterState = "existing"
}
imageCfg := &container.Config{
Image: etcdService.Image,
Cmd: []string{"/usr/local/bin/etcd",
"--name=etcd-" + host.HostnameOverride,
"--data-dir=/etcd-data",
"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
"--listen-client-urls=https://0.0.0.0:2379",
"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
"--listen-peer-urls=https://0.0.0.0:2380",
"--initial-cluster-token=etcd-cluster-1",
"--initial-cluster=" + initCluster,
"--initial-cluster-state=" + clusterState,
"--peer-client-cert-auth",
"--client-cert-auth",
"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
"--cert-file=" + pki.GetCertPath(nodeName),
"--key-file=" + pki.GetKeyPath(nodeName),
"--peer-cert-file=" + pki.GetCertPath(nodeName),
"--peer-key-file=" + pki.GetKeyPath(nodeName),
},
}
hostCfg := &container.HostConfig{
RestartPolicy: container.RestartPolicy{Name: "always"},
Binds: []string{
"/var/lib/etcd:/etcd-data:z",
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
}
for arg, value := range etcdService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}
func AddEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, etcdHost.HostnameOverride)
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
@ -164,17 +120,9 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*ho
return nil
}
func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry) error {
readyEtcdHosts := []*hosts.Host{}
for _, host := range etcdHosts {
if !host.ToAddEtcdMember {
readyEtcdHosts = append(readyEtcdHosts, host)
host.ExistingEtcdCluster = true
}
}
initCluster := getEtcdInitialCluster(readyEtcdHosts)
for _, host := range readyEtcdHosts {
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, pki.GetEtcdCrtName(host.InternalAddress))
func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdProcessHostMap map[*hosts.Host]v3.Process) error {
for host, process := range etcdProcessHostMap {
imageCfg, hostCfg, _ := getProcessConfig(process)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
return err
}
@ -182,7 +130,8 @@ func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService
time.Sleep(10 * time.Second)
var healthy bool
for _, host := range readyEtcdHosts {
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key); healthy {
_, _, healthCheckURL := getProcessConfig(etcdProcessHostMap[host])
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy {
break
}
}

View File

@ -43,7 +43,7 @@ func TestEtcdConfig(t *testing.T) {
etcdService.Image = TestEtcdImage
etcdService.ExtraArgs = map[string]string{"foo": "bar"}
// Test init cluster string
initCluster := getEtcdInitialCluster(etcdHosts)
initCluster := GetEtcdInitialCluster(etcdHosts)
assertEqual(t, initCluster, TestInitEtcdClusterString, "")
for _, host := range etcdHosts {

View File

@ -39,7 +39,7 @@ func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFac
return etcdclient.New(cfg)
}
func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte) bool {
func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) bool {
logrus.Debugf("[etcd] Check etcd cluster health")
for i := 0; i < 3; i++ {
dialer, err := getEtcdDialer(localConnDialerFactory, host)
@ -59,7 +59,7 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFacto
TLSHandshakeTimeout: 10 * time.Second,
},
}
healthy, err := getHealthEtcd(hc, host)
healthy, err := getHealthEtcd(hc, host, url)
if err != nil {
logrus.Debug(err)
time.Sleep(5 * time.Second)
@ -73,9 +73,9 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFacto
return false
}
func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
func getHealthEtcd(hc http.Client, host *hosts.Host, url string) (string, error) {
healthy := struct{ Health string }{}
resp, err := hc.Get("https://127.0.0.1:2379/health")
resp, err := hc.Get(url)
if err != nil {
return healthy.Health, fmt.Errorf("Failed to get /health for host [%s]: %v", host.Address, err)
}
@ -90,7 +90,7 @@ func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
return healthy.Health, nil
}
func getEtcdInitialCluster(hosts []*hosts.Host) string {
func GetEtcdInitialCluster(hosts []*hosts.Host) string {
initialCluster := ""
for i, host := range hosts {
initialCluster += fmt.Sprintf("etcd-%s=https://%s:2380", host.HostnameOverride, host.InternalAddress)

View File

@ -6,6 +6,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/rancher/rke/hosts"
@ -20,14 +22,18 @@ const (
HTTPSProtoPrefix = "https://"
)
func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, localConnDialerFactory hosts.DialerFactory) error {
func runHealthcheck(ctx context.Context, host *hosts.Host, serviceName string, localConnDialerFactory hosts.DialerFactory, url string) error {
log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
port, err := getPortFromURL(url)
if err != nil {
return err
}
client, err := getHealthCheckHTTPClient(host, port, localConnDialerFactory)
if err != nil {
return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
}
for retries := 0; retries < 10; retries++ {
if err = getHealthz(client, useTLS, serviceName, host.Address, port); err != nil {
if err = getHealthz(client, serviceName, host.Address, url); err != nil {
logrus.Debugf("[healthcheck] %v", err)
time.Sleep(5 * time.Second)
continue
@ -58,14 +64,10 @@ func getHealthCheckHTTPClient(host *hosts.Host, port int, localConnDialerFactory
}, nil
}
func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string, port int) error {
proto := HTTPProtoPrefix
if useTLS {
proto = HTTPSProtoPrefix
}
resp, err := client.Get(fmt.Sprintf("%s%s:%d%s", proto, HealthzAddress, port, HealthzEndpoint))
func getHealthz(client *http.Client, serviceName, hostAddress, url string) error {
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", HealthzEndpoint, serviceName, hostAddress, err)
return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", url, serviceName, hostAddress, err)
}
if resp.StatusCode != http.StatusOK {
statusBody, _ := ioutil.ReadAll(resp.Body)
@ -73,3 +75,12 @@ func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress strin
}
return nil
}
func getPortFromURL(url string) (int, error) {
port := strings.Split(strings.Split(url, ":")[2], "/")[0]
intPort, err := strconv.Atoi(port)
if err != nil {
return 0, err
}
return intPort, nil
}

View File

@ -2,75 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
etcdConnString := GetEtcdConnString(etcdHosts)
imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeAPIProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeAPIProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeAPIPort, true, KubeAPIContainerName, df)
return runHealthcheck(ctx, host, KubeAPIContainerName, df, healthCheckURL)
}
func removeKubeAPI(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
}
func buildKubeAPIConfig(host *hosts.Host, kubeAPIService v3.KubeAPIService, etcdConnString, authorizationMode string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeAPIService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-apiserver",
"--insecure-bind-address=127.0.0.1",
"--bind-address=0.0.0.0",
"--insecure-port=0",
"--secure-port=6443",
"--cloud-provider=",
"--allow_privileged=true",
"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
"--service-cluster-ip-range=" + kubeAPIService.ServiceClusterIPRange,
"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
"--runtime-config=batch/v2alpha1",
"--runtime-config=authentication.k8s.io/v1beta1=true",
"--storage-backend=etcd3",
"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName)},
}
imageCfg.Cmd = append(imageCfg.Cmd, "--etcd-servers="+etcdConnString)
if authorizationMode == RBACAuthorizationMode {
imageCfg.Cmd = append(imageCfg.Cmd, "--authorization-mode=RBAC")
}
if kubeAPIService.PodSecurityPolicy {
imageCfg.Cmd = append(imageCfg.Cmd, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeAPIService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}

View File

@ -2,63 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
func runKubeController(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, controllerProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(controllerProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df)
return runHealthcheck(ctx, host, KubeControllerContainerName, df, healthCheckURL)
}
func removeKubeController(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
}
func buildKubeControllerConfig(kubeControllerService v3.KubeControllerService, authorizationMode string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeControllerService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-controller-manager",
"--address=0.0.0.0",
"--cloud-provider=",
"--leader-elect=true",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
"--enable-hostpath-provisioner=false",
"--node-monitor-grace-period=40s",
"--pod-eviction-timeout=5m0s",
"--v=2",
"--allocate-node-cidrs=true",
"--cluster-cidr=" + kubeControllerService.ClusterCIDR,
"--service-cluster-ip-range=" + kubeControllerService.ServiceClusterIPRange,
"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
},
}
if authorizationMode == RBACAuthorizationMode {
imageCfg.Cmd = append(imageCfg.Cmd, "--use-service-account-credentials=true")
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeControllerService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}

View File

@ -2,79 +2,20 @@ package services
import (
"context"
"fmt"
"strconv"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
func runKubelet(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeletProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeletProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df)
return runHealthcheck(ctx, host, KubeletContainerName, df, healthCheckURL)
}
func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeletService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kubelet",
"--v=2",
"--address=0.0.0.0",
"--cluster-domain=" + kubeletService.ClusterDomain,
"--pod-infra-container-image=" + kubeletService.InfraContainerImage,
"--cgroups-per-qos=True",
"--enforce-node-allocatable=",
"--hostname-override=" + host.HostnameOverride,
"--cluster-dns=" + kubeletService.ClusterDNSServer,
"--network-plugin=cni",
"--cni-conf-dir=/etc/cni/net.d",
"--cni-bin-dir=/opt/cni/bin",
"--resolv-conf=/etc/resolv.conf",
"--allow-privileged=true",
"--cloud-provider=",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
"--volume-plugin-dir=/var/lib/kubelet/volumeplugins",
"--require-kubeconfig=True",
"--fail-swap-on=" + strconv.FormatBool(kubeletService.FailSwapOn),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
"/etc/cni:/etc/cni:ro,z",
"/opt/cni:/opt/cni:ro,z",
"/etc/resolv.conf:/etc/resolv.conf",
"/sys:/sys",
"/var/lib/docker:/var/lib/docker:rw,z",
"/var/lib/kubelet:/var/lib/kubelet:shared,z",
"/var/run:/var/run:rw",
"/run:/run",
"/etc/ceph:/etc/ceph",
"/dev:/host/dev",
"/var/log/containers:/var/log/containers:z",
"/var/log/pods:/var/log/pods:z"},
NetworkMode: "host",
PidMode: "host",
Privileged: true,
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range kubeletService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}

View File

@ -2,51 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
func runKubeproxy(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeProxyProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeProxyProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df)
return runHealthcheck(ctx, host, KubeproxyContainerName, df, healthCheckURL)
}
func removeKubeproxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
}
func buildKubeproxyConfig(host *hosts.Host, kubeproxyService v3.KubeproxyService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeproxyService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-proxy",
"--v=2",
"--healthz-bind-address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
Privileged: true,
}
for arg, value := range kubeproxyService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}

View File

@ -2,9 +2,7 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
@ -15,48 +13,11 @@ const (
NginxProxyEnvName = "CP_HOSTS"
)
func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
for _, host := range workerHosts {
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
}
return nil
}
func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
func runNginxProxy(ctx context.Context, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, proxyProcess v3.Process) error {
imageCfg, hostCfg, _ := getProcessConfig(proxyProcess)
return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap)
}
func removeNginxProxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
}
func buildNginxProxyConfig(host *hosts.Host, nginxProxyEnv, nginxProxyImage string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: nginxProxyImage,
Env: []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
Cmd: []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
}
hostCfg := &container.HostConfig{
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
return imageCfg, hostCfg
}
func buildProxyEnv(cpHosts []*hosts.Host) string {
proxyEnv := ""
for i, cpHost := range cpHosts {
proxyEnv += fmt.Sprintf("%s", cpHost.InternalAddress)
if i < (len(cpHosts) - 1) {
proxyEnv += ","
}
}
return proxyEnv
}

View File

@ -2,51 +2,20 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
func runScheduler(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, schedulerProcess v3.Process) error {
imageCfg, hostCfg, healthCheckURL := getProcessConfig(schedulerProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err
}
return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df)
return runHealthcheck(ctx, host, SchedulerContainerName, df, healthCheckURL)
}
func removeScheduler(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
}
func buildSchedulerConfig(host *hosts.Host, schedulerService v3.SchedulerService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: schedulerService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
"kube-scheduler",
"--leader-elect=true",
"--v=2",
"--address=0.0.0.0",
"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
},
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,
},
Binds: []string{
"/etc/kubernetes:/etc/kubernetes:z",
},
NetworkMode: "host",
RestartPolicy: container.RestartPolicy{Name: "always"},
}
for arg, value := range schedulerService.ExtraArgs {
cmd := fmt.Sprintf("--%s=%s", arg, value)
imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
}
return imageCfg, hostCfg
}

View File

@ -2,6 +2,7 @@ package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
@ -34,17 +35,7 @@ const (
KubeproxyPort = 10256
)
func buildSidekickConfig(sidekickImage string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: sidekickImage,
}
hostCfg := &container.HostConfig{
NetworkMode: "none",
}
return imageCfg, hostCfg
}
func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, prsMap map[string]v3.PrivateRegistry) error {
func runSidekick(ctx context.Context, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, sidecarProcess v3.Process) error {
isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true)
if err != nil {
return err
@ -53,8 +44,10 @@ func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, pr
log.Infof(ctx, "[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address)
return nil
}
imageCfg, hostCfg := buildSidekickConfig(sidekickImage)
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName, prsMap); err != nil {
imageCfg, hostCfg, _ := getProcessConfig(sidecarProcess)
sidecarImage := sidecarProcess.Image
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidecarImage, SidekickServiceName, prsMap); err != nil {
return err
}
if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil {
@ -66,3 +59,32 @@ func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, pr
func removeSidekick(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SidekickContainerName, host.Address)
}
func getProcessConfig(process v3.Process) (*container.Config, *container.HostConfig, string) {
imageCfg := &container.Config{
Entrypoint: process.Command,
Cmd: process.Args,
Env: process.Env,
Image: process.Image,
}
// var pidMode container.PidMode
// pidMode = process.PidMode
hostCfg := &container.HostConfig{
VolumesFrom: process.VolumesFrom,
Binds: process.Binds,
NetworkMode: container.NetworkMode(process.NetworkMode),
PidMode: container.PidMode(process.PidMode),
Privileged: process.Privileged,
}
if len(process.RestartPolicy) > 0 {
hostCfg.RestartPolicy = container.RestartPolicy{Name: process.RestartPolicy}
}
return imageCfg, hostCfg, process.HealthCheck.URL
}
func GetHealthCheckURL(useTLS bool, port int) string {
if useTLS {
return fmt.Sprintf("%s%s:%d%s", HTTPSProtoPrefix, HealthzAddress, port, HealthzEndpoint)
}
return fmt.Sprintf("%s%s:%d%s", HTTPProtoPrefix, HealthzAddress, port, HealthzEndpoint)
}

View File

@ -13,18 +13,20 @@ const (
unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
)
func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, kubeletProcessHostMap map[*hosts.Host]v3.Process) error {
log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group
allHosts := hosts.GetUniqueHostList(etcdHosts, controlHosts, workerHosts)
for _, host := range allHosts {
if !host.IsControl && !host.IsWorker {
// Add unschedulable taint
host.ToAddTaints = append(host.ToAddTaints, unschedulableEtcdTaint)
}
runHost := host
// maps are not thread safe
hostProcessMap := copyProcessMap(processMap)
errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, runHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, prsMap)
hostProcessMap[KubeletContainerName] = kubeletProcessHostMap[runHost]
return doDeployWorkerPlane(ctx, runHost, localConnDialerFactory, prsMap, hostProcessMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -62,25 +64,29 @@ func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force boo
}
func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
workerServices v3.RKEConfigServices,
nginxProxyImage, sidekickImage string,
localConnDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host,
prsMap map[string]v3.PrivateRegistry) error {
prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
// run nginx proxy
if !host.IsControl {
if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage, prsMap); err != nil {
if err := runNginxProxy(ctx, host, prsMap, processMap[NginxProxyContainerName]); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
if err := runSidekick(ctx, host, prsMap, processMap[SidekickContainerName]); err != nil {
return err
}
// run kubelet
if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, prsMap); err != nil {
if err := runKubelet(ctx, host, localConnDialerFactory, prsMap, processMap[KubeletContainerName]); err != nil {
return err
}
return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory, prsMap)
return runKubeproxy(ctx, host, localConnDialerFactory, prsMap, processMap[KubeproxyContainerName])
}
func copyProcessMap(m map[string]v3.Process) map[string]v3.Process {
c := make(map[string]v3.Process)
for k, v := range m {
c[k] = v
}
return c
}