mirror of https://github.com/rancher/rke.git synced 2025-08-02 07:43:04 +00:00

Merge pull request #1912 from mrajashree/zero_downtime_bugfixes

Accept label to ignore nodes during upgrade
Rajashree Mandaogane 2020-02-24 10:06:57 -08:00 committed by GitHub
commit 6664be5e3b
9 changed files with 197 additions and 77 deletions

View File

@ -68,6 +68,8 @@ type Cluster struct {
WorkerHosts []*hosts.Host
EncryptionConfig encryptionConfig
NewHosts map[string]bool
MaxUnavailableForWorkerNodes int
HostsLabeledToIgnoreUpgrade map[string]bool
}
type encryptionConfig struct {
@ -147,20 +149,32 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
}
return nil
}
if err := services.UpgradeControlPlane(ctx, kubeClient, c.ControlPlaneHosts,
inactiveHosts := make(map[string]bool)
var controlPlaneHosts []*hosts.Host
for _, host := range c.InactiveHosts {
if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveHosts[host.HostnameOverride] = true
}
}
for _, host := range c.ControlPlaneHosts {
if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
controlPlaneHosts = append(controlPlaneHosts, host)
}
}
if err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates, c.UpgradeStrategy, c.NewHosts); err != nil {
c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts); err != nil {
return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
}
return nil
}
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
var workerOnlyHosts, multipleRolesHosts []*hosts.Host
var workerOnlyHosts, etcdAndWorkerHosts []*hosts.Host
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
@ -169,12 +183,18 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build cp node plan map
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
for _, workerHost := range allHosts {
workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
if !workerHost.IsControl && !workerHost.IsEtcd {
workerOnlyHosts = append(workerOnlyHosts, workerHost)
for _, host := range allHosts {
workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData)
if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
if !host.IsEtcd {
// separating hosts with only worker role so they undergo upgrade in maxUnavailable batches
workerOnlyHosts = append(workerOnlyHosts, host)
} else {
multipleRolesHosts = append(multipleRolesHosts, workerHost)
// separating nodes with etcd role, since at this point worker components in controlplane nodes are already upgraded by `UpgradeControlPlaneNodes`
// and these nodes will undergo upgrade of worker components sequentially
etcdAndWorkerHosts = append(etcdAndWorkerHosts, host)
}
}
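
The loop above partitions the cluster's hosts into the two worker-upgrade batches. A minimal, self-contained sketch of that partition logic (Host here is a pared-down, hypothetical stand-in for hosts.Host):

package main

import "fmt"

// Host is a pared-down stand-in for hosts.Host (hypothetical; the real
// type carries many more fields).
type Host struct {
    Address   string
    IsControl bool
    IsEtcd    bool
}

// partitionWorkerHosts mirrors the split above: controlplane hosts are
// skipped (UpgradeControlPlaneNodes already upgraded their worker
// components), hosts with the etcd role are upgraded sequentially, and
// worker-only hosts are upgraded maxUnavailable at a time.
func partitionWorkerHosts(all []Host, ignore map[string]bool) (workerOnly, etcdAndWorker []Host) {
    for _, h := range all {
        if h.IsControl || ignore[h.Address] {
            continue
        }
        if h.IsEtcd {
            etcdAndWorker = append(etcdAndWorker, h)
        } else {
            workerOnly = append(workerOnly, h)
        }
    }
    return
}

func main() {
    all := []Host{
        {Address: "10.0.0.1", IsControl: true},
        {Address: "10.0.0.2", IsEtcd: true},
        {Address: "10.0.0.3"},
        {Address: "10.0.0.4"}, // labeled rke.cattle.io/ignore-during-upgrade=true
    }
    ignore := map[string]bool{"10.0.0.4": true}
    workerOnly, etcdAndWorker := partitionWorkerHosts(all, ignore)
    fmt.Println(len(workerOnly), len(etcdAndWorker)) // prints: 1 1
}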
@ -190,13 +210,20 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
}
return "", nil
}
errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, c.InactiveHosts,
inactiveHosts := make(map[string]bool)
for _, host := range c.InactiveHosts {
if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveHosts[host.HostnameOverride] = true
}
}
errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx, kubeClient, etcdAndWorkerHosts, workerOnlyHosts, inactiveHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts)
c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts, c.MaxUnavailableForWorkerNodes)
if err != nil {
return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
}
@ -554,6 +581,7 @@ func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngin
EncryptionConfig: encryptionConfig{
EncryptionProviderFile: encryptConfig,
},
HostsLabeledToIgnoreUpgrade: make(map[string]bool),
}
if metadata.K8sVersionToRKESystemImages == nil {
if err := metadata.InitMetadata(ctx); err != nil {

View File

@ -220,7 +220,7 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
func (c *Cluster) setNodeUpgradeStrategy() {
if c.UpgradeStrategy == nil {
logrus.Info("No input provided for maxUnavailable, setting it to default value of 10%")
logrus.Infof("No input provided for maxUnavailable, setting it to default value of %v", DefaultMaxUnavailable)
c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
MaxUnavailable: DefaultMaxUnavailable,
}

View File

@ -4,9 +4,11 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
@ -14,6 +16,8 @@ import (
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
"sigs.k8s.io/yaml"
)
@ -61,7 +65,36 @@ func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error {
c.RancherKubernetesEngineConfig.Nodes = removeFromRKENodes(host.RKEConfigNode, c.RancherKubernetesEngineConfig.Nodes)
}
return ValidateHostCount(c)
}
func (c *Cluster) RemoveHostsLabeledToIgnoreUpgrade(ctx context.Context) {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
logrus.Errorf("Error generating kube client in RemoveHostsLabeledToIgnoreUpgrade: %v", err)
return
}
var nodes *v1.NodeList
for retries := 0; retries < k8s.MaxRetries; retries++ {
nodes, err = kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
if err == nil {
break
}
time.Sleep(time.Second * k8s.RetryInterval)
}
if err != nil {
log.Infof(ctx, "Error listing nodes but continuing upgrade: %v", err)
return
}
if nodes == nil {
return
}
for _, node := range nodes.Items {
if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
host := hosts.Host{RKEConfigNode: v3.RKEConfigNode{Address: node.Annotations[k8s.ExternalAddressAnnotation]}}
c.HostsLabeledToIgnoreUpgrade[host.Address] = true
}
}
return
}
func (c *Cluster) InvertIndexHosts() error {

View File

@ -196,37 +196,38 @@ func ValidateHostCount(c *Cluster) error {
return nil
}
func (c *Cluster) ValidateHostCountForUpgrade() error {
func (c *Cluster) ValidateHostCountForUpgradeAndCalculateMaxUnavailable() (int, error) {
var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
var workerOnlyHosts int
var workerOnlyHosts, maxUnavailable int
for _, host := range c.InactiveHosts {
if host.IsControl {
if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
}
if !host.IsEtcd && !host.IsControl {
if !host.IsEtcd && !host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
}
// not breaking out of the loop so we can log all of the inactive hosts
}
if len(inactiveControlPlaneHosts) >= 1 {
return fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
}
for _, host := range c.WorkerHosts {
if host.IsControl || host.IsEtcd {
if host.IsControl || host.IsEtcd || c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
workerOnlyHosts++
}
// maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade
workerOnlyHosts += len(inactiveWorkerOnlyHosts)
maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
if err != nil {
return err
return maxUnavailable, err
}
if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
return fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
}
return nil
return maxUnavailable, nil
}
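
To make the arithmetic concrete: with 25 worker-only hosts and the default maxUnavailable of 10%, up to 2 nodes may be taken down at a time. A hedged sketch, assuming the percentage is resolved with the Kubernetes intstr helper rounding down (an assumption; the exact parsing lives in services.CalculateMaxUnavailable):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    val := intstr.FromString("10%")

    // 25 worker-only hosts: 10% of 25 rounds down to 2
    maxUnavailable, err := intstr.GetValueFromIntOrPercent(&val, 25, false)
    if err != nil {
        panic(err)
    }
    fmt.Println(maxUnavailable) // 2

    // a single worker-only host: 10% rounds down to 0, which
    // CalculateMaxUnavailable bumps back up to 1 (see its hunk below)
    maxUnavailable, _ = intstr.GetValueFromIntOrPercent(&val, 1, false)
    fmt.Println(maxUnavailable) // 0, before the bump to 1
}

Note that an inactive worker-only host counts toward the total here, and is later subtracted from maxUnavailable before the upgrade begins.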
func validateDuplicateNodes(c *Cluster) error {

View File

@ -113,11 +113,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.ValidateHostCountForUpgrade()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
@ -140,6 +135,14 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
}
kubeCluster.NewHosts = newNodes
reconcileCluster = true
kubeCluster.RemoveHostsLabeledToIgnoreUpgrade(ctx)
maxUnavailable, err := kubeCluster.ValidateHostCountForUpgradeAndCalculateMaxUnavailable()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailable)
kubeCluster.MaxUnavailableForWorkerNodes = maxUnavailable
}
if !flags.DisablePortCheck {
@ -244,10 +247,16 @@ func checkAllIncluded(cluster *cluster.Cluster) error {
var names []string
for _, host := range cluster.InactiveHosts {
if cluster.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
names = append(names, host.Address)
}
return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
if len(names) > 0 {
return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
}
return nil
}
func clusterUpFromCli(ctx *cli.Context) error {

View File

@ -14,12 +14,13 @@ import (
)
const (
HostnameLabel = "kubernetes.io/hostname"
InternalAddressAnnotation = "rke.cattle.io/internal-ip"
ExternalAddressAnnotation = "rke.cattle.io/external-ip"
AWSCloudProvider = "aws"
MaxRetries = 5
RetryInterval = 5
HostnameLabel = "kubernetes.io/hostname"
InternalAddressAnnotation = "rke.cattle.io/internal-ip"
ExternalAddressAnnotation = "rke.cattle.io/external-ip"
IgnoreHostDuringUpgradeLabel = "rke.cattle.io/ignore-during-upgrade"
AWSCloudProvider = "aws"
MaxRetries = 5
RetryInterval = 5
)
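
IgnoreHostDuringUpgradeLabel is the user-facing switch for this PR: setting it to "true" on a node keeps that node out of the upgrade. A hedged sketch of applying it with client-go, using the same pre-context method signatures seen elsewhere in this diff (the function and node name are illustrative, not from the repo):

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// ignoreNodeDuringUpgrade labels a node so the next rke up skips it.
func ignoreNodeDuringUpgrade(clientset *kubernetes.Clientset, nodeName string) error {
    node, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    if node.Labels == nil {
        node.Labels = map[string]string{}
    }
    node.Labels["rke.cattle.io/ignore-during-upgrade"] = "true"
    _, err = clientset.CoreV1().Nodes().Update(node)
    return err
}

RemoveHostsLabeledToIgnoreUpgrade then picks these nodes up by listing the cluster's nodes and recording their addresses.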
func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {

View File

@ -2,6 +2,7 @@ package services
import (
"context"
"fmt"
"strings"
"sync"
@ -47,13 +48,15 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
return nil
}
func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) error {
func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) error {
if updateWorkersOnly {
return nil
}
var drainHelper drain.Helper
log.Infof(ctx, "[%s] Processing control plane components for upgrade one at a time", ControlRole)
log.Infof(ctx, "[%s] Processing controlplane hosts for upgrade one at a time", ControlRole)
if len(newHosts) > 0 {
var nodes []string
for _, host := range controlHosts {
@ -67,6 +70,12 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
}
if upgradeStrategy.Drain {
drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", ControlRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
}
currentHostsPool := make(map[string]bool)
for _, host := range controlHosts {
currentHostsPool[host.HostnameOverride] = true
}
// upgrade control plane hosts one at a time for zero downtime upgrades
for _, host := range controlHosts {
@ -75,16 +84,19 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
}
if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
return err
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, false)
nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts)
if err != nil {
return err
}
var maxUnavailableHit bool
for _, node := range nodes {
// in case any previously added or not-yet-processed nodes become unreachable during the upgrade
if !k8s.IsNodeReady(node) {
if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
maxUnavailableHit = true
break
}
@ -93,27 +105,19 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
return err
}
upgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
if err != nil {
return err
}
if !upgradable {
log.Infof(ctx, "Upgrade not required for controlplane components of host %v", host.HostnameOverride)
workerPlaneUpgradable, err := isWorkerHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
if err != nil {
return err
}
if !controlPlaneUpgradable && !workerPlaneUpgradable {
log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", host.HostnameOverride)
continue
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := cordonAndDrainNode(kubeClient, host, upgradeStrategy.Drain, drainHelper, ControlRole); err != nil {
return err
}
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
if err := upgradeControlHost(ctx, kubeClient, host, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil {
return err
}
}
@ -121,6 +125,37 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
return nil
}
func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error {
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
return err
}
if controlPlaneUpgradable {
log.Infof(ctx, "Upgrading controlplane components for control host %v", host.HostnameOverride)
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
}
}
if workerPlaneUpgradable {
log.Infof(ctx, "Upgrading workerplane components for control host %v", host.HostnameOverride)
if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
return err
}
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
return err
}
return nil
}
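
Both checkNodeReady calls in upgradeControlHost gate on node readiness: once before cordoning and once after redeploying, before the node is uncordoned. A plausible sketch of the readiness predicate behind k8s.IsNodeReady (an assumption about its shape, not copied from the repo):

import v1 "k8s.io/api/core/v1"

// isNodeReady reports whether the node's NodeReady condition is True.
func isNodeReady(node v1.Node) bool {
    for _, cond := range node.Status.Conditions {
        if cond.Type == v1.NodeReady {
            return cond.Status == v1.ConditionTrue
        }
    }
    return false
}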
func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole)
var errgrp errgroup.Group

View File

@ -60,21 +60,24 @@ func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpg
return drainHelper
}
func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts map[string]bool, isUpgradeForWorkerPlane bool) ([]v1.Node, error) {
func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) ([]v1.Node, error) {
var nodeList []v1.Node
nodes, err := k8s.GetNodeList(kubeClient)
if err != nil {
return nodeList, err
}
for _, node := range nodes.Items {
if isUpgradeForWorkerPlane {
// exclude hosts that are already included in failed hosts list
if _, ok := hostsFailed.Load(node.Name); ok {
continue
}
if _, ok := hostsFailed.Load(node.Labels[k8s.HostnameLabel]); ok {
continue
}
// exclude hosts that are newly added to the cluster since they can take time to come up
if newHosts[node.Name] {
if newHosts[node.Labels[k8s.HostnameLabel]] {
continue
}
if inactiveHosts[node.Labels[k8s.HostnameLabel]] {
continue
}
if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
continue
}
nodeList = append(nodeList, node)
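
The drain parameters logged in this PR (Force, IgnoreAllDaemonSets, DeleteLocalData, Timeout, GracePeriodSeconds) are fields on kubectl's drain.Helper, which getDrainHelper assembles from the upgrade strategy. A hedged sketch with hard-coded placeholder values (the mapping from v3.NodeUpgradeStrategy is an assumption):

import (
    "os"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/kubectl/pkg/drain"
)

// newDrainHelper approximates getDrainHelper; the field names match the
// log lines above, but the values are placeholders.
func newDrainHelper(kubeClient *kubernetes.Clientset) drain.Helper {
    return drain.Helper{
        Client:              kubeClient,
        Force:               true,
        IgnoreAllDaemonSets: true,
        DeleteLocalData:     true,
        GracePeriodSeconds:  -1, // -1 means use each pod's own grace period
        Timeout:             60 * time.Second,
        Out:                 os.Stdout,
        ErrOut:              os.Stderr,
    }
}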

View File

@ -53,13 +53,9 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
return nil
}
func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) (string, error) {
func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int) (string, error) {
log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
var errMsgMaxUnavailableNotFailed string
maxUnavailable, err := CalculateMaxUnavailable(upgradeStrategy.MaxUnavailable, len(workerOnlyHosts))
if err != nil {
return errMsgMaxUnavailableNotFailed, err
}
if maxUnavailable > WorkerThreads {
/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets
Because of this, RKE switched to using worker pools; 50 worker threads have been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
@ -68,18 +64,25 @@ func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, m
logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
}
maxUnavailable -= len(inactiveHosts)
if len(inactiveHosts) > 0 {
maxUnavailable -= len(inactiveHosts)
logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, len(inactiveHosts))
}
updateNewHostsList(kubeClient, append(multipleRolesHosts, workerOnlyHosts...), newHosts)
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd/controlplane roles one at a time")
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts)
updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
if len(mixedRolesHosts) > 0 {
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
}
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
return errMsgMaxUnavailableNotFailed, err
}
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts)
if len(workerOnlyHosts) > 0 {
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
}
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
@ -105,7 +108,7 @@ func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error
// In case there is only one node and rounding down the maxUnavailable percentage led to 0
maxUnavailable = 1
}
logrus.Infof("%v worker nodes can be unavailable at a time", maxUnavailable)
logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable)
return maxUnavailable, nil
}
@ -119,8 +122,9 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host
}
}
func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) ([]string, error) {
func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) ([]string, error) {
var errgrp errgroup.Group
var drainHelper drain.Helper
var failedHosts []string
@ -130,6 +134,12 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
hostsQueue := util.GetObjectQueue(allHosts)
if upgradeStrategy.Drain {
drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", WorkerRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
}
currentHostsPool := make(map[string]bool)
for _, host := range allHosts {
currentHostsPool[host.HostnameOverride] = true
}
/* Each worker thread starts a goroutine that reads the hostsQueue channel in a for loop
Using the same number of worker threads as maxUnavailable ensures that only maxUnavailable nodes are processed at a time
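
A minimal sketch of the pattern that comment describes, with a plain buffered channel standing in for util.GetObjectQueue and a hypothetical per-host helper (upgradeWorkerHost is illustrative, not from the repo):

// maxUnavailable worker goroutines drain a shared queue, so at most
// maxUnavailable hosts are being upgraded at any moment.
hostsQueue := make(chan *hosts.Host, len(allHosts))
for _, host := range allHosts {
    hostsQueue <- host
}
close(hostsQueue)

var errgrp errgroup.Group
for w := 0; w < maxUnavailable; w++ {
    errgrp.Go(func() error {
        for host := range hostsQueue {
            if err := upgradeWorkerHost(ctx, host); err != nil { // hypothetical
                return err
            }
        }
        return nil
    })
}
err := errgrp.Wait()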
@ -149,14 +159,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, true)
nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts)
if err != nil {
errList = append(errList, err)
}
var maxUnavailableHit bool
for _, node := range nodes {
// in case any previously added or not-yet-processed nodes become unreachable during the upgrade
if !k8s.IsNodeReady(node) {
if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
if len(hostsFailedToUpgrade) >= maxUnavailable {
maxUnavailableHit = true
break