1
0
mirror of https://github.com/rancher/rke.git synced 2025-08-29 19:53:12 +00:00

add logic for external aws cloud provider

This commit is contained in:
Kinara Shah 2023-10-16 12:42:42 -07:00 committed by Jiaqi Luo
parent 49b67c0fe0
commit 1ce3e2aa50
9 changed files with 107 additions and 109 deletions

View File

@ -492,7 +492,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str
if err != nil {
return &addonError{fmt.Sprintf("%v", err), isCritical}
}
node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride)
node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.CloudProvider.Name)
if err != nil {
return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical}
}
@ -513,7 +513,7 @@ func (c *Cluster) doAddonDelete(ctx context.Context, resourceName string, isCrit
if err != nil {
return &addonError{fmt.Sprintf("%v", err), isCritical}
}
node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride)
node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.CloudProvider.Name)
if err != nil {
return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical}
}

View File

@ -189,7 +189,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
continue
}
// find existing nodes that are in NotReady state
if err := services.CheckNodeReady(kubeClient, host, services.ControlRole); err != nil {
if err := services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name); err != nil {
logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride)
notReadyHosts = append(notReadyHosts, host)
notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
@ -223,7 +223,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
}
// Calling CheckNodeReady wil give some time for nodes to get in Ready state
for _, host := range notReadyHosts {
err = services.CheckNodeReady(kubeClient, host, services.ControlRole)
err = services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name)
if err != nil {
logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err)
}
@ -236,7 +236,8 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes, c.Version)
c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes,
c.Version, c.CloudProvider.Name)
if err != nil {
return "", fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
}
@ -310,7 +311,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
continue
}
// find existing nodes that are in NotReady state
if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole); err != nil {
if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name); err != nil {
logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride)
notReadyHosts = append(notReadyHosts, host)
notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
@ -332,7 +333,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
}
// Calling CheckNodeReady wil give some time for nodes to get in Ready state
for _, host := range notReadyHosts {
err = services.CheckNodeReady(kubeClient, host, services.WorkerRole)
err = services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name)
if err != nil {
logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err)
}
@ -349,7 +350,8 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
c.UpgradeStrategy,
c.NewHosts,
c.MaxUnavailableForWorkerNodes,
c.Version)
c.Version,
c.CloudProvider.Name)
if err != nil {
return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
}
@ -994,7 +996,7 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust
var errs []error
for host := range hostQueue {
logrus.Debugf("worker [%d] starting sync for node [%s]", w, host.HostnameOverride)
if err := setNodeAnnotationsLabelsTaints(k8sClient, host); err != nil {
if err := setNodeAnnotationsLabelsTaints(k8sClient, host, c.CloudProvider.Name); err != nil {
errs = append(errs, err)
}
}
@ -1012,11 +1014,11 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust
return nil
}
func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host) error {
func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host, cloudProviderName string) error {
node := &v1.Node{}
var err error
for retries := 0; retries <= 5; retries++ {
node, err = k8s.GetNode(k8sClient, host.HostnameOverride)
node, err = k8s.GetNode(k8sClient, host.HostnameOverride, cloudProviderName)
if err != nil {
logrus.Debugf("[hosts] Can't find node by name [%s], error: %v", host.HostnameOverride, err)
time.Sleep(2 * time.Second)

View File

@ -9,6 +9,7 @@ import (
"github.com/blang/semver"
"github.com/rancher/rke/cloudprovider"
"github.com/rancher/rke/cloudprovider/aws"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
@ -1061,11 +1062,25 @@ func (c *Cluster) setCloudProvider() error {
if p != nil {
c.CloudConfigFile, err = p.GenerateCloudConfigFile()
if err != nil {
return fmt.Errorf("Failed to parse cloud config file: %v", err)
return fmt.Errorf("failed to parse cloud config file: %v", err)
}
c.CloudProvider.Name = p.GetName()
if c.CloudProvider.Name == "" {
return fmt.Errorf("Name of the cloud provider is not defined for custom provider")
return fmt.Errorf("name of the cloud provider is not defined for custom provider")
}
if c.CloudProvider.Name == aws.AWSCloudProviderName {
clusterVersion, err := getClusterVersion(c.Version)
if err != nil {
return fmt.Errorf("failed to get cluster version for checking cloud provider: %v", err)
}
// cloud provider must be external or external-aws for >=1.27
defaultExternalAwsRange, err := semver.ParseRange(">=1.27.0-rancher0")
if err != nil {
return fmt.Errorf("failed to parse semver range for checking cloud provider %v", err)
}
if defaultExternalAwsRange(clusterVersion) {
return fmt.Errorf(fmt.Sprintf("Cloud provider %s is invalid for [%s]", aws.AWSCloudProviderName, c.Version))
}
}
}
return nil

View File

@ -13,6 +13,7 @@ import (
"github.com/blang/semver"
"github.com/docker/docker/api/types"
"github.com/rancher/rke/cloudprovider/aws"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
@ -180,7 +181,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern
CommandArgs := map[string]string{
"admission-control-config-file": DefaultKubeAPIArgAdmissionControlConfigFileValue,
"client-ca-file": pki.GetCertPath(pki.CACertName),
"cloud-provider": c.CloudProvider.Name,
"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
"etcd-cafile": etcdCAClientCert,
"etcd-certfile": etcdClientCert,
"etcd-keyfile": etcdClientKey,
@ -345,7 +346,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern
func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, serviceOptions v3.KubernetesServicesOptions) v3.Process {
Command := c.getRKEToolsEntryPoint(host.OS(), "kube-controller-manager")
CommandArgs := map[string]string{
"cloud-provider": c.CloudProvider.Name,
"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
"cluster-cidr": c.ClusterCIDR,
"kubeconfig": pki.GetConfigPath(pki.KubeControllerCertName),
"root-ca-file": pki.GetCertPath(pki.CACertName),
@ -464,7 +465,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern
Command := c.getRKEToolsEntryPoint(host.OS(), "kubelet")
CommandArgs := map[string]string{
"client-ca-file": pki.GetCertPath(pki.CACertName),
"cloud-provider": c.CloudProvider.Name,
"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
"cluster-dns": c.ClusterDNSServer,
"cluster-domain": c.ClusterDomain,
"fail-swap-on": strconv.FormatBool(kubelet.FailSwapOn),
@ -496,6 +497,11 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern
if host.IsWindows() { // compatible with Windows
CommandArgs["cloud-config"] = path.Join(host.PrefixPath, cloudConfigFileName)
}
if c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
// rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17
delete(CommandArgs, "hostname-override")
}
}
if c.IsKubeletGenerateServingCertificateEnabled() {
@ -695,7 +701,8 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, serviceOptions v3.Kube
} else {
CommandArgs["bind-address"] = host.Address
}
if c.CloudProvider.Name == k8s.AWSCloudProvider && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
if (c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName || c.CloudProvider.Name == aws.AWSCloudProviderName) &&
c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
// rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17
delete(CommandArgs, "hostname-override")
}
@ -1289,3 +1296,10 @@ func (c *Cluster) IsCRIDockerdEnabled() bool {
}
return false
}
func getCloudProviderName(name string) string {
if name == k8s.ExternalAWSCloudProviderName {
return "external"
}
return name
}

View File

@ -191,13 +191,13 @@ func (h *Host) ProcessFilter(processes map[string]v3.Process) map[string]v3.Proc
return processes
}
func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProvider string) error {
func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProviderName string) error {
if hasAnotherRole {
log.Infof(ctx, "[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address)
return nil
}
log.Infof(ctx, "[hosts] Cordoning host [%s]", toDeleteHost.Address)
if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride); err != nil {
if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName); err != nil {
if apierrors.IsNotFound(err) {
log.Warnf(ctx, "[hosts] Can't find node by name [%s]", toDeleteHost.Address)
return nil
@ -205,26 +205,17 @@ func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.
return err
}
if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, true); err != nil {
if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName, true); err != nil {
return err
}
log.Infof(ctx, "[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address)
if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, cloudProvider); err != nil {
if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName); err != nil {
return err
}
log.Infof(ctx, "[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address)
return nil
}
func RemoveTaintFromHost(ctx context.Context, host *Host, taintKey string, kubeClient *kubernetes.Clientset) error {
log.Infof(ctx, "[hosts] removing taint [%s] from host [%s]", taintKey, host.Address)
if err := k8s.RemoveTaintFromNodeByKey(kubeClient, host.HostnameOverride, taintKey); err != nil {
return err
}
log.Infof(ctx, "[hosts] Successfully deleted taint [%s] from host [%s]", taintKey, host.Address)
return nil
}
func GetToDeleteHosts(currentHosts, configHosts, inactiveHosts []*Host, includeInactive bool) []*Host {
toDeleteHosts := []*Host{}
for _, currentHost := range currentHosts {

View File

@ -15,18 +15,19 @@ import (
)
const (
HostnameLabel = "kubernetes.io/hostname"
InternalAddressAnnotation = "rke.cattle.io/internal-ip"
ExternalAddressAnnotation = "rke.cattle.io/external-ip"
AWSCloudProvider = "aws"
MaxRetries = 5
RetryInterval = 5
HostnameLabel = "kubernetes.io/hostname"
InternalAddressAnnotation = "rke.cattle.io/internal-ip"
ExternalAddressAnnotation = "rke.cattle.io/external-ip"
AWSCloudProvider = "aws"
ExternalAWSCloudProviderName = "external-aws"
MaxRetries = 5
RetryInterval = 5
)
func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {
func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProviderName string) error {
// If cloud provider is configured, the node name can be set by the cloud provider, which can be different from the original node name
if cloudProvider != "" {
node, err := GetNode(k8sClient, nodeName)
if cloudProviderName != "" {
node, err := GetNode(k8sClient, nodeName, cloudProviderName)
if err != nil {
return err
}
@ -39,7 +40,7 @@ func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) {
return k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
}
func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
func GetNode(k8sClient *kubernetes.Clientset, nodeName, cloudProviderName string) (*v1.Node, error) {
var listErr error
for retries := 0; retries < MaxRetries; retries++ {
logrus.Debugf("Checking node list for node [%v], try #%v", nodeName, retries+1)
@ -55,6 +56,14 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error)
if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) {
return &node, nil
}
if cloudProviderName == ExternalAWSCloudProviderName {
logrus.Debugf("Checking hostname address for node [%v], cloud provider: %v", nodeName, cloudProviderName)
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeHostName && strings.ToLower(node.Labels[HostnameLabel]) == addr.Address {
return &node, nil
}
}
}
}
time.Sleep(time.Second * RetryInterval)
}
@ -64,10 +73,10 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error)
return nil, apierrors.NewNotFound(schema.GroupResource{}, nodeName)
}
func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned bool) error {
func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cloudProviderName string, cordoned bool) error {
updated := false
for retries := 0; retries < MaxRetries; retries++ {
node, err := GetNode(k8sClient, nodeName)
node, err := GetNode(k8sClient, nodeName, cloudProviderName)
if err != nil {
logrus.Debugf("Error getting node %s: %v", nodeName, err)
// no need to retry here since GetNode already retries
@ -102,45 +111,6 @@ func IsNodeReady(node v1.Node) bool {
return false
}
func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKey string) error {
updated := false
var err error
var node *v1.Node
for retries := 0; retries <= 5; retries++ {
node, err = GetNode(k8sClient, nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
return nil
}
return err
}
foundTaint := false
for i, taint := range node.Spec.Taints {
if taint.Key == taintKey {
foundTaint = true
node.Spec.Taints = append(node.Spec.Taints[:i], node.Spec.Taints[i+1:]...)
break
}
}
if !foundTaint {
return nil
}
_, err = k8sClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
if err != nil {
logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err)
time.Sleep(time.Second * 5)
continue
}
updated = true
break
}
if !updated {
return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", node.Name, err)
}
return nil
}
func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) {
oldLabels := map[string]string{}
if node.Labels == nil {

View File

@ -52,7 +52,7 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) {
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) {
if updateWorkersOnly {
return "", nil
}
@ -83,7 +83,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client
inactiveHostErr = fmt.Errorf("provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(inactiveHostNames, ","))
}
hostsFailedToUpgrade, err := processControlPlaneForUpgrade(ctx, kubeClient, controlHosts, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap,
upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper, k8sVersion)
upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper, k8sVersion, cloudProviderName)
if err != nil || inactiveHostErr != nil {
if len(hostsFailedToUpgrade) > 0 {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err)
@ -103,7 +103,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client
func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion string) ([]string, error) {
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion, cloudProviderName string) ([]string, error) {
var errgrp errgroup.Group
var failedHosts []string
var hostsFailedToUpgrade = make(chan string, maxUnavailable)
@ -130,7 +130,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
}
continue
}
if err := CheckNodeReady(kubeClient, runHost, ControlRole); err != nil {
if err := CheckNodeReady(kubeClient, runHost, ControlRole, cloudProviderName); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
@ -165,7 +165,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
}
if !controlPlaneUpgradable && !workerPlaneUpgradable {
log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", runHost.HostnameOverride)
if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false); err != nil {
// This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable
logrus.Errorf("[controlplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err)
}
@ -173,7 +173,8 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
}
shouldDrain := upgradeStrategy.Drain != nil && *upgradeStrategy.Drain
if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion); err != nil {
if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly,
alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion, cloudProviderName); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
@ -216,8 +217,8 @@ func checkHostUpgradable(ctx context.Context, runHost *hosts.Host, cpNodePlanMap
func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion string) error {
if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion, cloudProviderName string) error {
if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole, cloudProviderName); err != nil {
return err
}
if controlPlaneUpgradable {
@ -233,10 +234,10 @@ func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, h
}
}
if err := CheckNodeReady(kubeClient, host, ControlRole); err != nil {
if err := CheckNodeReady(kubeClient, host, ControlRole, cloudProviderName); err != nil {
return err
}
return k8s.CordonUncordon(kubeClient, host.HostnameOverride, false)
return k8s.CordonUncordon(kubeClient, host.HostnameOverride, cloudProviderName, false)
}
func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {

View File

@ -17,10 +17,10 @@ import (
"k8s.io/kubectl/pkg/drain"
)
func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error {
func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component, cloudProviderName string) error {
for retries := 0; retries < k8s.MaxRetries; retries++ {
logrus.Infof("[%s] Now checking status of node %v, try #%v", component, runHost.HostnameOverride, retries+1)
k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride, cloudProviderName)
if err != nil {
return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err)
}
@ -33,9 +33,9 @@ func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, compo
return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
}
func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component string) error {
func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component, cloudProviderName string) error {
logrus.Debugf("[%s] Cordoning node %v", component, host.HostnameOverride)
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, cloudProviderName, true); err != nil {
return err
}
if !drainNode {

View File

@ -52,14 +52,17 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
return nil
}
func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) {
func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI,
updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy,
newHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) {
log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
var errMsgMaxUnavailableNotFailed string
updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts, cloudProviderName)
if len(mixedRolesHosts) > 0 {
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
}
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion)
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage,
1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
return errMsgMaxUnavailableNotFailed, err
@ -68,7 +71,8 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku
if len(workerOnlyHosts) > 0 {
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
}
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion)
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage,
maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
@ -81,9 +85,9 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku
return errMsgMaxUnavailableNotFailed, nil
}
func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) {
func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool, cloudProviderName string) {
for _, h := range allHosts {
_, err := k8s.GetNode(kubeClient, h.HostnameOverride)
_, err := k8s.GetNode(kubeClient, h.HostnameOverride, cloudProviderName)
if err != nil && apierrors.IsNotFound(err) {
// this host could have been added to cluster state upon successful controlplane upgrade but isn't a node yet.
newHosts[h.HostnameOverride] = true
@ -93,7 +97,7 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host
func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion string) ([]string, error) {
maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion, cloudProviderName string) ([]string, error) {
var errgrp errgroup.Group
var drainHelper drain.Helper
var failedHosts []string
@ -128,7 +132,7 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
}
continue
}
if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil {
errList = append(errList, err)
hostsFailed.Store(runHost.HostnameOverride, true)
hostsFailedToUpgrade <- runHost.HostnameOverride
@ -163,13 +167,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
}
if !upgradable {
logrus.Infof("[workerplane] Upgrade not required for worker components of host %v", runHost.HostnameOverride)
if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false); err != nil {
// This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable
logrus.Errorf("[workerplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err)
}
continue
}
if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, k8sVersion); err != nil {
if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap,
updateWorkersOnly, alpineImage, k8sVersion, cloudProviderName); err != nil {
errList = append(errList, err)
hostsFailed.Store(runHost.HostnameOverride, true)
hostsFailedToUpgrade <- runHost.HostnameOverride
@ -192,9 +197,9 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool,
alpineImage, k8sVersion string) error {
alpineImage, k8sVersion, cloudProviderName string) error {
// cordon and drain
if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil {
if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole, cloudProviderName); err != nil {
return err
}
logrus.Debugf("[workerplane] upgrading host %v", runHost.HostnameOverride)
@ -202,11 +207,11 @@ func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, ru
return err
}
// consider upgrade done when kubeclient lists node as ready
if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil {
return err
}
// uncordon node
return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false)
return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false)
}
func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage, k8sVersion string) error {