1
0
mirror of https://github.com/rancher/rke.git synced 2025-04-28 03:31:24 +00:00

Add nginx ingress controller and labels/taints sync

This commit is contained in:
galal-hussein 2018-02-01 23:28:31 +02:00
parent 597c844a02
commit f8f877ba27
22 changed files with 706 additions and 147 deletions

View File

@ -195,6 +195,33 @@ rke config --name mycluster.yml
RKE will ask some questions around the cluster file like number of the hosts, ips, ssh users, etc, `--empty` option will generate an empty cluster.yml file, also if you just want to print on the screen and not save it in a file you can use `--print`.
## Ingress Controller
RKE will deploy Nginx controller by default, user can disable this by specifying `none` to `ingress` option in the cluster configuration, user also can specify list of options fo nginx config map listed in this [docs](https://github.com/kubernetes/ingress-nginx/blob/master/docs/user-guide/configmap.md), for example:
```
ingress:
type: nginx
options:
map-hash-bucket-size: "128"
ssl-protocols: SSLv2
```
RKE will deploy ingress controller on all schedulable nodes (controlplane and workers), to specify only certain nodes for ingress controller to be deployed user has to specify `node_selector` for the ingress and the right label on the node, for example:
```
nodes:
- address: 1.1.1.1
role: [controlplane,worker,etcd]
user: root
labels:
app: ingress
ingress:
type: nginx
node_selector:
app: ingress
```
RKE will deploy Nginx Ingress controller as a DaemonSet with `hostnetwork: true`, so ports `80`, and `443` will be opened on each node where the controller is deployed.
## License
Copyright (c) 2017 [Rancher Labs, Inc.](http://rancher.com)

8
addons/ingress.go Normal file
View File

@ -0,0 +1,8 @@
package addons
import "github.com/rancher/rke/templates"
func GetNginxIngressManifest(IngressConfig interface{}) (string, error) {
return templates.CompileTemplateFromMap(templates.NginxIngressTemplate, IngressConfig)
}

View File

@ -13,11 +13,20 @@ import (
const (
KubeDNSAddonResourceName = "rke-kubedns-addon"
UserAddonResourceName = "rke-user-addon"
IngressAddonResourceName = "rke-ingress-controller"
)
type ingressOptions struct {
RBACConfig string
Options map[string]string
NodeSelector map[string]string
}
func (c *Cluster) DeployK8sAddOns(ctx context.Context) error {
err := c.deployKubeDNS(ctx)
return err
if err := c.deployKubeDNS(ctx); err != nil {
return err
}
return c.deployIngress(ctx)
}
func (c *Cluster) DeployUserAddOns(ctx context.Context) error {
@ -113,3 +122,26 @@ func (c *Cluster) ApplySystemAddonExcuteJob(addonJob string) error {
}
return nil
}
func (c *Cluster) deployIngress(ctx context.Context) error {
log.Infof(ctx, "[ingress] Setting up %s ingress controller", c.Ingress.Type)
if c.Ingress.Type == "none" {
log.Infof(ctx, "[ingress] ingress controller is not defined")
return nil
}
ingressConfig := ingressOptions{
RBACConfig: c.Authorization.Mode,
Options: c.Ingress.Options,
NodeSelector: c.Ingress.NodeSelector,
}
// Currently only deploying nginx ingress controller
ingressYaml, err := addons.GetNginxIngressManifest(ingressConfig)
if err != nil {
return err
}
if err := c.doAddonDeploy(ctx, ingressYaml, IngressAddonResourceName); err != nil {
return err
}
log.Infof(ctx, "[ingress] ingress controller %s is successfully deployed", c.Ingress.Type)
return nil
}

View File

@ -7,10 +7,10 @@ import (
"path/filepath"
"strings"
ref "github.com/docker/distribution/reference"
"github.com/rancher/rke/authz"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
@ -145,90 +145,6 @@ func ParseCluster(
return c, nil
}
func (c *Cluster) setClusterDefaults(ctx context.Context) {
if len(c.SSHKeyPath) == 0 {
c.SSHKeyPath = DefaultClusterSSHKeyPath
}
for i, host := range c.Nodes {
if len(host.InternalAddress) == 0 {
c.Nodes[i].InternalAddress = c.Nodes[i].Address
}
if len(host.HostnameOverride) == 0 {
// This is a temporary modification
c.Nodes[i].HostnameOverride = c.Nodes[i].Address
}
if len(host.SSHKeyPath) == 0 {
c.Nodes[i].SSHKeyPath = c.SSHKeyPath
}
}
if len(c.Authorization.Mode) == 0 {
c.Authorization.Mode = DefaultAuthorizationMode
}
if c.Services.KubeAPI.PodSecurityPolicy && c.Authorization.Mode != services.RBACAuthorizationMode {
log.Warnf(ctx, "PodSecurityPolicy can't be enabled with RBAC support disabled")
c.Services.KubeAPI.PodSecurityPolicy = false
}
c.setClusterImageDefaults()
c.setClusterKubernetesImageVersion(ctx)
c.setClusterServicesDefaults()
c.setClusterNetworkDefaults()
}
func (c *Cluster) setClusterKubernetesImageVersion(ctx context.Context) {
k8sImageNamed, _ := ref.ParseNormalizedNamed(c.SystemImages.Kubernetes)
// Kubernetes image is already set by c.setClusterImageDefaults(),
// I will override it here if Version is set.
var VersionedImageNamed ref.NamedTagged
if c.Version != "" {
VersionedImageNamed, _ = ref.WithTag(ref.TrimNamed(k8sImageNamed), c.Version)
c.SystemImages.Kubernetes = VersionedImageNamed.String()
}
normalizedSystemImage, _ := ref.ParseNormalizedNamed(c.SystemImages.Kubernetes)
if normalizedSystemImage.String() != k8sImageNamed.String() {
log.Infof(ctx, "Overrding Kubernetes image [%s] with tag [%s]", VersionedImageNamed.Name(), VersionedImageNamed.Tag())
}
}
func (c *Cluster) setClusterServicesDefaults() {
serviceConfigDefaultsMap := map[*string]string{
&c.Services.KubeAPI.ServiceClusterIPRange: DefaultServiceClusterIPRange,
&c.Services.KubeController.ServiceClusterIPRange: DefaultServiceClusterIPRange,
&c.Services.KubeController.ClusterCIDR: DefaultClusterCIDR,
&c.Services.Kubelet.ClusterDNSServer: DefaultClusterDNSService,
&c.Services.Kubelet.ClusterDomain: DefaultClusterDomain,
&c.Services.Kubelet.InfraContainerImage: DefaultInfraContainerImage,
&c.Authentication.Strategy: DefaultAuthStrategy,
&c.Services.KubeAPI.Image: c.SystemImages.Kubernetes,
&c.Services.Scheduler.Image: c.SystemImages.Kubernetes,
&c.Services.KubeController.Image: c.SystemImages.Kubernetes,
&c.Services.Kubelet.Image: c.SystemImages.Kubernetes,
&c.Services.Kubeproxy.Image: c.SystemImages.Kubernetes,
&c.Services.Etcd.Image: c.SystemImages.Etcd,
}
for k, v := range serviceConfigDefaultsMap {
setDefaultIfEmpty(k, v)
}
}
func (c *Cluster) setClusterImageDefaults() {
systemImagesDefaultsMap := map[*string]string{
&c.SystemImages.Alpine: DefaultAplineImage,
&c.SystemImages.NginxProxy: DefaultNginxProxyImage,
&c.SystemImages.CertDownloader: DefaultCertDownloaderImage,
&c.SystemImages.KubeDNS: DefaultKubeDNSImage,
&c.SystemImages.KubeDNSSidecar: DefaultKubeDNSSidecarImage,
&c.SystemImages.DNSmasq: DefaultDNSmasqImage,
&c.SystemImages.KubeDNSAutoscaler: DefaultKubeDNSAutoScalerImage,
&c.SystemImages.KubernetesServicesSidecar: DefaultKubernetesServicesSidecarImage,
&c.SystemImages.Etcd: DefaultEtcdImage,
&c.SystemImages.Kubernetes: DefaultK8sImage,
}
for k, v := range systemImagesDefaultsMap {
setDefaultIfEmpty(k, v)
}
}
func GetLocalKubeConfig(configPath, configDir string) string {
baseDir := filepath.Dir(configPath)
if len(configDir) > 0 {
@ -337,3 +253,29 @@ func (c *Cluster) getUniqueHostList() []*hosts.Host {
}
return uniqHostList
}
func (c *Cluster) DeployAddons(ctx context.Context) error {
if err := c.DeployK8sAddOns(ctx); err != nil {
return err
}
return c.DeployUserAddOns(ctx)
}
func (c *Cluster) SyncLabelsAndTaints(ctx context.Context) error {
log.Infof(ctx, "[sync] Syncing nodes Labels and Taints")
k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath)
if err != nil {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
for _, host := range c.getUniqueHostList() {
if err := k8s.SyncLabels(k8sClient, host.HostnameOverride, host.ToAddLabels, host.ToDelLabels); err != nil {
return err
}
// Taints are not being added by user
if err := k8s.SyncTaints(k8sClient, host.HostnameOverride, host.ToAddTaints, host.ToDelTaints); err != nil {
return err
}
}
log.Infof(ctx, "[sync] Successfully synced nodes Labels and Taints")
return nil
}

View File

@ -1,5 +1,13 @@
package cluster
import (
"context"
ref "github.com/docker/distribution/reference"
"github.com/rancher/rke/log"
"github.com/rancher/rke/services"
)
const (
DefaultClusterConfig = "cluster.yml"
@ -22,6 +30,7 @@ const (
DefaultNginxProxyImage = "rancher/rke-nginx-proxy:v0.1.1"
DefaultCertDownloaderImage = "rancher/rke-cert-deployer:v0.1.1"
DefaultKubernetesServicesSidecarImage = "rancher/rke-service-sidekick:v0.1.0"
DefaultIngressController = "nginx"
DefaultEtcdImage = "quay.io/coreos/etcd:v3.0.17"
DefaultK8sImage = "rancher/k8s:v1.8.5-rancher4"
@ -52,8 +61,97 @@ func setDefaultIfEmptyMapValue(configMap map[string]string, key string, value st
configMap[key] = value
}
}
func setDefaultIfEmpty(varName *string, defaultValue string) {
if len(*varName) == 0 {
*varName = defaultValue
}
}
func (c *Cluster) setClusterDefaults(ctx context.Context) {
if len(c.SSHKeyPath) == 0 {
c.SSHKeyPath = DefaultClusterSSHKeyPath
}
for i, host := range c.Nodes {
if len(host.InternalAddress) == 0 {
c.Nodes[i].InternalAddress = c.Nodes[i].Address
}
if len(host.HostnameOverride) == 0 {
// This is a temporary modification
c.Nodes[i].HostnameOverride = c.Nodes[i].Address
}
if len(host.SSHKeyPath) == 0 {
c.Nodes[i].SSHKeyPath = c.SSHKeyPath
}
}
if len(c.Authorization.Mode) == 0 {
c.Authorization.Mode = DefaultAuthorizationMode
}
if c.Services.KubeAPI.PodSecurityPolicy && c.Authorization.Mode != services.RBACAuthorizationMode {
log.Warnf(ctx, "PodSecurityPolicy can't be enabled with RBAC support disabled")
c.Services.KubeAPI.PodSecurityPolicy = false
}
if len(c.Ingress.Type) == 0 {
c.Ingress.Type = DefaultIngressController
}
c.setClusterImageDefaults()
c.setClusterKubernetesImageVersion(ctx)
c.setClusterServicesDefaults()
c.setClusterNetworkDefaults()
}
func (c *Cluster) setClusterKubernetesImageVersion(ctx context.Context) {
k8sImageNamed, _ := ref.ParseNormalizedNamed(c.SystemImages.Kubernetes)
// Kubernetes image is already set by c.setClusterImageDefaults(),
// I will override it here if Version is set.
var VersionedImageNamed ref.NamedTagged
if c.Version != "" {
VersionedImageNamed, _ = ref.WithTag(ref.TrimNamed(k8sImageNamed), c.Version)
c.SystemImages.Kubernetes = VersionedImageNamed.String()
}
normalizedSystemImage, _ := ref.ParseNormalizedNamed(c.SystemImages.Kubernetes)
if normalizedSystemImage.String() != k8sImageNamed.String() {
log.Infof(ctx, "Overrding Kubernetes image [%s] with tag [%s]", VersionedImageNamed.Name(), VersionedImageNamed.Tag())
}
}
func (c *Cluster) setClusterServicesDefaults() {
serviceConfigDefaultsMap := map[*string]string{
&c.Services.KubeAPI.ServiceClusterIPRange: DefaultServiceClusterIPRange,
&c.Services.KubeController.ServiceClusterIPRange: DefaultServiceClusterIPRange,
&c.Services.KubeController.ClusterCIDR: DefaultClusterCIDR,
&c.Services.Kubelet.ClusterDNSServer: DefaultClusterDNSService,
&c.Services.Kubelet.ClusterDomain: DefaultClusterDomain,
&c.Services.Kubelet.InfraContainerImage: DefaultInfraContainerImage,
&c.Authentication.Strategy: DefaultAuthStrategy,
&c.Services.KubeAPI.Image: c.SystemImages.Kubernetes,
&c.Services.Scheduler.Image: c.SystemImages.Kubernetes,
&c.Services.KubeController.Image: c.SystemImages.Kubernetes,
&c.Services.Kubelet.Image: c.SystemImages.Kubernetes,
&c.Services.Kubeproxy.Image: c.SystemImages.Kubernetes,
&c.Services.Etcd.Image: c.SystemImages.Etcd,
}
for k, v := range serviceConfigDefaultsMap {
setDefaultIfEmpty(k, v)
}
}
func (c *Cluster) setClusterImageDefaults() {
systemImagesDefaultsMap := map[*string]string{
&c.SystemImages.Alpine: DefaultAplineImage,
&c.SystemImages.NginxProxy: DefaultNginxProxyImage,
&c.SystemImages.CertDownloader: DefaultCertDownloaderImage,
&c.SystemImages.KubeDNS: DefaultKubeDNSImage,
&c.SystemImages.KubeDNSSidecar: DefaultKubeDNSSidecarImage,
&c.SystemImages.DNSmasq: DefaultDNSmasqImage,
&c.SystemImages.KubeDNSAutoscaler: DefaultKubeDNSAutoScalerImage,
&c.SystemImages.KubernetesServicesSidecar: DefaultKubernetesServicesSidecarImage,
&c.SystemImages.Etcd: DefaultEtcdImage,
&c.SystemImages.Kubernetes: DefaultK8sImage,
}
for k, v := range systemImagesDefaultsMap {
setDefaultIfEmpty(k, v)
}
}

View File

@ -12,6 +12,12 @@ import (
"github.com/sirupsen/logrus"
)
const (
etcdRoleLabel = "node-role.kubernetes.io/etcd"
masterRoleLabel = "node-role.kubernetes.io/master"
workerRoleLabel = "node-role.kubernetes.io/worker"
)
func (c *Cluster) TunnelHosts(ctx context.Context, local bool) error {
if local {
if err := c.EtcdHosts[0].TunnelUpLocal(ctx); err != nil {
@ -45,8 +51,14 @@ func (c *Cluster) InvertIndexHosts() error {
for _, host := range c.Nodes {
newHost := hosts.Host{
RKEConfigNode: host,
ToAddLabels: map[string]string{},
ToDelLabels: map[string]string{},
ToAddTaints: []string{},
ToDelTaints: []string{},
}
for k, v := range host.Labels {
newHost.ToAddLabels[k] = v
}
newHost.IgnoreDockerVersion = c.IgnoreDockerVersion
for _, role := range host.Role {
@ -54,17 +66,29 @@ func (c *Cluster) InvertIndexHosts() error {
switch role {
case services.ETCDRole:
newHost.IsEtcd = true
newHost.ToAddLabels[etcdRoleLabel] = "true"
c.EtcdHosts = append(c.EtcdHosts, &newHost)
case services.ControlRole:
newHost.IsControl = true
newHost.ToAddLabels[masterRoleLabel] = "true"
c.ControlPlaneHosts = append(c.ControlPlaneHosts, &newHost)
case services.WorkerRole:
newHost.IsWorker = true
newHost.ToAddLabels[workerRoleLabel] = "true"
c.WorkerHosts = append(c.WorkerHosts, &newHost)
default:
return fmt.Errorf("Failed to recognize host [%s] role %s", host.Address, role)
}
}
if !newHost.IsEtcd {
newHost.ToDelLabels[etcdRoleLabel] = "true"
}
if !newHost.IsControl {
newHost.ToDelLabels[masterRoleLabel] = "true"
}
if !newHost.IsWorker {
newHost.ToDelLabels[workerRoleLabel] = "true"
}
}
return nil
}

View File

@ -16,7 +16,7 @@ import (
)
const (
taintKey = "node-role.kubernetes.io/etcd"
unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
)
func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
@ -31,6 +31,8 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster)
if err != nil {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
// sync node labels to define the toDelete labels
syncLabels(ctx, currentCluster, kubeCluster)
if err := reconcileEtcd(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
return fmt.Errorf("Failed to reconcile etcd plane: %v", err)
@ -66,9 +68,7 @@ func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster,
toAddHosts := hosts.GetToAddHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts)
for _, host := range toAddHosts {
if host.IsEtcd {
if err := hosts.RemoveTaintFromHost(ctx, host, taintKey, kubeClient); err != nil {
return fmt.Errorf("[reconcile] Failed to remove unschedulable taint from node [%s]", host.Address)
}
host.ToDelTaints = append(host.ToDelTaints, unschedulableEtcdTaint)
}
}
return nil
@ -111,9 +111,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
toAddHosts := hosts.GetToAddHosts(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
for _, host := range toAddHosts {
if host.IsEtcd {
if err := hosts.RemoveTaintFromHost(ctx, host, taintKey, kubeClient); err != nil {
log.Warnf(ctx, "[reconcile] Failed to remove unschedulable taint from node [%s]", host.Address)
}
host.ToDelTaints = append(host.ToDelTaints, unschedulableEtcdTaint)
}
}
return nil
@ -149,7 +147,7 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd b
}
func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
logrus.Infof("[reconcile] Check etcd hosts to be deleted")
log.Infof(ctx, "[reconcile] Check etcd hosts to be deleted")
// get tls for the first current etcd host
clientCert := cert.EncodeCertPEM(currentCluster.Certificates[pki.KubeNodeCertName].Certificate)
clientkey := cert.EncodePrivateKeyPEM(currentCluster.Certificates[pki.KubeNodeCertName].Key)
@ -205,3 +203,20 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
}
return nil
}
func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) {
currentHosts := currentCluster.getUniqueHostList()
configHosts := kubeCluster.getUniqueHostList()
for _, host := range configHosts {
for _, currentHost := range currentHosts {
if host.Address == currentHost.Address {
for k, v := range currentHost.Labels {
if _, ok := host.Labels[k]; !ok {
host.ToDelLabels[k] = v
}
}
break
}
}
}
}

View File

@ -31,6 +31,11 @@ func (c *Cluster) ValidateCluster() error {
return err
}
// validate Ingress options
if err := validateIngressOptions(c); err != nil {
return err
}
// validate services options
return validateServicesOptions(c)
}
@ -91,3 +96,11 @@ func validateServicesOptions(c *Cluster) error {
}
return nil
}
func validateIngressOptions(c *Cluster) error {
// Should be changed when adding more ingress types
if c.Ingress.Type != DefaultIngressController && c.Ingress.Type != "none" {
return fmt.Errorf("Ingress controller %s is incorrect", c.Ingress.Type)
}
return nil
}

View File

@ -46,7 +46,7 @@ func ClusterRemove(
dialerFactory hosts.DialerFactory,
local bool, configDir string) error {
logrus.Infof("Tearing down Kubernetes cluster")
log.Infof(ctx, "Tearing down Kubernetes cluster")
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil)
if err != nil {
return err

View File

@ -98,12 +98,12 @@ func ClusterUp(
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployK8sAddOns(ctx)
err = kubeCluster.SyncLabelsAndTaints(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployUserAddOns(ctx)
err = kubeCluster.DeployAddons(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}

View File

@ -25,6 +25,10 @@ type Host struct {
ToAddEtcdMember bool
ExistingEtcdCluster bool
SavedKeyPhrase string
ToAddLabels map[string]string
ToDelLabels map[string]string
ToAddTaints []string
ToDelTaints []string
}
const (

View File

@ -2,6 +2,8 @@ package k8s
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/sirupsen/logrus"
@ -22,6 +24,7 @@ func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) {
func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
return k8sClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
}
func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned bool) error {
updated := false
for retries := 0; retries <= 5; retries++ {
@ -98,3 +101,128 @@ func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKe
}
return nil
}
func SyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error {
updated := false
var err error
for retries := 0; retries <= 5; retries++ {
if err = doSyncLabels(k8sClient, nodeName, toAddLabels, toDelLabels); err != nil {
time.Sleep(5 * time.Second)
continue
}
updated = true
break
}
if !updated {
return fmt.Errorf("Timeout waiting for labels to be synced for node [%s]: %v", nodeName, err)
}
return nil
}
func SyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error {
updated := false
var err error
var node *v1.Node
for retries := 0; retries <= 5; retries++ {
if err = doSyncTaints(k8sClient, nodeName, toAddTaints, toDelTaints); err != nil {
time.Sleep(5 * time.Second)
continue
}
updated = true
break
}
if !updated {
return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", node.Name, err)
}
return nil
}
func doSyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error {
node, err := GetNode(k8sClient, nodeName)
oldLabels := make(map[string]string)
for k, v := range node.Labels {
oldLabels[k] = v
}
if err != nil {
if apierrors.IsNotFound(err) {
logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
return nil
}
return err
}
// Delete Labels
for key := range toDelLabels {
if _, ok := node.Labels[key]; ok {
delete(node.Labels, key)
}
}
// ADD Labels
for key, value := range toAddLabels {
node.Labels[key] = value
}
if reflect.DeepEqual(oldLabels, node.Labels) {
logrus.Debugf("Labels are not changed for node [%s]", node.Name)
return nil
}
_, err = k8sClient.CoreV1().Nodes().Update(node)
if err != nil {
logrus.Debugf("Error syncing labels for node [%s]: %v", node.Name, err)
return err
}
return nil
}
func doSyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error {
node, err := GetNode(k8sClient, nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
return nil
}
return err
}
// Add taints to node
for _, taintStr := range toAddTaints {
if isTaintExist(toTaint(taintStr), node.Spec.Taints) {
continue
}
node.Spec.Taints = append(node.Spec.Taints, toTaint(taintStr))
}
// Remove Taints from node
for i, taintStr := range toDelTaints {
if isTaintExist(toTaint(taintStr), node.Spec.Taints) {
node.Spec.Taints = append(node.Spec.Taints[:i], node.Spec.Taints[i+1:]...)
}
}
//node.Spec.Taints
_, err = k8sClient.CoreV1().Nodes().Update(node)
if err != nil {
logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err)
return err
}
return nil
}
func isTaintExist(taint v1.Taint, taintList []v1.Taint) bool {
for _, t := range taintList {
if t.Key == taint.Key && t.Value == taint.Value && t.Effect == taint.Effect {
return true
}
}
return false
}
func toTaint(taintStr string) v1.Taint {
taintStruct := strings.Split(taintStr, "=")
tmp := strings.Split(taintStruct[1], ":")
key := taintStruct[0]
value := tmp[0]
effect := v1.TaintEffect(tmp[1])
return v1.Taint{
Key: key,
Value: value,
Effect: effect,
TimeAdded: metav1.Time{time.Now()},
}
}

View File

@ -8,7 +8,6 @@ import (
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
"k8s.io/client-go/util/cert"
)
@ -29,7 +28,7 @@ type CertificatePKI struct {
// StartCertificatesGeneration ...
func StartCertificatesGeneration(ctx context.Context, cpHosts, etcdHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
logrus.Infof("[certificates] Generating kubernetes certificates")
log.Infof(ctx, "[certificates] Generating kubernetes certificates")
certs, err := generateCerts(ctx, cpHosts, etcdHosts, clusterDomain, localConfigPath, KubernetesServiceIP)
if err != nil {
return nil, err
@ -89,7 +88,7 @@ func generateCerts(ctx context.Context, cpHosts, etcdHosts []*hosts.Host, cluste
certs[KubeNodeCertName] = ToCertObject(KubeNodeCertName, KubeNodeCommonName, KubeNodeOrganizationName, nodeCrt, nodeKey)
// generate Admin certificate and key
logrus.Infof("[certificates] Generating admin certificates and kubeconfig")
log.Infof(ctx, "[certificates] Generating admin certificates and kubeconfig")
kubeAdminCrt, kubeAdminKey, err := GenerateSignedCertAndKey(caCrt, caKey, false, KubeAdminCertName, nil, nil, []string{KubeAdminOrganizationName})
if err != nil {
return nil, err
@ -108,7 +107,7 @@ func generateCerts(ctx context.Context, cpHosts, etcdHosts []*hosts.Host, cluste
etcdAltNames := GetAltNames(etcdHosts, clusterDomain, KubernetesServiceIP)
for _, host := range etcdHosts {
logrus.Infof("[certificates] Generating etcd-%s certificate and key", host.InternalAddress)
log.Infof(ctx, "[certificates] Generating etcd-%s certificate and key", host.InternalAddress)
etcdCrt, etcdKey, err := GenerateSignedCertAndKey(caCrt, caKey, true, EtcdCertName, etcdAltNames, nil, nil)
if err != nil {
return nil, err

View File

@ -11,8 +11,8 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, unschedulable bool, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, unschedulable)
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err
}
@ -23,7 +23,7 @@ func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService, unschedulable bool) (*container.Config, *container.HostConfig) {
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeletService.Image,
Entrypoint: []string{"/opt/rke/entrypoint.sh",
@ -46,19 +46,6 @@ func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService, unsc
"--require-kubeconfig=True",
},
}
if unschedulable {
imageCfg.Cmd = append(imageCfg.Cmd, "--register-with-taints=node-role.kubernetes.io/etcd=true:NoSchedule")
}
for _, role := range host.Role {
switch role {
case ETCDRole:
imageCfg.Cmd = append(imageCfg.Cmd, "--node-labels=node-role.kubernetes.io/etcd=true")
case ControlRole:
imageCfg.Cmd = append(imageCfg.Cmd, "--node-labels=node-role.kubernetes.io/master=true")
case WorkerRole:
imageCfg.Cmd = append(imageCfg.Cmd, "--node-labels=node-role.kubernetes.io/worker=true")
}
}
hostCfg := &container.HostConfig{
VolumesFrom: []string{
SidekickContainerName,

View File

@ -19,9 +19,6 @@ const (
TestClusterDNSServerPrefix = "--cluster-dns="
TestInfraContainerImagePrefix = "--pod-infra-container-image="
TestHostnameOverridePrefix = "--hostname-override="
TestKubeletEtcdNodeLabel = "--node-labels=node-role.kubernetes.io/etcd=true"
TestKubeletCPNodeLabel = "--node-labels=node-role.kubernetes.io/master=true"
TestKubeletWorkerNodeLabel = "--node-labels=node-role.kubernetes.io/worker=true"
)
func TestKubeletConfig(t *testing.T) {
@ -43,7 +40,7 @@ func TestKubeletConfig(t *testing.T) {
kubeletService.InfraContainerImage = TestKubeletInfraContainerImage
kubeletService.ExtraArgs = map[string]string{"foo": "bar"}
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, false)
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
// Test image and host config
assertEqual(t, isStringInSlice(TestClusterDomainPrefix+TestKubeletClusterDomain, imageCfg.Entrypoint), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestClusterDomainPrefix+TestKubeletClusterDomain))
@ -65,10 +62,4 @@ func TestKubeletConfig(t *testing.T) {
"Failed to verify that Kubelet has host PID mode")
assertEqual(t, true, hostCfg.NetworkMode.IsHost(),
"Failed to verify that Kubelet has host Network mode")
assertEqual(t, isStringInSlice(TestKubeletEtcdNodeLabel, imageCfg.Cmd), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestKubeletEtcdNodeLabel))
assertEqual(t, isStringInSlice(TestKubeletCPNodeLabel, imageCfg.Cmd), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestKubeletCPNodeLabel))
assertEqual(t, isStringInSlice(TestKubeletWorkerNodeLabel, imageCfg.Cmd), true,
fmt.Sprintf("Failed to find [%s] in Kubelet Command", TestKubeletWorkerNodeLabel))
}

View File

@ -9,15 +9,23 @@ import (
"golang.org/x/sync/errgroup"
)
const (
unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
)
func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group
// Deploy worker components on etcd hosts
for _, host := range etcdHosts {
if !host.IsControl && !host.IsWorker {
// Add unschedulable taint
host.ToAddTaints = append(host.ToAddTaints, unschedulableEtcdTaint)
}
etcdHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, etcdHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, true, prsMap)
return doDeployWorkerPlane(ctx, etcdHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, prsMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -28,7 +36,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []
for _, host := range controlHosts {
controlHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false, prsMap)
return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, prsMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -38,7 +46,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []
for _, host := range workerHosts {
workerHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false, prsMap)
return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, prsMap)
})
}
if err := errgrp.Wait(); err != nil {
@ -80,16 +88,8 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
nginxProxyImage, sidekickImage string,
localConnDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host,
unschedulable bool, prsMap map[string]v3.PrivateRegistry) error {
prsMap map[string]v3.PrivateRegistry) error {
// skipping deploying unschedulable kubelet on etcd node
if unschedulable && host.IsWorker {
log.Infof(ctx, "[%s] Host [%s] is already worker host, skipping deploying unschedulable kubelet", WorkerRole, host.Address)
return nil
} else if unschedulable && host.IsControl {
log.Infof(ctx, "[%s] Host [%s] is already control host, skipping deploying unschedulable kubelet", WorkerRole, host.Address)
return nil
}
// run nginx proxy
if !host.IsControl {
if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage, prsMap); err != nil {
@ -101,7 +101,7 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
return err
}
// run kubelet
if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, unschedulable, prsMap); err != nil {
if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, prsMap); err != nil {
return err
}
return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory, prsMap)

View File

@ -186,7 +186,7 @@ spec:
operator: "Exists"
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
effect: "NoExecute"
containers:
# Runs calico/node container on each Kubernetes node. This
# container programs network policy and routes on each

View File

@ -218,7 +218,7 @@ spec:
operator: "Exists"
- key: "node-role.kubernetes.io/etcd"
operator: "Exists"
effect: "NoSchedule"
effect: "NoExecute"
# Mark the pod as a critical add-on for rescheduling.
- key: "CriticalAddonsOnly"
operator: "Exists"

View File

@ -153,7 +153,7 @@ spec:
effect: NoSchedule
- key: node-role.kubernetes.io/etcd
operator: Exists
effect: NoSchedule
effect: NoExecute
volumes:
- name: run
hostPath:

291
templates/nginx-ingress.go Normal file
View File

@ -0,0 +1,291 @@
package templates
const NginxIngressTemplate = `
apiVersion: v1
kind: Namespace
metadata:
name: ingress-nginx
---
kind: ConfigMap
apiVersion: v1
metadata:
name: nginx-configuration
namespace: ingress-nginx
labels:
app: ingress-nginx
data:
{{ range $k,$v := .Options }}
{{ $k }}: {{ $v }}
{{ end }}
---
kind: ConfigMap
apiVersion: v1
metadata:
name: tcp-services
namespace: ingress-nginx
---
kind: ConfigMap
apiVersion: v1
metadata:
name: udp-services
namespace: ingress-nginx
{{if eq .RBACConfig "rbac"}}
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: nginx-ingress-serviceaccount
namespace: ingress-nginx
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: nginx-ingress-clusterrole
rules:
- apiGroups:
- ""
resources:
- configmaps
- endpoints
- nodes
- pods
- secrets
verbs:
- list
- watch
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- apiGroups:
- ""
resources:
- services
verbs:
- get
- list
- watch
- apiGroups:
- "extensions"
resources:
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- "extensions"
resources:
- ingresses/status
verbs:
- update
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
name: nginx-ingress-role
namespace: ingress-nginx
rules:
- apiGroups:
- ""
resources:
- configmaps
- pods
- secrets
- namespaces
verbs:
- get
- apiGroups:
- ""
resources:
- configmaps
resourceNames:
# Defaults to "<election-id>-<ingress-class>"
# Here: "<ingress-controller-leader>-<nginx>"
# This has to be adapted if you change either parameter
# when launching the nginx-ingress-controller.
- "ingress-controller-leader-nginx"
verbs:
- get
- update
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
name: nginx-ingress-role-nisa-binding
namespace: ingress-nginx
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: nginx-ingress-role
subjects:
- kind: ServiceAccount
name: nginx-ingress-serviceaccount
namespace: ingress-nginx
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: nginx-ingress-clusterrole-nisa-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: nginx-ingress-clusterrole
subjects:
- kind: ServiceAccount
name: nginx-ingress-serviceaccount
namespace: ingress-nginx
{{ end }}
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: nginx-ingress-controller
namespace: ingress-nginx
spec:
selector:
matchLabels:
app: ingress-nginx
template:
metadata:
labels:
app: ingress-nginx
annotations:
prometheus.io/port: '10254'
prometheus.io/scrape: 'true'
spec:
hostNetwork: true
nodeSelector:
{{ range $k, $v := .NodeSelector }}
{{ $k }}: {{ $v }}
{{ end }}
{{if eq .RBACConfig "rbac"}}
serviceAccountName: nginx-ingress-serviceaccount
{{ end }}
initContainers:
- command:
- sh
- -c
- sysctl -w net.core.somaxconn=32768; sysctl -w net.ipv4.ip_local_port_range="1024 65535"
image: alpine:3.6
imagePullPolicy: IfNotPresent
name: sysctl
securityContext:
privileged: true
containers:
- name: nginx-ingress-controller
image: quay.io/kubernetes-ingress-controller/nginx-ingress-controller:0.10.2
args:
- /nginx-ingress-controller
- --default-backend-service=$(POD_NAMESPACE)/default-http-backend
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --annotations-prefix=nginx.ingress.kubernetes.io
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- name: http
containerPort: 80
- name: https
containerPort: 443
livenessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 10254
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
readinessProbe:
failureThreshold: 3
httpGet:
path: /healthz
port: 10254
scheme: HTTP
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: default-http-backend
labels:
app: default-http-backend
namespace: ingress-nginx
spec:
replicas: 1
template:
metadata:
labels:
app: default-http-backend
spec:
terminationGracePeriodSeconds: 60
containers:
- name: default-http-backend
# Any image is permissable as long as:
# 1. It serves a 404 page at /
# 2. It serves 200 on a /healthz endpoint
image: gcr.io/google_containers/defaultbackend:1.4
livenessProbe:
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 30
timeoutSeconds: 5
ports:
- containerPort: 8080
resources:
limits:
cpu: 10m
memory: 20Mi
requests:
cpu: 10m
memory: 20Mi
---
apiVersion: v1
kind: Service
metadata:
name: default-http-backend
namespace: ingress-nginx
labels:
app: default-http-backend
spec:
ports:
- port: 80
targetPort: 8080
selector:
app: default-http-backend
`

View File

@ -5,7 +5,7 @@ import (
"text/template"
)
func CompileTemplateFromMap(tmplt string, configMap map[string]string) (string, error) {
func CompileTemplateFromMap(tmplt string, configMap interface{}) (string, error) {
out := new(bytes.Buffer)
t := template.Must(template.New("compiled_template").Parse(tmplt))
if err := t.Execute(out, configMap); err != nil {

View File

@ -87,7 +87,7 @@ items:
seLinuxOptions: {}
serviceAccountName: weave-net
tolerations:
- effect: NoSchedule
- effect: NoExecute
operator: Exists
volumes:
- name: weavedb