diff --git a/cluster/cluster.go b/cluster/cluster.go
index 043f0cf4..02ebf6f5 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -66,6 +66,8 @@ type Cluster struct {
 	WorkerHosts                  []*hosts.Host
 	EncryptionConfig             encryptionConfig
 	NewHosts                     map[string]bool
+	MaxUnavailableForWorkerNodes int
+	HostsLabeledToIgnoreUpgrade  map[string]bool
 }
 
 type encryptionConfig struct {
@@ -139,20 +141,32 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
 		}
 		return nil
 	}
-	if err := services.UpgradeControlPlane(ctx, kubeClient, c.ControlPlaneHosts,
+	inactiveHosts := make(map[string]bool)
+	var controlPlaneHosts []*hosts.Host
+	for _, host := range c.InactiveHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			inactiveHosts[host.HostnameOverride] = true
+		}
+	}
+	for _, host := range c.ControlPlaneHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			controlPlaneHosts = append(controlPlaneHosts, host)
+		}
+	}
+	if err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap, cpNodePlanMap,
 		c.UpdateWorkersOnly,
 		c.SystemImages.Alpine,
-		c.Certificates, c.UpgradeStrategy, c.NewHosts); err != nil {
+		c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts); err != nil {
 		return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
 	}
 	return nil
 }
 
 func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
-	var workerOnlyHosts, multipleRolesHosts []*hosts.Host
+	var workerOnlyHosts, etcdAndWorkerHosts []*hosts.Host
 	kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
 	if err != nil {
 		return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
@@ -161,12 +175,18 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
 	workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
 	// Build cp node plan map
 	allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
-	for _, workerHost := range allHosts {
-		workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
-		if !workerHost.IsControl && !workerHost.IsEtcd {
-			workerOnlyHosts = append(workerOnlyHosts, workerHost)
+	for _, host := range allHosts {
+		workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData)
+		if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			continue
+		}
+		if !host.IsEtcd {
+			// separating hosts with only the worker role so they undergo upgrade in maxUnavailable batches
+			workerOnlyHosts = append(workerOnlyHosts, host)
 		} else {
-			multipleRolesHosts = append(multipleRolesHosts, workerHost)
+			// separating nodes with the etcd role: at this point worker components on controlplane nodes have already been upgraded by `UpgradeControlPlaneNodes`,
+			// and these nodes will undergo upgrade of worker components sequentially
+			etcdAndWorkerHosts = append(etcdAndWorkerHosts, host)
 		}
 	}
 
@@ -182,13 +202,20 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
 		}
 		return "", nil
 	}
-	errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, c.InactiveHosts,
+
+	inactiveHosts := make(map[string]bool)
+	for _, host := range c.InactiveHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			inactiveHosts[host.HostnameOverride] = true
+		}
+	}
+	errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx, kubeClient, etcdAndWorkerHosts, workerOnlyHosts, inactiveHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap, workerNodePlanMap,
 		c.Certificates, c.UpdateWorkersOnly,
-		c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts)
+		c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts, c.MaxUnavailableForWorkerNodes)
 	if err != nil {
 		return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
 	}
@@ -473,6 +500,7 @@ func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngin
 		EncryptionConfig: encryptionConfig{
 			EncryptionProviderFile: encryptConfig,
 		},
+		HostsLabeledToIgnoreUpgrade: make(map[string]bool),
 	}
 	if metadata.K8sVersionToRKESystemImages == nil {
 		metadata.InitMetadata(ctx)
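Note: DeployControlPlane and DeployWorkerPlane now build their upgrade pools the same way — hosts carrying the ignore label are dropped, and inactive hosts are indexed by hostname override. A minimal sketch of that pattern, with a simplified host type standing in for hosts.Host:

```go
package main

import "fmt"

// host mirrors the two fields of hosts.Host the filter relies on;
// the real struct carries much more state.
type host struct {
	Address          string
	HostnameOverride string
}

// splitHostsForUpgrade reproduces the filtering above: labeled hosts are
// dropped from the upgrade pool, and inactive hosts are tracked by hostname
// so later node listings can skip them.
func splitHostsForUpgrade(all, inactive []host, ignored map[string]bool) (upgradable []host, inactiveByName map[string]bool) {
	inactiveByName = make(map[string]bool)
	for _, h := range inactive {
		if !ignored[h.Address] {
			inactiveByName[h.HostnameOverride] = true
		}
	}
	for _, h := range all {
		if !ignored[h.Address] {
			upgradable = append(upgradable, h)
		}
	}
	return upgradable, inactiveByName
}

func main() {
	all := []host{{"10.0.0.1", "cp-1"}, {"10.0.0.2", "cp-2"}}
	ignored := map[string]bool{"10.0.0.2": true}
	up, _ := splitHostsForUpgrade(all, nil, ignored)
	fmt.Println(len(up)) // 1: cp-2 is excluded from the upgrade pool
}
```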
diff --git a/cluster/defaults.go b/cluster/defaults.go
index 1553fccd..601b80f3 100644
--- a/cluster/defaults.go
+++ b/cluster/defaults.go
@@ -200,7 +200,7 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
 
 func (c *Cluster) setNodeUpgradeStrategy() {
 	if c.UpgradeStrategy == nil {
-		logrus.Info("No input provided for maxUnavailable, setting it to default value of 10%")
+		logrus.Infof("No input provided for maxUnavailable, setting it to default value of %v", DefaultMaxUnavailable)
 		c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
 			MaxUnavailable: DefaultMaxUnavailable,
 		}
diff --git a/cluster/hosts.go b/cluster/hosts.go
index 96f12654..4ccd4dde 100644
--- a/cluster/hosts.go
+++ b/cluster/hosts.go
@@ -4,9 +4,11 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/docker/docker/api/types"
 	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
@@ -14,6 +16,8 @@ import (
 	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
 	"sigs.k8s.io/yaml"
 )
@@ -61,7 +65,36 @@ func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error {
 		c.RancherKubernetesEngineConfig.Nodes = removeFromRKENodes(host.RKEConfigNode, c.RancherKubernetesEngineConfig.Nodes)
 	}
 	return ValidateHostCount(c)
+}
+
+func (c *Cluster) RemoveHostsLabeledToIgnoreUpgrade(ctx context.Context) {
+	kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
+	if err != nil {
+		logrus.Errorf("Error generating kube client in RemoveHostsLabeledToIgnoreUpgrade: %v", err)
+		return
+	}
+	var nodes *v1.NodeList
+	for retries := 0; retries < k8s.MaxRetries; retries++ {
+		nodes, err = kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
+		if err == nil {
+			break
+		}
+		time.Sleep(time.Second * k8s.RetryInterval)
+	}
+	if err != nil {
+		log.Infof(ctx, "Error listing nodes but continuing upgrade: %v", err)
+		return
+	}
+	if nodes == nil {
+		return
+	}
+	for _, node := range nodes.Items {
+		if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
+			host := hosts.Host{RKEConfigNode: v3.RKEConfigNode{Address: node.Annotations[k8s.ExternalAddressAnnotation]}}
+			c.HostsLabeledToIgnoreUpgrade[host.Address] = true
+		}
+	}
+	return
 }
 
 func (c *Cluster) InvertIndexHosts() error {
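Note: RemoveHostsLabeledToIgnoreUpgrade deliberately degrades gracefully — any failure to build a client or list nodes is logged and the upgrade continues with an empty ignore set. Its list-with-retries loop can be sketched as below; kubernetes.Interface and the retry parameters are illustrative (RKE passes its own Clientset and the k8s.MaxRetries/k8s.RetryInterval constants), and the context-less List signature matches the client-go vintage used in this diff:

```go
package upgrade

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listNodesWithRetry mirrors the loop above: retry the node list a fixed
// number of times, sleeping between failures, and hand back the last error
// only if every attempt fails.
func listNodesWithRetry(client kubernetes.Interface, maxRetries int, interval time.Duration) (*v1.NodeList, error) {
	var nodes *v1.NodeList
	var err error
	for retries := 0; retries < maxRetries; retries++ {
		nodes, err = client.CoreV1().Nodes().List(metav1.ListOptions{})
		if err == nil {
			return nodes, nil
		}
		time.Sleep(interval)
	}
	return nil, err
}
```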
diff --git a/cluster/validation.go b/cluster/validation.go
index c7425a42..ca290f08 100644
--- a/cluster/validation.go
+++ b/cluster/validation.go
@@ -196,37 +196,38 @@ func ValidateHostCount(c *Cluster) error {
 	return nil
 }
 
-func (c *Cluster) ValidateHostCountForUpgrade() error {
+func (c *Cluster) ValidateHostCountForUpgradeAndCalculateMaxUnavailable() (int, error) {
 	var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
-	var workerOnlyHosts int
+	var workerOnlyHosts, maxUnavailable int
+
 	for _, host := range c.InactiveHosts {
-		if host.IsControl {
+		if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
 		}
-		if !host.IsEtcd && !host.IsControl {
+		if !host.IsEtcd && !host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
 		}
 		// not breaking out of the loop so we can log all of the inactive hosts
 	}
 	if len(inactiveControlPlaneHosts) >= 1 {
-		return fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
+		return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
	}
-
 	for _, host := range c.WorkerHosts {
-		if host.IsControl || host.IsEtcd {
+		if host.IsControl || host.IsEtcd || c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			continue
 		}
 		workerOnlyHosts++
 	}
-
+	// maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade
+	workerOnlyHosts += len(inactiveWorkerOnlyHosts)
 	maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
 	if err != nil {
-		return err
+		return maxUnavailable, err
 	}
 	if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
-		return fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
+		return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
 	}
-	return nil
+	return maxUnavailable, nil
 }
 
 func validateDuplicateNodes(c *Cluster) error {
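Note: because inactive worker-only hosts are added back into the count, maxUnavailable is computed against everything in cluster.yml except ignored hosts. With the default of 10% and 23 reachable plus 2 inactive worker-only hosts, that is floor(0.10 × 25) = 2, and the upgrade is then refused because 2 inactive >= 2. A sketch of the rounding semantics (the real parsing lives in services.CalculateMaxUnavailable, shown later in this diff):

```go
package upgrade

// maxUnavailableFromPercent sketches the documented rounding: percentages
// round down, but the result is never less than one node.
func maxUnavailableFromPercent(percent, numHosts int) int {
	maxUnavailable := numHosts * percent / 100 // integer division rounds down
	if maxUnavailable == 0 {
		// with very few hosts, rounding down a small percentage yields 0
		maxUnavailable = 1
	}
	return maxUnavailable
}
```

So maxUnavailableFromPercent(10, 25) returns 2, and maxUnavailableFromPercent(10, 1) returns 1 rather than 0.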
diff --git a/cmd/up.go b/cmd/up.go
index 09f5c266..ceb6dca2 100644
--- a/cmd/up.go
+++ b/cmd/up.go
@@ -113,11 +113,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
-
-	err = kubeCluster.ValidateHostCountForUpgrade()
-	if err != nil {
-		return APIURL, caCrt, clientCert, clientKey, nil, err
-	}
 	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -140,6 +135,14 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 		}
 		kubeCluster.NewHosts = newNodes
 		reconcileCluster = true
+
+		kubeCluster.RemoveHostsLabeledToIgnoreUpgrade(ctx)
+		maxUnavailable, err := kubeCluster.ValidateHostCountForUpgradeAndCalculateMaxUnavailable()
+		if err != nil {
+			return APIURL, caCrt, clientCert, clientKey, nil, err
+		}
+		logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailable)
+		kubeCluster.MaxUnavailableForWorkerNodes = maxUnavailable
 	}
 
 	if !flags.DisablePortCheck {
@@ -244,10 +247,16 @@ func checkAllIncluded(cluster *cluster.Cluster) error {
 	var names []string
 	for _, host := range cluster.InactiveHosts {
+		if cluster.HostsLabeledToIgnoreUpgrade[host.Address] {
+			continue
+		}
 		names = append(names, host.Address)
 	}
 
-	return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
+	if len(names) > 0 {
+		return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
+	}
+	return nil
 }
 
 func clusterUpFromCli(ctx *cli.Context) error {
diff --git a/k8s/node.go b/k8s/node.go
index f99897c6..2b1b5a5e 100644
--- a/k8s/node.go
+++ b/k8s/node.go
@@ -14,12 +14,13 @@ import (
 )
 
 const (
-	HostnameLabel             = "kubernetes.io/hostname"
-	InternalAddressAnnotation = "rke.cattle.io/internal-ip"
-	ExternalAddressAnnotation = "rke.cattle.io/external-ip"
-	AWSCloudProvider          = "aws"
-	MaxRetries                = 5
-	RetryInterval             = 5
+	HostnameLabel                = "kubernetes.io/hostname"
+	InternalAddressAnnotation    = "rke.cattle.io/internal-ip"
+	ExternalAddressAnnotation    = "rke.cattle.io/external-ip"
+	IgnoreHostDuringUpgradeLabel = "rke.cattle.io/ignore-during-upgrade"
+	AWSCloudProvider             = "aws"
+	MaxRetries                   = 5
+	RetryInterval                = 5
 )
 
 func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {
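Note: IgnoreHostDuringUpgradeLabel is the new contract here; nothing in this diff sets it, so an operator opts a node out with e.g. `kubectl label node <node-name> rke.cattle.io/ignore-during-upgrade=true`. The equivalent client-go call, sketched with the same context-less Get/Update signatures this diff already uses (the helper name is hypothetical):

```go
package upgrade

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// labelNodeIgnoredDuringUpgrade opts a node out of the next RKE upgrade.
// Get-modify-Update keeps the sketch short; a patch would be safer against
// concurrent writers.
func labelNodeIgnoredDuringUpgrade(client kubernetes.Interface, nodeName string) error {
	node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if node.Labels == nil {
		node.Labels = map[string]string{}
	}
	node.Labels["rke.cattle.io/ignore-during-upgrade"] = "true"
	_, err = client.CoreV1().Nodes().Update(node)
	return err
}
```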
UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil { return err } + if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil { + return err + } continue } - nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, false) + nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts) if err != nil { return err } var maxUnavailableHit bool for _, node := range nodes { // in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade - if !k8s.IsNodeReady(node) { + if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] { maxUnavailableHit = true break } @@ -93,27 +105,19 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, return err } - upgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes) + controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes) if err != nil { return err } - if !upgradable { - log.Infof(ctx, "Upgrade not required for controlplane components of host %v", host.HostnameOverride) + workerPlaneUpgradable, err := isWorkerHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes) + if err != nil { + return err + } + if !controlPlaneUpgradable && !workerPlaneUpgradable { + log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", host.HostnameOverride) continue } - if err := checkNodeReady(kubeClient, host, ControlRole); err != nil { - return err - } - if err := cordonAndDrainNode(kubeClient, host, upgradeStrategy.Drain, drainHelper, ControlRole); err != nil { - return err - } - if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil { - return err - } - if err := checkNodeReady(kubeClient, host, ControlRole); err != nil { - return err - } - if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil { + if err := upgradeControlHost(ctx, kubeClient, host, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil { return err } } @@ -121,6 +125,37 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, return nil } +func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper, + localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, + alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error { + if err := checkNodeReady(kubeClient, host, ControlRole); err != nil { + return err + } + if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil { + return err + } + if controlPlaneUpgradable { + log.Infof(ctx, "Upgrading controlplane components for control host %v", host.HostnameOverride) + if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil 
@@ -121,6 +125,37 @@
 	return nil
 }
 
+func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
+	localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
+	alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error {
+	if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
+		return err
+	}
+	if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
+		return err
+	}
+	if controlPlaneUpgradable {
+		log.Infof(ctx, "Upgrading controlplane components for control host %v", host.HostnameOverride)
+		if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
+			return err
+		}
+	}
+	if workerPlaneUpgradable {
+		log.Infof(ctx, "Upgrading workerplane components for control host %v", host.HostnameOverride)
+		if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
+			return err
+		}
+	}
+
+	if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
+		return err
+	}
+	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
+		return err
+	}
+	return nil
+}
+
 func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
 	log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole)
 	var errgrp errgroup.Group
diff --git a/services/node_util.go b/services/node_util.go
index e9884ee8..c52513db 100644
--- a/services/node_util.go
+++ b/services/node_util.go
@@ -60,21 +60,24 @@ func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpg
 	return drainHelper
 }
 
-func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts map[string]bool, isUpgradeForWorkerPlane bool) ([]v1.Node, error) {
+func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) ([]v1.Node, error) {
 	var nodeList []v1.Node
 	nodes, err := k8s.GetNodeList(kubeClient)
 	if err != nil {
 		return nodeList, err
 	}
 	for _, node := range nodes.Items {
-		if isUpgradeForWorkerPlane {
-			// exclude hosts that are already included in failed hosts list
-			if _, ok := hostsFailed.Load(node.Name); ok {
-				continue
-			}
+		if _, ok := hostsFailed.Load(node.Labels[k8s.HostnameLabel]); ok {
+			continue
 		}
 		// exclude hosts that are newly added to the cluster since they can take time to come up
-		if newHosts[node.Name] {
+		if newHosts[node.Labels[k8s.HostnameLabel]] {
+			continue
+		}
+		if inactiveHosts[node.Labels[k8s.HostnameLabel]] {
+			continue
+		}
+		if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
 			continue
 		}
 		nodeList = append(nodeList, node)
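Note: getNodeListForUpgrade now keys every lookup on the kubernetes.io/hostname label instead of node.Name, and applies the same four exclusions for control plane and worker upgrades alike. Restated as a single predicate (a hypothetical helper, not part of this diff):

```go
package upgrade

import (
	"sync"

	v1 "k8s.io/api/core/v1"
)

// shouldSkipNodeForUpgrade collects the four exclusions applied to every
// node, keyed by the kubernetes.io/hostname label so hostname overrides
// are respected.
func shouldSkipNodeForUpgrade(node v1.Node, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) bool {
	hostname := node.Labels["kubernetes.io/hostname"]
	if _, ok := hostsFailed.Load(hostname); ok {
		return true // already recorded as failed during this upgrade
	}
	if newHosts[hostname] {
		return true // newly added; may still be registering
	}
	if inactiveHosts[hostname] {
		return true // unreachable before the upgrade started
	}
	// opted out explicitly by the operator
	return node.Labels["rke.cattle.io/ignore-during-upgrade"] == "true"
}
```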
diff --git a/services/workerplane.go b/services/workerplane.go
index 1c3a65fd..3d2c3227 100644
--- a/services/workerplane.go
+++ b/services/workerplane.go
@@ -53,13 +53,9 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 	return nil
 }
 
-func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) (string, error) {
+func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int) (string, error) {
 	log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
 	var errMsgMaxUnavailableNotFailed string
-	maxUnavailable, err := CalculateMaxUnavailable(upgradeStrategy.MaxUnavailable, len(workerOnlyHosts))
-	if err != nil {
-		return errMsgMaxUnavailableNotFailed, err
-	}
 	if maxUnavailable > WorkerThreads {
 		/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets
 		Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
@@ -68,18 +64,25 @@ func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, m
 		logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
 	}
 
-	maxUnavailable -= len(inactiveHosts)
+	if len(inactiveHosts) > 0 {
+		maxUnavailable -= len(inactiveHosts)
+		logrus.Infof("Resetting maxUnavailable to %v since %v host(s) were found to be inactive/unavailable prior to upgrade", maxUnavailable, len(inactiveHosts))
+	}
 
-	updateNewHostsList(kubeClient, append(multipleRolesHosts, workerOnlyHosts...), newHosts)
-	log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd/controlplane roles one at a time")
-	multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts)
+	updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
+	if len(mixedRolesHosts) > 0 {
+		log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
+	}
+	multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts)
 	if err != nil {
 		logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
 		return errMsgMaxUnavailableNotFailed, err
 	}
-	log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
-	workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts)
+	if len(workerOnlyHosts) > 0 {
+		log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role, %v at a time", maxUnavailable)
+	}
+	workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts)
 	if err != nil {
 		logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
 		if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
@@ -105,7 +108,7 @@ func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error
 		// In case there is only one node and rounding down maxUnvailable percentage led to 0
 		maxUnavailable = 1
 	}
-	logrus.Infof("%v worker nodes can be unavailable at a time", maxUnavailable)
+	logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable)
 	return maxUnavailable, nil
 }
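Note: CalculateMaxUnavailable accepts either an absolute count or a percentage string. Assuming it resolves percentages with the apimachinery intstr helpers (its body is not shown in this diff), the round-down behavior looks like:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Resolve "10%" against 25 hosts; the final argument selects round-down,
	// so this prints 2. An absolute value like intstr.FromInt(3) passes through.
	v := intstr.FromString("10%")
	maxUnavailable, err := intstr.GetValueFromIntOrPercent(&v, 25, false)
	fmt.Println(maxUnavailable, err) // 2 <nil>
}
```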
@@ -119,8 +122,9 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host
 	}
 }
 
-func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
-	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) ([]string, error) {
+func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
+	prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
+	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) ([]string, error) {
 	var errgrp errgroup.Group
 	var drainHelper drain.Helper
 	var failedHosts []string
@@ -130,6 +134,12 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 	hostsQueue := util.GetObjectQueue(allHosts)
 	if upgradeStrategy.Drain {
 		drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
+		log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", WorkerRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
 	}
+
+	currentHostsPool := make(map[string]bool)
+	for _, host := range allHosts {
+		currentHostsPool[host.HostnameOverride] = true
+	}
 	/* Each worker thread starts a goroutine that reads the hostsQueue channel in a for loop
 	Using same number of worker threads as maxUnavailable ensures only maxUnavailable number of nodes are being processed at a time
@@ -149,14 +159,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 			}
 			continue
 		}
-		nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, true)
+		nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts)
 		if err != nil {
 			errList = append(errList, err)
 		}
 		var maxUnavailableHit bool
 		for _, node := range nodes {
 			// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
-			if !k8s.IsNodeReady(node) {
+			if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
 				if len(hostsFailedToUpgrade) >= maxUnavailable {
 					maxUnavailableHit = true
 					break
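Note: the comment in processWorkerPlaneForUpgrade describes the worker-pool shape — maxUnavailable goroutines drain a shared queue of hosts, so at most that many nodes are in flight at once. A self-contained sketch of that shape (illustrative; RKE builds the queue with util.GetObjectQueue and also collects per-host failures):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// upgradeWorkerPool runs at most maxUnavailable upgrades concurrently by
// having that many goroutines drain a shared channel of hostnames.
func upgradeWorkerPool(hostnames []string, maxUnavailable int, upgrade func(string) error) error {
	queue := make(chan string, len(hostnames))
	for _, h := range hostnames {
		queue <- h
	}
	close(queue)

	var errgrp errgroup.Group
	for w := 0; w < maxUnavailable; w++ {
		errgrp.Go(func() error {
			// each worker pulls hosts until the queue is drained
			for h := range queue {
				if err := upgrade(h); err != nil {
					return err
				}
			}
			return nil
		})
	}
	return errgrp.Wait()
}

func main() {
	err := upgradeWorkerPool([]string{"w1", "w2", "w3"}, 2, func(h string) error {
		fmt.Println("upgrading", h)
		return nil
	})
	fmt.Println("done:", err)
}
```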