1
0
mirror of https://github.com/rancher/rke.git synced 2025-08-31 14:36:32 +00:00

Add unschedulable kubelet on etcd nodes

This commit is contained in:
galal-hussein
2018-01-19 03:48:51 +02:00
parent 8769116013
commit 9e29b753cb
13 changed files with 145 additions and 15 deletions

View File

@@ -82,6 +82,7 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
// Deploy Worker Plane // Deploy Worker Plane
if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts, if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts,
c.WorkerHosts, c.WorkerHosts,
c.EtcdHosts,
c.Services, c.Services,
c.SystemImages[NginxProxyImage], c.SystemImages[NginxProxyImage],
c.SystemImages[ServiceSidekickImage], c.SystemImages[ServiceSidekickImage],

View File

@@ -80,6 +80,12 @@ func (c *Cluster) SetUpHosts(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
// Deploying worker certs on etcd hosts as well
err = pki.DeployCertificatesOnWorkers(ctx, c.EtcdHosts, c.Certificates, c.SystemImages[CertDownloaderImage])
if err != nil {
return err
}
err = pki.DeployAdminConfig(ctx, c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath) err = pki.DeployAdminConfig(ctx, c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath)
if err != nil { if err != nil {
return err return err

View File

@@ -12,6 +12,10 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
// taintKey is the key of the node taint that the reconcile steps ask to have
// removed from hosts that also carry the etcd role.
const taintKey = "node-role.kubernetes.io/etcd"
func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) error { func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
log.Infof(ctx, "[reconcile] Reconciling cluster state") log.Infof(ctx, "[reconcile] Reconciling cluster state")
if currentCluster == nil { if currentCluster == nil {
@@ -55,6 +59,15 @@ func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster,
continue continue
} }
} }
// attempt to remove unschedulable taint
toAddHosts := hosts.GetToAddHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts)
for _, host := range toAddHosts {
if host.IsEtcd {
if err := hosts.RemoveTaintFromHost(ctx, host, taintKey, kubeClient); err != nil {
return fmt.Errorf("[reconcile] Failed to remove unschedulable taint from node [%s]", host.Address)
}
}
}
return nil return nil
} }
@@ -100,6 +113,15 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts") return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts")
} }
} }
// attempt to remove unschedulable taint
toAddHosts := hosts.GetToAddHosts(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
for _, host := range toAddHosts {
if host.IsEtcd {
if err := hosts.RemoveTaintFromHost(ctx, host, taintKey, kubeClient); err != nil {
log.Warnf(ctx, "[reconcile] Failed to remove unschedulable taint from node [%s]", host.Address)
}
}
}
return nil return nil
} }
@@ -115,7 +137,7 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd b
return fmt.Errorf("Not able to clean the host: %v", err) return fmt.Errorf("Not able to clean the host: %v", err)
} }
} else if etcd { } else if etcd {
if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}); err != nil { if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove etcd plane: %v", err) return fmt.Errorf("Couldn't remove etcd plane: %v", err)
} }
if err := toDeleteHost.CleanUpEtcdHost(ctx, cleanerImage); err != nil { if err := toDeleteHost.CleanUpEtcdHost(ctx, cleanerImage); err != nil {
@@ -133,7 +155,7 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd b
} }
func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster) error { func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster) error {
logrus.Infof("[reconcile] Check etcd hosts to be deleted") log.Infof(ctx, "[reconcile] Check etcd hosts to be deleted")
etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts) etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
for _, etcdHost := range etcdToDelete { for _, etcdHost := range etcdToDelete {
if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory); err != nil { if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory); err != nil {
@@ -146,7 +168,7 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster) er
continue continue
} }
} }
logrus.Infof("[reconcile] Check etcd hosts to be added") log.Infof(ctx, "[reconcile] Check etcd hosts to be added")
etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts) etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
for _, etcdHost := range etcdToAdd { for _, etcdHost := range etcdToAdd {
etcdHost.ToAddEtcdMember = true etcdHost.ToAddEtcdMember = true

View File

@@ -20,7 +20,7 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
} }
// Remove Etcd Plane // Remove Etcd Plane
if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts); err != nil { if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts, true); err != nil {
return err return err
} }

View File

