diff --git a/cluster/cluster.go b/cluster/cluster.go
index 6af5a8fb..981482cf 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -54,17 +54,22 @@ const (
 func (c *Cluster) DeployControlPlane(ctx context.Context) error {
 	// Deploy Etcd Plane
-	if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
+	etcdProcessHostMap := c.getEtcdProcessHostMap(nil)
+
+	if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdProcessHostMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap); err != nil {
 		return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
 	}
 	// Deploy Control plane
+	processMap := map[string]v3.Process{
+		services.SidekickContainerName:       c.BuildSidecarProcess(),
+		services.KubeAPIContainerName:        c.BuildKubeAPIProcess(),
+		services.KubeControllerContainerName: c.BuildKubeControllerProcess(),
+		services.SchedulerContainerName:      c.BuildSchedulerProcess(),
+	}
 	if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
-		c.EtcdHosts,
-		c.Services,
-		c.SystemImages.KubernetesServicesSidecar,
-		c.Authorization.Mode,
 		c.LocalConnDialerFactory,
-		c.PrivateRegistriesMap); err != nil {
+		c.PrivateRegistriesMap,
+		processMap); err != nil {
 		return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
 	}
 	// Apply Authz configuration after deploying controlplane
@@ -76,14 +81,21 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error {
 
 func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
 	// Deploy Worker Plane
-	if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts,
-		c.WorkerHosts,
-		c.EtcdHosts,
-		c.Services,
-		c.SystemImages.NginxProxy,
-		c.SystemImages.KubernetesServicesSidecar,
+	processMap := map[string]v3.Process{
+		services.SidekickContainerName:   c.BuildSidecarProcess(),
+		services.KubeproxyContainerName:  c.BuildKubeProxyProcess(),
+		services.NginxProxyContainerName: c.BuildProxyProcess(),
+	}
+	allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
+	kubeletProcessHostMap := make(map[*hosts.Host]v3.Process)
+	for _, host := range allHosts {
+		kubeletProcessHostMap[host] = c.BuildKubeletProcess(host)
+	}
+	if err := services.RunWorkerPlane(ctx, allHosts,
 		c.LocalConnDialerFactory,
-		c.PrivateRegistriesMap); err != nil {
+		c.PrivateRegistriesMap,
+		processMap,
+		kubeletProcessHostMap,
+	); err != nil {
 		return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
 	}
 	return nil
@@ -284,3 +296,11 @@ func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, ...) error {
 	return kubeCluster.deployAddons(ctx)
 }
+
+func (c *Cluster) getEtcdProcessHostMap(readyEtcdHosts []*hosts.Host) map[*hosts.Host]v3.Process {
+	etcdProcessHostMap := make(map[*hosts.Host]v3.Process)
+	for _, host := range c.EtcdHosts {
+		etcdProcessHostMap[host] = c.BuildEtcdProcess(host, readyEtcdHosts)
+	}
+	return etcdProcessHostMap
+}
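The pattern above — build every service's v3.Process up front, key it by container name, and hand one map to the plane runner — is the core of this refactor. A minimal, self-contained sketch of that contract (simplified stand-in types; the real constants and v3.Process live in rke/services and rancher/types):

package main

import "fmt"

// process is an illustrative stand-in for v3.Process.
type process struct {
	Command []string
	Image   string
}

func main() {
	const kubeAPIContainerName = "kube-apiserver" // hypothetical constant value
	processMap := map[string]process{
		kubeAPIContainerName: {Command: []string{"/opt/rke/entrypoint.sh", "kube-apiserver"}, Image: "rancher/k8s"},
	}
	// Deploy helpers look their process up by container name; a missing key
	// silently yields a zero-value process, so callers must fill the map.
	fmt.Println(processMap[kubeAPIContainerName].Image)
}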
diff --git a/cluster/network.go b/cluster/network.go
index cd759df4..f51b085e 100644
--- a/cluster/network.go
+++ b/cluster/network.go
@@ -41,6 +41,9 @@ const (
 	KubeProxyPort       = "10256"
 	FlannetVXLANPortUDP = "8472"
 
+	ProtocolTCP = "TCP"
+	ProtocolUDP = "UDP"
+
 	FlannelNetworkPlugin = "flannel"
 	FlannelImage         = "flannel_image"
 	FlannelCNIImage      = "flannel_cni_image"
@@ -96,6 +99,19 @@ const (
 	RBACConfig = "RBACConfig"
 )
 
+var EtcdPortList = []string{
+	EtcdPort1,
+	EtcdPort2,
+}
+
+var ControlPlanePortList = []string{
+	KubeAPIPort,
+}
+
+var WorkerPortList = []string{
+	KubeletPort,
+}
+
 func (c *Cluster) deployNetworkPlugin(ctx context.Context) error {
 	log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin)
 	switch c.Network.Plugin {
@@ -262,27 +278,17 @@ func (c *Cluster) deployTCPPortListeners(ctx context.Context, currentCluster *Cluster) error {
 		workerHosts = c.WorkerHosts
 	}
 	// deploy etcd listeners
-	etcdPortList := []string{
-		EtcdPort1,
-		EtcdPort2,
-	}
-	if err := c.deployListenerOnPlane(ctx, etcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
+	if err := c.deployListenerOnPlane(ctx, EtcdPortList, etcdHosts, EtcdPortListenContainer); err != nil {
 		return err
 	}
 	// deploy controlplane listeners
-	controlPlanePortList := []string{
-		KubeAPIPort,
-	}
-	if err := c.deployListenerOnPlane(ctx, controlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
+	if err := c.deployListenerOnPlane(ctx, ControlPlanePortList, cpHosts, CPPortListenContainer); err != nil {
 		return err
 	}
 	// deploy worker listeners
-	workerPortList := []string{
-		KubeletPort,
-	}
-	if err := c.deployListenerOnPlane(ctx, workerPortList, workerHosts, WorkerPortListenContainer); err != nil {
+	if err := c.deployListenerOnPlane(ctx, WorkerPortList, workerHosts, WorkerPortListenContainer); err != nil {
 		return err
 	}
 	log.Infof(ctx, "[network] Port listener containers deployed successfully")
@@ -360,17 +366,13 @@ func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, containerName string) error {
 func (c *Cluster) runServicePortChecks(ctx context.Context) error {
 	var errgrp errgroup.Group
 	// check etcd <-> etcd
-	etcdPortList := []string{
-		EtcdPort1,
-		EtcdPort2,
-	}
 	// one etcd host is a pass
 	if len(c.EtcdHosts) > 1 {
 		log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
 		for _, host := range c.EtcdHosts {
 			runHost := host
 			errgrp.Go(func() error {
-				return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
+				return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
 			})
 		}
 		if err := errgrp.Wait(); err != nil {
@@ -382,7 +384,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
 	for _, host := range c.ControlPlaneHosts {
 		runHost := host
 		errgrp.Go(func() error {
-			return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
+			return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {
@@ -392,7 +394,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
 	for _, host := range c.WorkerHosts {
 		runHost := host
 		errgrp.Go(func() error {
-			return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
+			return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {
@@ -400,13 +402,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
 	}
 	// check control plane -> workers
 	log.Infof(ctx, "[network] Running control plane -> workers port checks")
-	workerPortList := []string{
-		KubeletPort,
-	}
 	for _, host := range c.ControlPlaneHosts {
 		runHost := host
 		errgrp.Go(func() error {
-			return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
+			return checkPlaneTCPPortsFromHost(ctx, runHost, WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {
@@ -414,13 +413,10 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
 	}
 	// check workers -> control plane
 	log.Infof(ctx, "[network] Running workers -> control plane port checks")
-	controlPlanePortList := []string{
-		KubeAPIPort,
-	}
 	for _, host := range c.WorkerHosts {
 		runHost := host
 		errgrp.Go(func() error {
-			return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
+			return checkPlaneTCPPortsFromHost(ctx, runHost, ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
 		})
 	}
 	return errgrp.Wait()
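With the port lists promoted to exported package variables, the same data now drives the listener containers, the live port checks, and the plan's declarative port checks. A small self-contained sketch of the string-list-to-check conversion (illustrative types and values; the real conversion is BuildPortChecksFromPortList in cluster/plan.go below):

package main

import (
	"fmt"
	"strconv"
)

// Illustrative stand-in for one of the exported port lists above; the real
// values are string constants in cluster/network.go.
var etcdPortList = []string{"2379", "2380"}

type portCheck struct {
	Address  string
	Port     int
	Protocol string
}

func main() {
	// Checks need integer ports, so each string entry is converted once.
	checks := make([]portCheck, 0, len(etcdPortList))
	for _, p := range etcdPortList {
		n, err := strconv.Atoi(p)
		if err != nil {
			continue // the lists hold numeric literals, so this should not happen
		}
		checks = append(checks, portCheck{Address: "10.0.0.1", Port: n, Protocol: "TCP"})
	}
	fmt.Println(checks)
}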
diff --git a/cluster/plan.go b/cluster/plan.go
new file mode 100644
index 00000000..6609497b
--- /dev/null
+++ b/cluster/plan.go
@@ -0,0 +1,390 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/pki"
+	"github.com/rancher/rke/services"
+	"github.com/rancher/types/apis/management.cattle.io/v3"
+)
+
+func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
+	clusterPlan := v3.RKEPlan{}
+	myCluster, err := ParseCluster(ctx, rkeConfig, "", "", nil, nil)
+	if err != nil {
+		return clusterPlan, err
+	}
+	// rkeConfig.Nodes are already unique, but they lack role flags, so use
+	// the parsed cluster hosts, which carry them.
+	uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
+	for _, host := range uniqHosts {
+		clusterPlan.Nodes = append(clusterPlan.Nodes, BuildRKEConfigNodePlan(ctx, myCluster, host))
+	}
+	return clusterPlan, nil
+}
+
+func BuildRKEConfigNodePlan(ctx context.Context, myCluster *Cluster, host *hosts.Host) v3.RKEConfigNodePlan {
+	processes := []v3.Process{}
+	portChecks := []v3.PortCheck{}
+	// Every node gets a sidekick, a kubelet, and a kube-proxy.
+	processes = append(processes, myCluster.BuildSidecarProcess())
+	processes = append(processes, myCluster.BuildKubeletProcess(host))
+	processes = append(processes, myCluster.BuildKubeProxyProcess())
+
+	portChecks = append(portChecks, BuildPortChecksFromPortList(host, WorkerPortList, ProtocolTCP)...)
+	// Only worker-only nodes need the nginx proxy.
+	if host.IsWorker && !host.IsControl {
+		processes = append(processes, myCluster.BuildProxyProcess())
+	}
+	if host.IsControl {
+		processes = append(processes, myCluster.BuildKubeAPIProcess())
+		processes = append(processes, myCluster.BuildKubeControllerProcess())
+		processes = append(processes, myCluster.BuildSchedulerProcess())
+
+		portChecks = append(portChecks, BuildPortChecksFromPortList(host, ControlPlanePortList, ProtocolTCP)...)
+	}
+	if host.IsEtcd {
+		processes = append(processes, myCluster.BuildEtcdProcess(host, nil))
+
+		portChecks = append(portChecks, BuildPortChecksFromPortList(host, EtcdPortList, ProtocolTCP)...)
+	}
+	return v3.RKEConfigNodePlan{
+		Address:    host.Address,
+		Processes:  processes,
+		PortChecks: portChecks,
+	}
+}
+
+func (c *Cluster) BuildKubeAPIProcess() v3.Process {
+	etcdConnString := services.GetEtcdConnString(c.EtcdHosts)
+	args := []string{}
+	Command := []string{
+		"/opt/rke/entrypoint.sh",
+		"kube-apiserver",
+		"--insecure-bind-address=127.0.0.1",
+		"--bind-address=0.0.0.0",
+		"--insecure-port=0",
+		"--secure-port=6443",
+		"--cloud-provider=",
+		"--allow_privileged=true",
+		"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
+		"--service-cluster-ip-range=" + c.Services.KubeAPI.ServiceClusterIPRange,
+		"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
+		"--runtime-config=batch/v2alpha1",
+		"--runtime-config=authentication.k8s.io/v1beta1=true",
+		"--storage-backend=etcd3",
+		"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
+		"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
+		"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
+		"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
+		"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
+		"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
+		"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName),
+	}
+	args = append(args, "--etcd-servers="+etcdConnString)
+
+	if c.Authorization.Mode == services.RBACAuthorizationMode {
+		args = append(args, "--authorization-mode=RBAC")
+	}
+	if c.Services.KubeAPI.PodSecurityPolicy {
+		args = append(args, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
+	}
+
+	VolumesFrom := []string{
+		services.SidekickContainerName,
+	}
+	Binds := []string{
+		"/etc/kubernetes:/etc/kubernetes",
+	}
+
+	for arg, value := range c.Services.KubeAPI.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		Command = append(Command, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.GetHealthCheckURL(true, services.KubeAPIPort),
+	}
+	return v3.Process{
+		Command:       Command,
+		Args:          args,
+		VolumesFrom:   VolumesFrom,
+		Binds:         Binds,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		Image:         c.Services.KubeAPI.Image,
+		HealthCheck:   healthCheck,
+	}
+}
+
+func (c *Cluster) BuildKubeControllerProcess() v3.Process {
+	Command := []string{"/opt/rke/entrypoint.sh",
+		"kube-controller-manager",
+		"--address=0.0.0.0",
+		"--cloud-provider=",
+		"--leader-elect=true",
+		"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
+		"--enable-hostpath-provisioner=false",
+		"--node-monitor-grace-period=40s",
+		"--pod-eviction-timeout=5m0s",
+		"--v=2",
+		"--allocate-node-cidrs=true",
+		"--cluster-cidr=" + c.ClusterCIDR,
+		"--service-cluster-ip-range=" + c.Services.KubeController.ServiceClusterIPRange,
+		"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
+		"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
+	}
+	args := []string{}
+	if c.Authorization.Mode == services.RBACAuthorizationMode {
+		args = append(args, "--use-service-account-credentials=true")
+	}
+	VolumesFrom := []string{
+		services.SidekickContainerName,
+	}
+	Binds := []string{
+		"/etc/kubernetes:/etc/kubernetes",
+	}
+
+	for arg, value := range c.Services.KubeController.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		Command = append(Command, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.GetHealthCheckURL(false, services.KubeControllerPort),
+	}
+	return v3.Process{
+		Command:       Command,
+		Args:          args,
+		VolumesFrom:   VolumesFrom,
+		Binds:         Binds,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		Image:         c.Services.KubeController.Image,
+		HealthCheck:   healthCheck,
+	}
+}
+
+func (c *Cluster) BuildKubeletProcess(host *hosts.Host) v3.Process {
+	Command := []string{"/opt/rke/entrypoint.sh",
+		"kubelet",
+		"--v=2",
+		"--address=0.0.0.0",
+		"--cluster-domain=" + c.ClusterDomain,
+		"--pod-infra-container-image=" + c.Services.Kubelet.InfraContainerImage,
+		"--cgroups-per-qos=True",
+		"--enforce-node-allocatable=",
+		"--hostname-override=" + host.HostnameOverride,
+		"--cluster-dns=" + c.ClusterDNSServer,
+		"--network-plugin=cni",
+		"--cni-conf-dir=/etc/cni/net.d",
+		"--cni-bin-dir=/opt/cni/bin",
+		"--resolv-conf=/etc/resolv.conf",
+		"--allow-privileged=true",
+		"--cloud-provider=",
+		"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
+		"--require-kubeconfig=True",
+		"--fail-swap-on=" + strconv.FormatBool(c.Services.Kubelet.FailSwapOn),
+	}
+
+	VolumesFrom := []string{
+		services.SidekickContainerName,
+	}
+	Binds := []string{
+		"/etc/kubernetes:/etc/kubernetes",
+		"/usr/libexec/kubernetes/kubelet-plugins:/usr/libexec/kubernetes/kubelet-plugins",
+		"/etc/cni:/etc/cni:ro",
+		"/opt/cni:/opt/cni:ro",
+		"/etc/resolv.conf:/etc/resolv.conf",
+		"/sys:/sys",
+		"/var/lib/docker:/var/lib/docker:rw",
+		"/var/lib/kubelet:/var/lib/kubelet:shared",
+		"/var/run:/var/run:rw",
+		"/run:/run",
+		"/etc/ceph:/etc/ceph",
+		"/dev:/host/dev",
+		"/var/log/containers:/var/log/containers",
+		"/var/log/pods:/var/log/pods",
+	}
+
+	for arg, value := range c.Services.Kubelet.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		Command = append(Command, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.GetHealthCheckURL(true, services.KubeletPort),
+	}
+	return v3.Process{
+		Command:       Command,
+		VolumesFrom:   VolumesFrom,
+		Binds:         Binds,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		Image:         c.Services.Kubelet.Image,
+		PidMode:       "host",
+		Privileged:    true,
+		HealthCheck:   healthCheck,
+	}
+}
+
+func (c *Cluster) BuildKubeProxyProcess() v3.Process {
+	Command := []string{"/opt/rke/entrypoint.sh",
+		"kube-proxy",
+		"--v=2",
+		"--healthz-bind-address=0.0.0.0",
+		"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
+	}
+	VolumesFrom := []string{
+		services.SidekickContainerName,
+	}
+	Binds := []string{
+		"/etc/kubernetes:/etc/kubernetes",
+	}
+
+	for arg, value := range c.Services.Kubeproxy.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		Command = append(Command, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.GetHealthCheckURL(false, services.KubeproxyPort),
+	}
+	return v3.Process{
+		Command:       Command,
+		VolumesFrom:   VolumesFrom,
+		Binds:         Binds,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		PidMode:       "host",
+		Privileged:    true,
+		HealthCheck:   healthCheck,
+		Image:         c.Services.Kubeproxy.Image,
+	}
+}
+
+func (c *Cluster) BuildProxyProcess() v3.Process {
+	nginxProxyEnv := ""
+	for i, host := range c.ControlPlaneHosts {
+		nginxProxyEnv += host.InternalAddress
+		if i < (len(c.ControlPlaneHosts) - 1) {
+			nginxProxyEnv += ","
+		}
+	}
+	Env := []string{fmt.Sprintf("%s=%s", services.NginxProxyEnvName, nginxProxyEnv)}
+
+	return v3.Process{
+		Env:           Env,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		HealthCheck:   v3.HealthCheck{},
+		Image:         c.SystemImages.NginxProxy,
+	}
+}
+
+func (c *Cluster) BuildSchedulerProcess() v3.Process {
+	Command := []string{"/opt/rke/entrypoint.sh",
+		"kube-scheduler",
+		"--leader-elect=true",
+		"--v=2",
+		"--address=0.0.0.0",
+		"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
+	}
+	VolumesFrom := []string{
+		services.SidekickContainerName,
+	}
+	Binds := []string{
+		"/etc/kubernetes:/etc/kubernetes",
+	}
+
+	for arg, value := range c.Services.Scheduler.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		Command = append(Command, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.GetHealthCheckURL(false, services.SchedulerPort),
+	}
+	return v3.Process{
+		Command:       Command,
+		Binds:         Binds,
+		VolumesFrom:   VolumesFrom,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		Image:         c.Services.Scheduler.Image,
+		HealthCheck:   healthCheck,
+	}
+}
+
+func (c *Cluster) BuildSidecarProcess() v3.Process {
+	return v3.Process{
+		NetworkMode: "none",
+		Image:       c.SystemImages.KubernetesServicesSidecar,
+		HealthCheck: v3.HealthCheck{},
+	}
+}
+
+func (c *Cluster) BuildEtcdProcess(host *hosts.Host, etcdHosts []*hosts.Host) v3.Process {
+	nodeName := pki.GetEtcdCrtName(host.InternalAddress)
+	initCluster := ""
+	if len(etcdHosts) == 0 {
+		initCluster = services.GetEtcdInitialCluster(c.EtcdHosts)
+	} else {
+		initCluster = services.GetEtcdInitialCluster(etcdHosts)
+	}
+
+	clusterState := "new"
+	if host.ExistingEtcdCluster {
+		clusterState = "existing"
+	}
+	args := []string{"/usr/local/bin/etcd",
+		"--name=etcd-" + host.HostnameOverride,
+		"--data-dir=/etcd-data",
+		"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
+		"--listen-client-urls=https://0.0.0.0:2379",
+		"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
+		"--listen-peer-urls=https://0.0.0.0:2380",
+		"--initial-cluster-token=etcd-cluster-1",
+		"--initial-cluster=" + initCluster,
+		"--initial-cluster-state=" + clusterState,
+		"--peer-client-cert-auth",
+		"--client-cert-auth",
+		"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
+		"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
+		"--cert-file=" + pki.GetCertPath(nodeName),
+		"--key-file=" + pki.GetKeyPath(nodeName),
+		"--peer-cert-file=" + pki.GetCertPath(nodeName),
+		"--peer-key-file=" + pki.GetKeyPath(nodeName),
+	}
+
+	Binds := []string{
+		"/var/lib/etcd:/etcd-data:z",
+		"/etc/kubernetes:/etc/kubernetes:z",
+	}
+	for arg, value := range c.Services.Etcd.ExtraArgs {
+		cmd := fmt.Sprintf("--%s=%s", arg, value)
+		args = append(args, cmd)
+	}
+	healthCheck := v3.HealthCheck{
+		URL: services.EtcdHealthCheckURL,
+	}
+	return v3.Process{
+		Args:          args,
+		Binds:         Binds,
+		NetworkMode:   "host",
+		RestartPolicy: "always",
+		Image:         c.Services.Etcd.Image,
+		HealthCheck:   healthCheck,
+	}
+}
+
+func BuildPortChecksFromPortList(host *hosts.Host, portList []string, proto string) []v3.PortCheck {
+	portChecks := []v3.PortCheck{}
+	for _, port := range portList {
+		// the port lists hold numeric constants, so the conversion cannot fail
+		intPort, _ := strconv.Atoi(port)
+		portChecks = append(portChecks, v3.PortCheck{
+			Address:  host.Address,
+			Port:     intPort,
+			Protocol: proto,
+		})
+	}
+	return portChecks
+}
diff --git a/cluster/reconcile.go b/cluster/reconcile.go
index e947e2e0..5398afb9 100644
--- a/cluster/reconcile.go
+++ b/cluster/reconcile.go
@@ -197,7 +197,10 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
 			return err
 		}
 		etcdHost.ToAddEtcdMember = false
-		if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap); err != nil {
+		readyHosts := getReadyEtcdHosts(kubeCluster.EtcdHosts)
+		etcdProcessHostMap := kubeCluster.getEtcdProcessHostMap(readyHosts)
+
+		if err := services.ReloadEtcdCluster(ctx, readyHosts, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap, etcdProcessHostMap); err != nil {
 			return err
 		}
 	}
@@ -220,3 +223,14 @@ func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) {
 		}
 	}
 }
+
+func getReadyEtcdHosts(etcdHosts []*hosts.Host) []*hosts.Host {
+	readyEtcdHosts := []*hosts.Host{}
+	for _, host := range etcdHosts {
+		if !host.ToAddEtcdMember {
+			readyEtcdHosts = append(readyEtcdHosts, host)
+			host.ExistingEtcdCluster = true
+		}
+	}
+	return readyEtcdHosts
+}
diff --git a/services/controlplane.go b/services/controlplane.go
index 008fd81d..b2fb36ff 100644
--- a/services/controlplane.go
+++ b/services/controlplane.go
@@ -9,13 +9,13 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
+func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
 	log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
 	var errgrp errgroup.Group
 	for _, host := range controlHosts {
 		runHost := host
 		errgrp.Go(func() error {
-			return doDeployControlHost(ctx, runHost, etcdHosts, controlServices, sidekickImage, authorizationMode, localConnDialerFactory, prsMap)
+			return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, processMap)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {
@@ -66,24 +66,24 @@ func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
 	return nil
 }
 
-func doDeployControlHost(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
+func doDeployControlHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
 	if host.IsWorker {
 		if err := removeNginxProxy(ctx, host); err != nil {
 			return err
 		}
 	}
 	// run sidekick
-	if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
+	if err := runSidekick(ctx, host, prsMap, processMap[SidekickContainerName]); err != nil {
 		return err
 	}
 	// run kubeapi
-	if err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory, prsMap); err != nil {
+	if err := runKubeAPI(ctx, host, localConnDialerFactory, prsMap, processMap[KubeAPIContainerName]); err != nil {
 		return err
 	}
 	// run kubecontroller
-	if err := runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory, prsMap); err != nil {
+	if err := runKubeController(ctx, host, localConnDialerFactory, prsMap, processMap[KubeControllerContainerName]); err != nil {
 		return err
 	}
 	// run scheduler
-	return runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory, prsMap)
+	return runScheduler(ctx, host, localConnDialerFactory, prsMap, processMap[SchedulerContainerName])
 }
diff --git a/services/etcd.go b/services/etcd.go
index afd93035..df833508 100644
--- a/services/etcd.go
+++ b/services/etcd.go
@@ -7,22 +7,21 @@ import (
 	"context"
 
 	etcdclient "github.com/coreos/etcd/client"
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/log"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/sirupsen/logrus"
 )
 
-func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
-	initCluster := getEtcdInitialCluster(etcdHosts)
-	for _, host := range etcdHosts {
-
-		nodeName := pki.GetEtcdCrtName(host.InternalAddress)
-		imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, nodeName)
+const (
+	EtcdHealthCheckURL = "https://127.0.0.1:2379/health"
+)
+
+func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdProcessHostMap map[*hosts.Host]v3.Process, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
+	log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
+	for _, host := range etcdHosts {
+		imageCfg, hostCfg, _ := getProcessConfig(etcdProcessHostMap[host])
 		err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap)
 		if err != nil {
 			return err
@@ -60,49 +59,6 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error {
 	return nil
 }
 
-func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster, nodeName string) (*container.Config, *container.HostConfig) {
-	clusterState := "new"
-	if host.ExistingEtcdCluster {
-		clusterState = "existing"
-	}
-	imageCfg := &container.Config{
-		Image: etcdService.Image,
-		Cmd: []string{"/usr/local/bin/etcd",
-			"--name=etcd-" + host.HostnameOverride,
-			"--data-dir=/etcd-data",
-			"--advertise-client-urls=https://" + host.InternalAddress + ":2379,https://" + host.InternalAddress + ":4001",
-			"--listen-client-urls=https://0.0.0.0:2379",
-			"--initial-advertise-peer-urls=https://" + host.InternalAddress + ":2380",
-			"--listen-peer-urls=https://0.0.0.0:2380",
-			"--initial-cluster-token=etcd-cluster-1",
-			"--initial-cluster=" + initCluster,
-			"--initial-cluster-state=" + clusterState,
-			"--peer-client-cert-auth",
-			"--client-cert-auth",
-			"--trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
-			"--peer-trusted-ca-file=" + pki.GetCertPath(pki.CACertName),
-			"--cert-file=" + pki.GetCertPath(nodeName),
-			"--key-file=" + pki.GetKeyPath(nodeName),
-			"--peer-cert-file=" + pki.GetCertPath(nodeName),
-			"--peer-key-file=" + pki.GetKeyPath(nodeName),
-		},
-	}
-	hostCfg := &container.HostConfig{
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-		Binds: []string{
-			"/var/lib/etcd:/etcd-data:z",
-			"/etc/kubernetes:/etc/kubernetes:z",
-		},
-		NetworkMode: "host",
-	}
-	for arg, value := range etcdService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-
-	return imageCfg, hostCfg
-}
-
 func AddEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
 	log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, etcdHost.HostnameOverride)
 	peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
@@ -164,17 +120,9 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
 	return nil
 }
 
-func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry) error {
-	readyEtcdHosts := []*hosts.Host{}
-	for _, host := range etcdHosts {
-		if !host.ToAddEtcdMember {
-			readyEtcdHosts = append(readyEtcdHosts, host)
-			host.ExistingEtcdCluster = true
-		}
-	}
-	initCluster := getEtcdInitialCluster(readyEtcdHosts)
-	for _, host := range readyEtcdHosts {
-		imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, pki.GetEtcdCrtName(host.InternalAddress))
+func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdProcessHostMap map[*hosts.Host]v3.Process) error {
+	for host, process := range etcdProcessHostMap {
+		imageCfg, hostCfg, _ := getProcessConfig(process)
 		if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
 			return err
 		}
@@ -182,7 +130,8 @@ func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry) error {
 	time.Sleep(10 * time.Second)
 	var healthy bool
 	for _, host := range readyEtcdHosts {
-		if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key); healthy {
+		_, _, healthCheckURL := getProcessConfig(etcdProcessHostMap[host])
+		if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy {
 			break
 		}
 	}
diff --git a/services/etcd_test.go b/services/etcd_test.go
index cffb87ff..6d7a1622 100644
--- a/services/etcd_test.go
+++ b/services/etcd_test.go
@@ -43,7 +43,7 @@ func TestEtcdConfig(t *testing.T) {
 	etcdService.Image = TestEtcdImage
 	etcdService.ExtraArgs = map[string]string{"foo": "bar"}
 	// Test init cluster string
-	initCluster := getEtcdInitialCluster(etcdHosts)
+	initCluster := GetEtcdInitialCluster(etcdHosts)
 	assertEqual(t, initCluster, TestInitEtcdClusterString, "")
 
 	for _, host := range etcdHosts {
diff --git a/services/etcd_util.go b/services/etcd_util.go
index 536048c7..96398b66 100644
--- a/services/etcd_util.go
+++ b/services/etcd_util.go
@@ -39,7 +39,7 @@ func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (etcdclient.Client, error) {
 	return etcdclient.New(cfg)
 }
 
-func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte) bool {
+func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte, url string) bool {
 	logrus.Debugf("[etcd] Check etcd cluster health")
 	for i := 0; i < 3; i++ {
 		dialer, err := getEtcdDialer(localConnDialerFactory, host)
@@ -59,7 +59,7 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte) bool {
 				TLSHandshakeTimeout: 10 * time.Second,
 			},
 		}
-		healthy, err := getHealthEtcd(hc, host)
+		healthy, err := getHealthEtcd(hc, host, url)
 		if err != nil {
 			logrus.Debug(err)
 			time.Sleep(5 * time.Second)
@@ -73,9 +73,9 @@ func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host, cert, key []byte) bool {
 	return false
 }
 
-func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
+func getHealthEtcd(hc http.Client, host *hosts.Host, url string) (string, error) {
 	healthy := struct{ Health string }{}
-	resp, err := hc.Get("https://127.0.0.1:2379/health")
+	resp, err := hc.Get(url)
 	if err != nil {
 		return healthy.Health, fmt.Errorf("Failed to get /health for host [%s]: %v", host.Address, err)
 	}
@@ -90,7 +90,7 @@ func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
 	return healthy.Health, nil
 }
 
-func getEtcdInitialCluster(hosts []*hosts.Host) string {
+func GetEtcdInitialCluster(hosts []*hosts.Host) string {
 	initialCluster := ""
 	for i, host := range hosts {
 		initialCluster += fmt.Sprintf("etcd-%s=https://%s:2380", host.HostnameOverride, host.InternalAddress)
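The etcd health endpoint is now carried as data (EtcdHealthCheckURL, routed through the process's HealthCheck field) instead of being hard-coded at the call site. A self-contained sketch of the probe that getHealthEtcd performs, minus the tunneled dialer and client certificates the real code wires in — etcd answers GET /health with a JSON body like {"health": "true"}:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// etcdHealthy is illustrative only, not part of the change.
func etcdHealthy(hc *http.Client, url string) (bool, error) {
	resp, err := hc.Get(url)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	health := struct{ Health string }{}
	if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
		return false, err
	}
	return health.Health == "true", nil
}

func main() {
	ok, err := etcdHealthy(http.DefaultClient, "https://127.0.0.1:2379/health")
	fmt.Println(ok, err)
}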
diff --git a/services/healthcheck.go b/services/healthcheck.go
index 11f883ff..dcd333a8 100644
--- a/services/healthcheck.go
+++ b/services/healthcheck.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/rancher/rke/hosts"
@@ -20,14 +22,18 @@ const (
 	HTTPSProtoPrefix = "https://"
 )
 
-func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, localConnDialerFactory hosts.DialerFactory) error {
+func runHealthcheck(ctx context.Context, host *hosts.Host, serviceName string, localConnDialerFactory hosts.DialerFactory, url string) error {
 	log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
+	port, err := getPortFromURL(url)
+	if err != nil {
+		return err
+	}
 	client, err := getHealthCheckHTTPClient(host, port, localConnDialerFactory)
 	if err != nil {
 		return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
 	}
 	for retries := 0; retries < 10; retries++ {
-		if err = getHealthz(client, useTLS, serviceName, host.Address, port); err != nil {
+		if err = getHealthz(client, serviceName, host.Address, url); err != nil {
 			logrus.Debugf("[healthcheck] %v", err)
 			time.Sleep(5 * time.Second)
 			continue
@@ -58,14 +64,10 @@ func getHealthCheckHTTPClient(host *hosts.Host, port int, localConnDialerFactory hosts.DialerFactory) (*http.Client, error) {
 	}, nil
 }
 
-func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string, port int) error {
-	proto := HTTPProtoPrefix
-	if useTLS {
-		proto = HTTPSProtoPrefix
-	}
-	resp, err := client.Get(fmt.Sprintf("%s%s:%d%s", proto, HealthzAddress, port, HealthzEndpoint))
+func getHealthz(client *http.Client, serviceName, hostAddress, url string) error {
+	resp, err := client.Get(url)
 	if err != nil {
-		return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", HealthzEndpoint, serviceName, hostAddress, err)
+		return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", url, serviceName, hostAddress, err)
 	}
 	if resp.StatusCode != http.StatusOK {
 		statusBody, _ := ioutil.ReadAll(resp.Body)
@@ -73,3 +75,12 @@ func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string, port int) error {
 	}
 	return nil
 }
+
+func getPortFromURL(url string) (int, error) {
+	// e.g. "https://localhost:10252/healthz" -> "10252/healthz" -> "10252"
+	port := strings.Split(strings.Split(url, ":")[2], "/")[0]
+	intPort, err := strconv.Atoi(port)
+	if err != nil {
+		return 0, err
+	}
+	return intPort, nil
+}
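getPortFromURL assumes the fixed shape scheme://host:port/path rather than going through net/url; an illustrative table test (not part of the change) pins that assumption down:

package services

import "testing"

// TestGetPortFromURL is an illustrative sketch: it documents the URL shape
// getPortFromURL expects, i.e. scheme://host:port/path with the port present.
func TestGetPortFromURL(t *testing.T) {
	cases := map[string]int{
		"https://localhost:6443/healthz": 6443,
		"http://localhost:10252/healthz": 10252,
	}
	for url, want := range cases {
		got, err := getPortFromURL(url)
		if err != nil || got != want {
			t.Fatalf("getPortFromURL(%q) = %d, %v; want %d", url, got, err, want)
		}
	}
}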
diff --git a/services/kubeapi.go b/services/kubeapi.go
index 114ec8c7..0c428b5a 100644
--- a/services/kubeapi.go
+++ b/services/kubeapi.go
@@ -2,75 +2,20 @@ package services
 
 import (
 	"context"
-	"fmt"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	etcdConnString := GetEtcdConnString(etcdHosts)
-	imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
+func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeAPIProcess v3.Process) error {
+	imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeAPIProcess)
 	if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil {
 		return err
 	}
-	return runHealthcheck(ctx, host, KubeAPIPort, true, KubeAPIContainerName, df)
+	return runHealthcheck(ctx, host, KubeAPIContainerName, df, healthCheckURL)
 }
 
 func removeKubeAPI(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
 }
-
-func buildKubeAPIConfig(host *hosts.Host, kubeAPIService v3.KubeAPIService, etcdConnString, authorizationMode string) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: kubeAPIService.Image,
-		Entrypoint: []string{"/opt/rke/entrypoint.sh",
-			"kube-apiserver",
-			"--insecure-bind-address=127.0.0.1",
-			"--bind-address=0.0.0.0",
-			"--insecure-port=0",
-			"--secure-port=6443",
-			"--cloud-provider=",
-			"--allow_privileged=true",
-			"--kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname",
-			"--service-cluster-ip-range=" + kubeAPIService.ServiceClusterIPRange,
-			"--admission-control=ServiceAccount,NamespaceLifecycle,LimitRanger,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds",
-			"--runtime-config=batch/v2alpha1",
-			"--runtime-config=authentication.k8s.io/v1beta1=true",
-			"--storage-backend=etcd3",
-			"--client-ca-file=" + pki.GetCertPath(pki.CACertName),
-			"--tls-cert-file=" + pki.GetCertPath(pki.KubeAPICertName),
-			"--tls-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
-			"--service-account-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
-			"--etcd-cafile=" + pki.GetCertPath(pki.CACertName),
-			"--etcd-certfile=" + pki.GetCertPath(pki.KubeAPICertName),
-			"--etcd-keyfile=" + pki.GetKeyPath(pki.KubeAPICertName)},
-	}
-	imageCfg.Cmd = append(imageCfg.Cmd, "--etcd-servers="+etcdConnString)
-
-	if authorizationMode == RBACAuthorizationMode {
-		imageCfg.Cmd = append(imageCfg.Cmd, "--authorization-mode=RBAC")
-	}
-	if kubeAPIService.PodSecurityPolicy {
-		imageCfg.Cmd = append(imageCfg.Cmd, "--runtime-config=extensions/v1beta1/podsecuritypolicy=true", "--admission-control=PodSecurityPolicy")
-	}
-	hostCfg := &container.HostConfig{
-		VolumesFrom: []string{
-			SidekickContainerName,
-		},
-		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes:z",
-		},
-		NetworkMode:   "host",
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-	}
-
-	for arg, value := range kubeAPIService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-	return imageCfg, hostCfg
-}
diff --git a/services/kubecontroller.go b/services/kubecontroller.go
index f38bea83..15e61934 100644
--- a/services/kubecontroller.go
+++ b/services/kubecontroller.go
@@ -2,63 +2,20 @@ package services
 
 import (
 	"context"
-	"fmt"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
+func runKubeController(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, controllerProcess v3.Process) error {
+	imageCfg, hostCfg, healthCheckURL := getProcessConfig(controllerProcess)
 	if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole, prsMap); err != nil {
 		return err
 	}
-	return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df)
+	return runHealthcheck(ctx, host, KubeControllerContainerName, df, healthCheckURL)
 }
 
 func removeKubeController(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
 }
-
-func buildKubeControllerConfig(kubeControllerService v3.KubeControllerService, authorizationMode string) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: kubeControllerService.Image,
-		Entrypoint: []string{"/opt/rke/entrypoint.sh",
-			"kube-controller-manager",
-			"--address=0.0.0.0",
-			"--cloud-provider=",
-			"--leader-elect=true",
-			"--kubeconfig=" + pki.GetConfigPath(pki.KubeControllerCertName),
-			"--enable-hostpath-provisioner=false",
-			"--node-monitor-grace-period=40s",
-			"--pod-eviction-timeout=5m0s",
-			"--v=2",
-			"--allocate-node-cidrs=true",
-			"--cluster-cidr=" + kubeControllerService.ClusterCIDR,
-			"--service-cluster-ip-range=" + kubeControllerService.ServiceClusterIPRange,
-			"--service-account-private-key-file=" + pki.GetKeyPath(pki.KubeAPICertName),
-			"--root-ca-file=" + pki.GetCertPath(pki.CACertName),
-		},
-	}
-	if authorizationMode == RBACAuthorizationMode {
-		imageCfg.Cmd = append(imageCfg.Cmd, "--use-service-account-credentials=true")
-	}
-	hostCfg := &container.HostConfig{
-		VolumesFrom: []string{
-			SidekickContainerName,
-		},
-		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes:z",
-		},
-		NetworkMode:   "host",
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-	}
-	for arg, value := range kubeControllerService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-	return imageCfg, hostCfg
-}
diff --git a/services/kubelet.go b/services/kubelet.go
index 0f650189..c1e13bc2 100644
--- a/services/kubelet.go
+++ b/services/kubelet.go
@@ -2,79 +2,20 @@ package services
 
 import (
 	"context"
-	"fmt"
-	"strconv"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
+func runKubelet(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeletProcess v3.Process) error {
+	imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeletProcess)
 	if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole, prsMap); err != nil {
 		return err
 	}
-	return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df)
+	return runHealthcheck(ctx, host, KubeletContainerName, df, healthCheckURL)
 }
 
 func removeKubelet(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
 }
-
-func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: kubeletService.Image,
-		Entrypoint: []string{"/opt/rke/entrypoint.sh",
-			"kubelet",
-			"--v=2",
-			"--address=0.0.0.0",
-			"--cluster-domain=" + kubeletService.ClusterDomain,
-			"--pod-infra-container-image=" + kubeletService.InfraContainerImage,
-			"--cgroups-per-qos=True",
-			"--enforce-node-allocatable=",
-			"--hostname-override=" + host.HostnameOverride,
-			"--cluster-dns=" + kubeletService.ClusterDNSServer,
-			"--network-plugin=cni",
-			"--cni-conf-dir=/etc/cni/net.d",
-			"--cni-bin-dir=/opt/cni/bin",
-			"--resolv-conf=/etc/resolv.conf",
-			"--allow-privileged=true",
-			"--cloud-provider=",
-			"--kubeconfig=" + pki.GetConfigPath(pki.KubeNodeCertName),
-			"--volume-plugin-dir=/var/lib/kubelet/volumeplugins",
-			"--require-kubeconfig=True",
-			"--fail-swap-on=" + strconv.FormatBool(kubeletService.FailSwapOn),
-		},
-	}
-	hostCfg := &container.HostConfig{
-		VolumesFrom: []string{
-			SidekickContainerName,
-		},
-		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes:z",
-			"/etc/cni:/etc/cni:ro,z",
-			"/opt/cni:/opt/cni:ro,z",
-			"/etc/resolv.conf:/etc/resolv.conf",
-			"/sys:/sys",
-			"/var/lib/docker:/var/lib/docker:rw,z",
-			"/var/lib/kubelet:/var/lib/kubelet:shared,z",
-			"/var/run:/var/run:rw",
-			"/run:/run",
-			"/etc/ceph:/etc/ceph",
-			"/dev:/host/dev",
-			"/var/log/containers:/var/log/containers:z",
-			"/var/log/pods:/var/log/pods:z"},
-		NetworkMode:   "host",
-		PidMode:       "host",
-		Privileged:    true,
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-	}
-	for arg, value := range kubeletService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-	return imageCfg, hostCfg
-}
diff --git a/services/kubeproxy.go b/services/kubeproxy.go
index 265feab3..e2392b14 100644
--- a/services/kubeproxy.go
+++ b/services/kubeproxy.go
@@ -2,51 +2,20 @@ package services
 
 import (
 	"context"
-	"fmt"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
+func runKubeproxy(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeProxyProcess v3.Process) error {
+	imageCfg, hostCfg, healthCheckURL := getProcessConfig(kubeProxyProcess)
 	if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
 		return err
 	}
-	return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df)
+	return runHealthcheck(ctx, host, KubeproxyContainerName, df, healthCheckURL)
 }
 
 func removeKubeproxy(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
 }
-
-func buildKubeproxyConfig(host *hosts.Host, kubeproxyService v3.KubeproxyService) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: kubeproxyService.Image,
-		Entrypoint: []string{"/opt/rke/entrypoint.sh",
-			"kube-proxy",
-			"--v=2",
-			"--healthz-bind-address=0.0.0.0",
-			"--kubeconfig=" + pki.GetConfigPath(pki.KubeProxyCertName),
-		},
-	}
-	hostCfg := &container.HostConfig{
-		VolumesFrom: []string{
-			SidekickContainerName,
-		},
-		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes:z",
-		},
-		NetworkMode:   "host",
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-		Privileged:    true,
-	}
-	for arg, value := range kubeproxyService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-	return imageCfg, hostCfg
-}
diff --git a/services/proxy.go b/services/proxy.go
index 9befd355..2842622b 100644
--- a/services/proxy.go
+++ b/services/proxy.go
@@ -2,9 +2,7 @@ package services
 
 import (
 	"context"
-	"fmt"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -15,48 +13,11 @@ const (
 	NginxProxyEnvName = "CP_HOSTS"
 )
 
-func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
-	nginxProxyEnv := buildProxyEnv(cpHosts)
-	for _, host := range workerHosts {
-		imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
-		if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
-	nginxProxyEnv := buildProxyEnv(cpHosts)
-	imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
+func runNginxProxy(ctx context.Context, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, proxyProcess v3.Process) error {
+	imageCfg, hostCfg, _ := getProcessConfig(proxyProcess)
 	return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap)
 }
 
 func removeNginxProxy(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
 }
-
-func buildNginxProxyConfig(host *hosts.Host, nginxProxyEnv, nginxProxyImage string) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: nginxProxyImage,
-		Env:   []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
-		Cmd:   []string{fmt.Sprintf("%s=%s", NginxProxyEnvName, nginxProxyEnv)},
-	}
-	hostCfg := &container.HostConfig{
-		NetworkMode:   "host",
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-	}
-
-	return imageCfg, hostCfg
-}
-
-func buildProxyEnv(cpHosts []*hosts.Host) string {
-	proxyEnv := ""
-	for i, cpHost := range cpHosts {
-		proxyEnv += fmt.Sprintf("%s", cpHost.InternalAddress)
-		if i < (len(cpHosts) - 1) {
-			proxyEnv += ","
-		}
-	}
-	return proxyEnv
-}
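For reference, the nginx proxy keeps its single input: a CP_HOSTS env var listing control plane internal addresses, now assembled by BuildProxyProcess in cluster/plan.go. An equivalent standalone one-liner (illustrative addresses):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// The proxy container discovers the control plane through one env var,
	// a comma-separated list of internal addresses.
	internalAddresses := []string{"172.31.0.10", "172.31.0.11"}
	env := fmt.Sprintf("%s=%s", "CP_HOSTS", strings.Join(internalAddresses, ","))
	fmt.Println(env) // CP_HOSTS=172.31.0.10,172.31.0.11
}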
diff --git a/services/scheduler.go b/services/scheduler.go
index b101907b..9c6e7b1d 100644
--- a/services/scheduler.go
+++ b/services/scheduler.go
@@ -2,51 +2,20 @@ package services
 
 import (
 	"context"
-	"fmt"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
-	"github.com/rancher/rke/pki"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
-	imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
+func runScheduler(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, schedulerProcess v3.Process) error {
+	imageCfg, hostCfg, healthCheckURL := getProcessConfig(schedulerProcess)
 	if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole, prsMap); err != nil {
 		return err
 	}
-	return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df)
+	return runHealthcheck(ctx, host, SchedulerContainerName, df, healthCheckURL)
 }
 
 func removeScheduler(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
 }
-
-func buildSchedulerConfig(host *hosts.Host, schedulerService v3.SchedulerService) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: schedulerService.Image,
-		Entrypoint: []string{"/opt/rke/entrypoint.sh",
-			"kube-scheduler",
-			"--leader-elect=true",
-			"--v=2",
-			"--address=0.0.0.0",
-			"--kubeconfig=" + pki.GetConfigPath(pki.KubeSchedulerCertName),
-		},
-	}
-	hostCfg := &container.HostConfig{
-		VolumesFrom: []string{
-			SidekickContainerName,
-		},
-		Binds: []string{
-			"/etc/kubernetes:/etc/kubernetes:z",
-		},
-		NetworkMode:   "host",
-		RestartPolicy: container.RestartPolicy{Name: "always"},
-	}
-	for arg, value := range schedulerService.ExtraArgs {
-		cmd := fmt.Sprintf("--%s=%s", arg, value)
-		imageCfg.Entrypoint = append(imageCfg.Entrypoint, cmd)
-	}
-	return imageCfg, hostCfg
-}
diff --git a/services/services.go b/services/services.go
index adf1729e..a93fce58 100644
--- a/services/services.go
+++ b/services/services.go
@@ -2,6 +2,7 @@ package services
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/docker/docker/api/types/container"
 	"github.com/rancher/rke/docker"
@@ -34,17 +35,7 @@ const (
 	KubeproxyPort = 10256
 )
 
-func buildSidekickConfig(sidekickImage string) (*container.Config, *container.HostConfig) {
-	imageCfg := &container.Config{
-		Image: sidekickImage,
-	}
-	hostCfg := &container.HostConfig{
-		NetworkMode: "none",
-	}
-	return imageCfg, hostCfg
-}
-
-func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, prsMap map[string]v3.PrivateRegistry) error {
+func runSidekick(ctx context.Context, host *hosts.Host, prsMap map[string]v3.PrivateRegistry, sidecarProcess v3.Process) error {
 	isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true)
 	if err != nil {
 		return err
@@ -53,8 +44,10 @@ func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, prsMap map[string]v3.PrivateRegistry) error {
 		log.Infof(ctx, "[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address)
 		return nil
 	}
-	imageCfg, hostCfg := buildSidekickConfig(sidekickImage)
-	if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName, prsMap); err != nil {
+
+	imageCfg, hostCfg, _ := getProcessConfig(sidecarProcess)
+	sidecarImage := sidecarProcess.Image
+	if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidecarImage, SidekickServiceName, prsMap); err != nil {
 		return err
 	}
 	if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil {
@@ -66,3 +59,30 @@ func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, prsMap map[string]v3.PrivateRegistry) error {
 func removeSidekick(ctx context.Context, host *hosts.Host) error {
 	return docker.DoRemoveContainer(ctx, host.DClient, SidekickContainerName, host.Address)
 }
+
+func getProcessConfig(process v3.Process) (*container.Config, *container.HostConfig, string) {
+	imageCfg := &container.Config{
+		Entrypoint: process.Command,
+		Cmd:        process.Args,
+		Env:        process.Env,
+		Image:      process.Image,
+	}
+	hostCfg := &container.HostConfig{
+		VolumesFrom: process.VolumesFrom,
+		Binds:       process.Binds,
+		NetworkMode: container.NetworkMode(process.NetworkMode),
+		PidMode:     container.PidMode(process.PidMode),
+		Privileged:  process.Privileged,
+	}
+	if len(process.RestartPolicy) > 0 {
+		hostCfg.RestartPolicy = container.RestartPolicy{Name: process.RestartPolicy}
+	}
+	return imageCfg, hostCfg, process.HealthCheck.URL
+}
+
+func GetHealthCheckURL(useTLS bool, port int) string {
+	if useTLS {
+		return fmt.Sprintf("%s%s:%d%s", HTTPSProtoPrefix, HealthzAddress, port, HealthzEndpoint)
+	}
+	return fmt.Sprintf("%s%s:%d%s", HTTPProtoPrefix, HealthzAddress, port, HealthzEndpoint)
+}
diff --git a/services/workerplane.go b/services/workerplane.go
index d9a5c872..89fa2294 100644
--- a/services/workerplane.go
+++ b/services/workerplane.go
@@ -13,18 +13,20 @@ const (
 	unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
 )
 
-func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
+func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, kubeletProcessHostMap map[*hosts.Host]v3.Process) error {
 	log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
 	var errgrp errgroup.Group
-	allHosts := hosts.GetUniqueHostList(etcdHosts, controlHosts, workerHosts)
 	for _, host := range allHosts {
 		if !host.IsControl && !host.IsWorker {
 			// Add unschedulable taint
 			host.ToAddTaints = append(host.ToAddTaints, unschedulableEtcdTaint)
 		}
 		runHost := host
+		// maps are not thread-safe, so give each goroutine its own copy
+		hostProcessMap := copyProcessMap(processMap)
 		errgrp.Go(func() error {
-			return doDeployWorkerPlane(ctx, runHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, prsMap)
+			hostProcessMap[KubeletContainerName] = kubeletProcessHostMap[runHost]
+			return doDeployWorkerPlane(ctx, runHost, localConnDialerFactory, prsMap, hostProcessMap)
 		})
 	}
 	if err := errgrp.Wait(); err != nil {
@@ -62,25 +64,29 @@ func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force bool) error {
 }
 
 func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
-	workerServices v3.RKEConfigServices,
-	nginxProxyImage, sidekickImage string,
 	localConnDialerFactory hosts.DialerFactory,
-	controlHosts []*hosts.Host,
-	prsMap map[string]v3.PrivateRegistry) error {
-
+	prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process) error {
 	// run nginx proxy
 	if !host.IsControl {
-		if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage, prsMap); err != nil {
+		if err := runNginxProxy(ctx, host, prsMap, processMap[NginxProxyContainerName]); err != nil {
 			return err
 		}
 	}
 	// run sidekick
-	if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
+	if err := runSidekick(ctx, host, prsMap, processMap[SidekickContainerName]); err != nil {
 		return err
 	}
 	// run kubelet
-	if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, prsMap); err != nil {
+	if err := runKubelet(ctx, host, localConnDialerFactory, prsMap, processMap[KubeletContainerName]); err != nil {
 		return err
 	}
-	return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory, prsMap)
+	return runKubeproxy(ctx, host, localConnDialerFactory, prsMap, processMap[KubeproxyContainerName])
+}
+
+func copyProcessMap(m map[string]v3.Process) map[string]v3.Process {
+	c := make(map[string]v3.Process)
+	for k, v := range m {
+		c[k] = v
+	}
+	return c
 }
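Why copy the map per host: each worker goroutine overwrites the kubelet entry with its host-specific process, and concurrent writes to one shared Go map are a data race. A runnable sketch of the same pattern (illustrative names and values):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	shared := map[string]string{"kubelet": ""}
	nodes := []string{"node-1", "node-2", "node-3"}
	var errgrp errgroup.Group
	for _, n := range nodes {
		node := n
		// Copy before launching the goroutine, exactly as RunWorkerPlane
		// does via copyProcessMap.
		perHost := make(map[string]string, len(shared))
		for k, v := range shared {
			perHost[k] = v
		}
		errgrp.Go(func() error {
			perHost["kubelet"] = "kubelet-for-" + node // safe: map is goroutine-local
			fmt.Println(node, perHost["kubelet"])
			return nil
		})
	}
	_ = errgrp.Wait()
}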