From 1ce3e2aa50d87429e270c86964c0ef241bcfc21a Mon Sep 17 00:00:00 2001 From: Kinara Shah Date: Mon, 16 Oct 2023 12:42:42 -0700 Subject: [PATCH] add logic for external aws cloud provider --- cluster/addons.go | 4 +-- cluster/cluster.go | 20 ++++++----- cluster/defaults.go | 19 +++++++++-- cluster/plan.go | 22 +++++++++--- hosts/hosts.go | 17 +++------- k8s/node.go | 72 ++++++++++++---------------------------- services/controlplane.go | 21 ++++++------ services/node_util.go | 8 ++--- services/workerplane.go | 33 ++++++++++-------- 9 files changed, 107 insertions(+), 109 deletions(-) diff --git a/cluster/addons.go b/cluster/addons.go index c08b55a8..7e3457c0 100644 --- a/cluster/addons.go +++ b/cluster/addons.go @@ -492,7 +492,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str if err != nil { return &addonError{fmt.Sprintf("%v", err), isCritical} } - node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride) + node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.CloudProvider.Name) if err != nil { return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical} } @@ -513,7 +513,7 @@ func (c *Cluster) doAddonDelete(ctx context.Context, resourceName string, isCrit if err != nil { return &addonError{fmt.Sprintf("%v", err), isCritical} } - node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride) + node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.CloudProvider.Name) if err != nil { return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical} } diff --git a/cluster/cluster.go b/cluster/cluster.go index 254af7ca..3b3f0efd 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -189,7 +189,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete continue } // find existing nodes that are in NotReady 
state - if err := services.CheckNodeReady(kubeClient, host, services.ControlRole); err != nil { + if err := services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name); err != nil { logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride) notReadyHosts = append(notReadyHosts, host) notReadyHostNames = append(notReadyHostNames, host.HostnameOverride) @@ -223,7 +223,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete } // Calling CheckNodeReady wil give some time for nodes to get in Ready state for _, host := range notReadyHosts { - err = services.CheckNodeReady(kubeClient, host, services.ControlRole) + err = services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name) if err != nil { logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err) } @@ -236,7 +236,8 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete cpNodePlanMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, - c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes, c.Version) + c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes, + c.Version, c.CloudProvider.Name) if err != nil { return "", fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err) } @@ -310,7 +311,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes continue } // find existing nodes that are in NotReady state - if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole); err != nil { + if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name); err != nil { logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride) notReadyHosts = append(notReadyHosts, host) notReadyHostNames = append(notReadyHostNames, host.HostnameOverride) @@ -332,7 +333,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx 
context.Context, kubeClient *kubernetes } // Calling CheckNodeReady wil give some time for nodes to get in Ready state for _, host := range notReadyHosts { - err = services.CheckNodeReady(kubeClient, host, services.WorkerRole) + err = services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name) if err != nil { logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err) } @@ -349,7 +350,8 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes c.UpgradeStrategy, c.NewHosts, c.MaxUnavailableForWorkerNodes, - c.Version) + c.Version, + c.CloudProvider.Name) if err != nil { return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err) } @@ -994,7 +996,7 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust var errs []error for host := range hostQueue { logrus.Debugf("worker [%d] starting sync for node [%s]", w, host.HostnameOverride) - if err := setNodeAnnotationsLabelsTaints(k8sClient, host); err != nil { + if err := setNodeAnnotationsLabelsTaints(k8sClient, host, c.CloudProvider.Name); err != nil { errs = append(errs, err) } } @@ -1012,11 +1014,11 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust return nil } -func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host) error { +func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host, cloudProviderName string) error { node := &v1.Node{} var err error for retries := 0; retries <= 5; retries++ { - node, err = k8s.GetNode(k8sClient, host.HostnameOverride) + node, err = k8s.GetNode(k8sClient, host.HostnameOverride, cloudProviderName) if err != nil { logrus.Debugf("[hosts] Can't find node by name [%s], error: %v", host.HostnameOverride, err) time.Sleep(2 * time.Second) diff --git a/cluster/defaults.go b/cluster/defaults.go index e14324e4..740ca952 100644 --- a/cluster/defaults.go +++ 
b/cluster/defaults.go @@ -9,6 +9,7 @@ import ( "github.com/blang/semver" "github.com/rancher/rke/cloudprovider" + "github.com/rancher/rke/cloudprovider/aws" "github.com/rancher/rke/docker" "github.com/rancher/rke/k8s" "github.com/rancher/rke/log" @@ -1061,11 +1062,25 @@ func (c *Cluster) setCloudProvider() error { if p != nil { c.CloudConfigFile, err = p.GenerateCloudConfigFile() if err != nil { - return fmt.Errorf("Failed to parse cloud config file: %v", err) + return fmt.Errorf("failed to parse cloud config file: %v", err) } c.CloudProvider.Name = p.GetName() if c.CloudProvider.Name == "" { - return fmt.Errorf("Name of the cloud provider is not defined for custom provider") + return fmt.Errorf("name of the cloud provider is not defined for custom provider") + } + if c.CloudProvider.Name == aws.AWSCloudProviderName { + clusterVersion, err := getClusterVersion(c.Version) + if err != nil { + return fmt.Errorf("failed to get cluster version for checking cloud provider: %v", err) + } + // cloud provider must be external or external-aws for >=1.27 + defaultExternalAwsRange, err := semver.ParseRange(">=1.27.0-rancher0") + if err != nil { + return fmt.Errorf("failed to parse semver range for checking cloud provider: %v", err) + } + if defaultExternalAwsRange(clusterVersion) { + return fmt.Errorf("cloud provider %s is invalid for [%s]", aws.AWSCloudProviderName, c.Version) + } } } return nil diff --git a/cluster/plan.go b/cluster/plan.go index 02021659..2c1d7113 100644 --- a/cluster/plan.go +++ b/cluster/plan.go @@ -13,6 +13,7 @@ import ( "github.com/blang/semver" "github.com/docker/docker/api/types" + "github.com/rancher/rke/cloudprovider/aws" "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" "github.com/rancher/rke/k8s" @@ -180,7 +181,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern CommandArgs := map[string]string{ "admission-control-config-file": DefaultKubeAPIArgAdmissionControlConfigFileValue, 
"client-ca-file": pki.GetCertPath(pki.CACertName), - "cloud-provider": c.CloudProvider.Name, + "cloud-provider": getCloudProviderName(c.CloudProvider.Name), "etcd-cafile": etcdCAClientCert, "etcd-certfile": etcdClientCert, "etcd-keyfile": etcdClientKey, @@ -345,7 +346,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, serviceOptions v3.KubernetesServicesOptions) v3.Process { Command := c.getRKEToolsEntryPoint(host.OS(), "kube-controller-manager") CommandArgs := map[string]string{ - "cloud-provider": c.CloudProvider.Name, + "cloud-provider": getCloudProviderName(c.CloudProvider.Name), "cluster-cidr": c.ClusterCIDR, "kubeconfig": pki.GetConfigPath(pki.KubeControllerCertName), "root-ca-file": pki.GetCertPath(pki.CACertName), @@ -464,7 +465,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern Command := c.getRKEToolsEntryPoint(host.OS(), "kubelet") CommandArgs := map[string]string{ "client-ca-file": pki.GetCertPath(pki.CACertName), - "cloud-provider": c.CloudProvider.Name, + "cloud-provider": getCloudProviderName(c.CloudProvider.Name), "cluster-dns": c.ClusterDNSServer, "cluster-domain": c.ClusterDomain, "fail-swap-on": strconv.FormatBool(kubelet.FailSwapOn), @@ -496,6 +497,11 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern if host.IsWindows() { // compatible with Windows CommandArgs["cloud-config"] = path.Join(host.PrefixPath, cloudConfigFileName) } + + if c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname { + // rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17 + delete(CommandArgs, "hostname-override") + } } if 
c.IsKubeletGenerateServingCertificateEnabled() { @@ -695,7 +701,8 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, serviceOptions v3.Kube } else { CommandArgs["bind-address"] = host.Address } - if c.CloudProvider.Name == k8s.AWSCloudProvider && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname { + if (c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName || c.CloudProvider.Name == aws.AWSCloudProviderName) && + c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname { // rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17 delete(CommandArgs, "hostname-override") } @@ -1289,3 +1296,10 @@ func (c *Cluster) IsCRIDockerdEnabled() bool { } return false } + +func getCloudProviderName(name string) string { + if name == k8s.ExternalAWSCloudProviderName { + return "external" + } + return name +} diff --git a/hosts/hosts.go b/hosts/hosts.go index 86dc1361..721aae8c 100644 --- a/hosts/hosts.go +++ b/hosts/hosts.go @@ -191,13 +191,13 @@ func (h *Host) ProcessFilter(processes map[string]v3.Process) map[string]v3.Proc return processes } -func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProvider string) error { +func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProviderName string) error { if hasAnotherRole { log.Infof(ctx, "[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address) return nil } log.Infof(ctx, "[hosts] Cordoning host [%s]", toDeleteHost.Address) - if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride); err != nil { + if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName); 
err != nil { if apierrors.IsNotFound(err) { log.Warnf(ctx, "[hosts] Can't find node by name [%s]", toDeleteHost.Address) return nil @@ -205,26 +205,17 @@ func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes. return err } - if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, true); err != nil { + if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName, true); err != nil { return err } log.Infof(ctx, "[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address) - if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, cloudProvider); err != nil { + if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, cloudProviderName); err != nil { return err } log.Infof(ctx, "[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address) return nil } -func RemoveTaintFromHost(ctx context.Context, host *Host, taintKey string, kubeClient *kubernetes.Clientset) error { - log.Infof(ctx, "[hosts] removing taint [%s] from host [%s]", taintKey, host.Address) - if err := k8s.RemoveTaintFromNodeByKey(kubeClient, host.HostnameOverride, taintKey); err != nil { - return err - } - log.Infof(ctx, "[hosts] Successfully deleted taint [%s] from host [%s]", taintKey, host.Address) - return nil -} - func GetToDeleteHosts(currentHosts, configHosts, inactiveHosts []*Host, includeInactive bool) []*Host { toDeleteHosts := []*Host{} for _, currentHost := range currentHosts { diff --git a/k8s/node.go b/k8s/node.go index 53eb3184..c48fc24c 100644 --- a/k8s/node.go +++ b/k8s/node.go @@ -15,18 +15,19 @@ import ( ) const ( - HostnameLabel = "kubernetes.io/hostname" - InternalAddressAnnotation = "rke.cattle.io/internal-ip" - ExternalAddressAnnotation = "rke.cattle.io/external-ip" - AWSCloudProvider = "aws" - MaxRetries = 5 - RetryInterval = 5 + HostnameLabel = "kubernetes.io/hostname" + InternalAddressAnnotation = "rke.cattle.io/internal-ip" + ExternalAddressAnnotation = 
"rke.cattle.io/external-ip" + AWSCloudProvider = "aws" + ExternalAWSCloudProviderName = "external-aws" + MaxRetries = 5 + RetryInterval = 5 ) -func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error { +func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProviderName string) error { // If cloud provider is configured, the node name can be set by the cloud provider, which can be different from the original node name - if cloudProvider != "" { - node, err := GetNode(k8sClient, nodeName) + if cloudProviderName != "" { + node, err := GetNode(k8sClient, nodeName, cloudProviderName) if err != nil { return err } @@ -39,7 +40,7 @@ func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) { return k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) } -func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { +func GetNode(k8sClient *kubernetes.Clientset, nodeName, cloudProviderName string) (*v1.Node, error) { var listErr error for retries := 0; retries < MaxRetries; retries++ { logrus.Debugf("Checking node list for node [%v], try #%v", nodeName, retries+1) @@ -55,6 +56,14 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) { return &node, nil } + if cloudProviderName == ExternalAWSCloudProviderName { + logrus.Debugf("Checking hostname address for node [%v], cloud provider: %v", nodeName, cloudProviderName) + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeHostName && strings.ToLower(node.Labels[HostnameLabel]) == addr.Address { + return &node, nil + } + } + } } time.Sleep(time.Second * RetryInterval) } @@ -64,10 +73,10 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) return nil, apierrors.NewNotFound(schema.GroupResource{}, nodeName) } -func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned 
bool) error { +func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cloudProviderName string, cordoned bool) error { updated := false for retries := 0; retries < MaxRetries; retries++ { - node, err := GetNode(k8sClient, nodeName) + node, err := GetNode(k8sClient, nodeName, cloudProviderName) if err != nil { logrus.Debugf("Error getting node %s: %v", nodeName, err) // no need to retry here since GetNode already retries @@ -102,45 +111,6 @@ func IsNodeReady(node v1.Node) bool { return false } -func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKey string) error { - updated := false - var err error - var node *v1.Node - for retries := 0; retries <= 5; retries++ { - node, err = GetNode(k8sClient, nodeName) - if err != nil { - if apierrors.IsNotFound(err) { - logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName) - return nil - } - return err - } - foundTaint := false - for i, taint := range node.Spec.Taints { - if taint.Key == taintKey { - foundTaint = true - node.Spec.Taints = append(node.Spec.Taints[:i], node.Spec.Taints[i+1:]...) 
- break - } - } - if !foundTaint { - return nil - } - _, err = k8sClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) - if err != nil { - logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err) - time.Sleep(time.Second * 5) - continue - } - updated = true - break - } - if !updated { - return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", node.Name, err) - } - return nil -} - func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) { oldLabels := map[string]string{} if node.Labels == nil { diff --git a/services/controlplane.go b/services/controlplane.go index 61b912a6..2bcca06d 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -52,7 +52,7 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, - upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) { + upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) { if updateWorkersOnly { return "", nil } @@ -83,7 +83,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client inactiveHostErr = fmt.Errorf("provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(inactiveHostNames, ",")) } hostsFailedToUpgrade, err := processControlPlaneForUpgrade(ctx, kubeClient, controlHosts, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, - upgradeStrategy, newHosts, 
inactiveHosts, maxUnavailable, drainHelper, k8sVersion) + upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper, k8sVersion, cloudProviderName) if err != nil || inactiveHostErr != nil { if len(hostsFailedToUpgrade) > 0 { logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err) @@ -103,7 +103,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, - upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion string) ([]string, error) { + upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion, cloudProviderName string) ([]string, error) { var errgrp errgroup.Group var failedHosts []string var hostsFailedToUpgrade = make(chan string, maxUnavailable) @@ -130,7 +130,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C } continue } - if err := CheckNodeReady(kubeClient, runHost, ControlRole); err != nil { + if err := CheckNodeReady(kubeClient, runHost, ControlRole, cloudProviderName); err != nil { errList = append(errList, err) hostsFailedToUpgrade <- runHost.HostnameOverride hostsFailed.Store(runHost.HostnameOverride, true) @@ -165,7 +165,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C } if !controlPlaneUpgradable && !workerPlaneUpgradable { log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", runHost.HostnameOverride) - if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, 
false); err != nil { + if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false); err != nil { // This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable logrus.Errorf("[controlplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err) } @@ -173,7 +173,8 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C } shouldDrain := upgradeStrategy.Drain != nil && *upgradeStrategy.Drain - if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion); err != nil { + if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, + alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion, cloudProviderName); err != nil { errList = append(errList, err) hostsFailedToUpgrade <- runHost.HostnameOverride hostsFailed.Store(runHost.HostnameOverride, true) @@ -216,8 +217,8 @@ func checkHostUpgradable(ctx context.Context, runHost *hosts.Host, cpNodePlanMap func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, - alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion string) error { - if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil { + alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion, cloudProviderName string) error { + if err := 
cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole, cloudProviderName); err != nil { return err } if controlPlaneUpgradable { @@ -233,10 +234,10 @@ func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, h } } - if err := CheckNodeReady(kubeClient, host, ControlRole); err != nil { + if err := CheckNodeReady(kubeClient, host, ControlRole, cloudProviderName); err != nil { return err } - return k8s.CordonUncordon(kubeClient, host.HostnameOverride, false) + return k8s.CordonUncordon(kubeClient, host.HostnameOverride, cloudProviderName, false) } func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error { diff --git a/services/node_util.go b/services/node_util.go index 42b590c0..df26f45a 100644 --- a/services/node_util.go +++ b/services/node_util.go @@ -17,10 +17,10 @@ import ( "k8s.io/kubectl/pkg/drain" ) -func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error { +func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component, cloudProviderName string) error { for retries := 0; retries < k8s.MaxRetries; retries++ { logrus.Infof("[%s] Now checking status of node %v, try #%v", component, runHost.HostnameOverride, retries+1) - k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride) + k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride, cloudProviderName) if err != nil { return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err) } @@ -33,9 +33,9 @@ func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, compo return fmt.Errorf("host %v not ready", runHost.HostnameOverride) } -func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component string) error { +func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component, cloudProviderName 
string) error { logrus.Debugf("[%s] Cordoning node %v", component, host.HostnameOverride) - if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil { + if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, cloudProviderName, true); err != nil { return err } if !drainNode { diff --git a/services/workerplane.go b/services/workerplane.go index a75d250d..a9e8e6b7 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -52,14 +52,17 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer return nil } -func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) { +func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, + updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, + newHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) { log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole) var errMsgMaxUnavailableNotFailed string - updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts) + updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts, cloudProviderName) if len(mixedRolesHosts) > 0 { 
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time") } - multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion) + multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, + 1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName) if err != nil { logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err) return errMsgMaxUnavailableNotFailed, err @@ -68,7 +71,8 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku if len(workerOnlyHosts) > 0 { log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable) } - workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion) + workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, + maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName) if err != nil { logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err) if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable { @@ -81,9 +85,9 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku return errMsgMaxUnavailableNotFailed, nil } -func 
updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) { +func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool, cloudProviderName string) { for _, h := range allHosts { - _, err := k8s.GetNode(kubeClient, h.HostnameOverride) + _, err := k8s.GetNode(kubeClient, h.HostnameOverride, cloudProviderName) if err != nil && apierrors.IsNotFound(err) { // this host could have been added to cluster state upon successful controlplane upgrade but isn't a node yet. newHosts[h.HostnameOverride] = true @@ -93,7 +97,7 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, - maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion string) ([]string, error) { + maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion, cloudProviderName string) ([]string, error) { var errgrp errgroup.Group var drainHelper drain.Helper var failedHosts []string @@ -128,7 +132,7 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl } continue } - if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil { + if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil { errList = append(errList, err) hostsFailed.Store(runHost.HostnameOverride, true) hostsFailedToUpgrade <- runHost.HostnameOverride @@ -163,13 +167,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl } if !upgradable { logrus.Infof("[workerplane] Upgrade not required for worker 
components of host %v", runHost.HostnameOverride) - if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil { + if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false); err != nil { // This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable logrus.Errorf("[workerplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err) } continue } - if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, k8sVersion); err != nil { + if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, + updateWorkersOnly, alpineImage, k8sVersion, cloudProviderName); err != nil { errList = append(errList, err) hostsFailed.Store(runHost.HostnameOverride, true) hostsFailedToUpgrade <- runHost.HostnameOverride @@ -192,9 +197,9 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, - alpineImage, k8sVersion string) error { + alpineImage, k8sVersion, cloudProviderName string) error { // cordon and drain - if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil { + if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole, cloudProviderName); err != nil { return err } logrus.Debugf("[workerplane] upgrading host %v", 
runHost.HostnameOverride) @@ -202,11 +207,11 @@ func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, ru return err } // consider upgrade done when kubeclient lists node as ready - if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil { + if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil { return err } // uncordon node - return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false) + return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, cloudProviderName, false) } func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage, k8sVersion string) error {