@@ -137,6 +137,15 @@ func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.
return nil return nil
} }
// RemoveTaintFromHost asks the cluster to drop the taint identified by
// taintKey from the node registered under host.HostnameOverride.
//
// The attempt is logged up front using host.Address; the success message is
// logged only when k8s.RemoveTaintFromNodeByKey returns nil. Any error from
// that call is handed back to the caller unchanged.
func RemoveTaintFromHost(ctx context.Context, host *Host, taintKey string, kubeClient *kubernetes.Clientset) error {
	log.Infof(ctx, "[hosts] removing taint [%s] from host [%s]", taintKey, host.Address)
	removeErr := k8s.RemoveTaintFromNodeByKey(kubeClient, host.HostnameOverride, taintKey)
	if removeErr == nil {
		log.Infof(ctx, "[hosts] Successfully deleted taint [%s] from host [%s]", taintKey, host.Address)
	}
	return removeErr
}
func GetToDeleteHosts(currentHosts, configHosts []*Host) []*Host { func GetToDeleteHosts(currentHosts, configHosts []*Host) []*Host {
toDeleteHosts := []*Host{} toDeleteHosts := []*Host{}
for _, currentHost := range currentHosts { for _, currentHost := range currentHosts {

View File

@@ -6,6 +6,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
@@ -58,3 +59,42 @@ func IsNodeReady(node v1.Node) bool {
} }
return false return false
} }
// RemoveTaintFromNodeByKey removes every taint whose key equals taintKey from
// the node named nodeName and writes the node back through k8sClient.
//
// It returns nil when the node lookup reports NotFound or when the node
// carries no taint with that key. A failed update is retried, re-fetching the
// node each time, for up to maxAttempts attempts with a fixed pause between
// them; if every attempt fails, the returned error includes the last update
// error. Any lookup error other than NotFound is returned as is.
func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKey string) error {
	const (
		maxAttempts = 6
		retryDelay  = 5 * time.Second
	)
	var (
		node *v1.Node
		err  error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		node, err = GetNode(k8sClient, nodeName)
		if err != nil {
			if apierrors.IsNotFound(err) {
				logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
				return nil
			}
			return err
		}
		// Several taints may share one key while differing in effect, so
		// filter the whole list instead of stopping at the first match.
		kept := make([]v1.Taint, 0, len(node.Spec.Taints))
		for _, taint := range node.Spec.Taints {
			if taint.Key != taintKey {
				kept = append(kept, taint)
			}
		}
		if len(kept) == len(node.Spec.Taints) {
			// Nothing to remove: the node has no taint with this key.
			return nil
		}
		node.Spec.Taints = kept
		if _, err = k8sClient.CoreV1().Nodes().Update(node); err == nil {
			return nil
		}
		logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err)
		// Pause only when another attempt follows; sleeping after the final
		// failure would merely delay the error.
		if attempt < maxAttempts {
			time.Sleep(retryDelay)
		}
	}
	return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", node.Name, err)
}

View File

@@ -29,13 +29,29 @@ func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.E
return nil return nil
} }
func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error { func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down Etcd Plane..", ETCDRole) log.Infof(ctx, "[%s] Tearing down Etcd Plane..", ETCDRole)
for _, host := range etcdHosts { for _, host := range etcdHosts {
err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address) err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address)
if err != nil { if err != nil {
return err return err
} }
if !host.IsWorker || !host.IsControl || force {
// remove unschedulable kubelet on etcd host
if err := removeKubelet(ctx, host); err != nil {
return err
}
if err := removeKubeproxy(ctx, host); err != nil {
return err
}
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
if err := removeSidekick(ctx, host); err != nil {
return err
}
}
} }
log.Infof(ctx, "[%s] Successfully teared down Etcd Plane..", ETCDRole) log.Infof(ctx, "[%s] Successfully teared down Etcd Plane..", ETCDRole)
return nil return nil

View File

@@ -11,8 +11,8 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error { func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, unschedulable bool) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService) imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, unschedulable)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil {
return err return err
} }
@@ -23,7 +23,7 @@ func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address) return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
} }
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) { func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService, unschedulable bool) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{ imageCfg := &container.Config{
Image: kubeletService.Image, Image: kubeletService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh", Entrypoint: []string{"/opt/rke/entrypoint.sh",
@@ -46,6 +46,9 @@ func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*co
"--require-kubeconfig=True", "--require-kubeconfig=True",
}, },
} }
if unschedulable {
imageCfg.Cmd = append(imageCfg.Cmd, "--register-with-taints=node-role.kubernetes.io/etcd=true:NoSchedule")
}
for _, role := range host.Role { for _, role := range host.Role {
switch role { switch role {
case ETCDRole: case ETCDRole:

View File

@@ -43,7 +43,7 @@ func TestKubeletConfig(t *testing.T) {
kubeletService.InfraContainerImage = TestKubeletInfraContainerImage kubeletService.InfraContainerImage = TestKubeletInfraContainerImage
kubeletService.ExtraArgs = map[string]string{"foo": "bar"} kubeletService.ExtraArgs = map[string]string{"foo": "bar"}
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService) imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, false)
// Test image and host config // Test image and host config
assertEqual(t, isStringInSlice(TestClusterDomainPrefix+TestKubeletClusterDomain, imageCfg.Entrypoint), true, assertEqual(t, isStringInSlice(TestClusterDomainPrefix+TestKubeletClusterDomain, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestClusterDomainPrefix+TestKubeletClusterDomain)) fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestClusterDomainPrefix+TestKubeletClusterDomain))

View File

@@ -9,15 +9,26 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory) error { func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory) error {
log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole) log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group var errgrp errgroup.Group
// Deploy worker components on etcd hosts
for _, host := range etcdHosts {
etcdHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, etcdHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, true)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
// Deploy worker components on control hosts // Deploy worker components on control hosts
for _, host := range controlHosts { for _, host := range controlHosts {
controlHost := host controlHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts) return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -27,7 +38,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host
for _, host := range workerHosts { for _, host := range workerHosts {
workerHost := host workerHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts) return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -68,7 +79,17 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
workerServices v3.RKEConfigServices, workerServices v3.RKEConfigServices,
nginxProxyImage, sidekickImage string, nginxProxyImage, sidekickImage string,
localConnDialerFactory hosts.DialerFactory, localConnDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host) error { controlHosts []*hosts.Host,
unschedulable bool) error {
// skipping deploying unschedulable kubelet on etcd node
if unschedulable && host.IsWorker {
log.Infof(ctx, "[%s] Host [%s] is already worker host, skipping deploying unschedulable kubelet", WorkerRole, host.Address)
return nil
} else if unschedulable && host.IsControl {
log.Infof(ctx, "[%s] Host [%s] is already control host, skipping deploying unschedulable kubelet", WorkerRole, host.Address)
return nil
}
// run nginx proxy // run nginx proxy
if !host.IsControl { if !host.IsControl {
if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage); err != nil { if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage); err != nil {
@@ -80,7 +101,7 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
return err return err
} }
// run kubelet // run kubelet
if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory); err != nil { if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, unschedulable); err != nil {
return err return err
} }
return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory) return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory)

View File

@@ -192,6 +192,9 @@ spec:
operator: "Exists" operator: "Exists"
- key: "node-role.kubernetes.io/master" - key: "node-role.kubernetes.io/master"
operator: "Exists" operator: "Exists"
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
containers: containers:
# Runs calico/node container on each Kubernetes node. This # Runs calico/node container on each Kubernetes node. This
# container programs network policy and routes on each # container programs network policy and routes on each
@@ -375,6 +378,9 @@ spec:
operator: "Exists" operator: "Exists"
- key: "node-role.kubernetes.io/master" - key: "node-role.kubernetes.io/master"
operator: "Exists" operator: "Exists"
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
containers: containers:
- name: calico-kube-controllers - name: calico-kube-controllers
image: {{.ControllersImage}} image: {{.ControllersImage}}

View File

@@ -212,10 +212,13 @@ spec:
- key: node.cloudprovider.kubernetes.io/uninitialized - key: node.cloudprovider.kubernetes.io/uninitialized
value: "true" value: "true"
effect: NoSchedule effect: NoSchedule
# Allow the pod to run on the master. This is required for # Allow the pod to run on the master and etcd. This is required for
# the master to communicate with pods. # the master to communicate with pods.
- key: "node-role.kubernetes.io/master" - key: "node-role.kubernetes.io/master"
operator: "Exists" operator: "Exists"
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
# Mark the pod as a critical add-on for rescheduling. # Mark the pod as a critical add-on for rescheduling.
- key: "CriticalAddonsOnly" - key: "CriticalAddonsOnly"
operator: "Exists" operator: "Exists"

View File

@@ -151,6 +151,9 @@ spec:
- key: node-role.kubernetes.io/master - key: node-role.kubernetes.io/master
operator: Exists operator: Exists
effect: NoSchedule effect: NoSchedule
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
volumes: volumes:
- name: run - name: run
hostPath: hostPath: