mirror of https://github.com/rancher/rke.git synced 2025-04-27 19:25:44 +00:00

Attempt upgrade on NotReady hosts

rajashree 2020-02-26 13:33:22 -08:00
parent ca3a3b1814
commit e27a05f8b1
10 changed files with 313 additions and 177 deletions

View File

@@ -69,6 +69,7 @@ type Cluster struct {
EncryptionConfig encryptionConfig
NewHosts map[string]bool
MaxUnavailableForWorkerNodes int
MaxUnavailableForControlNodes int
HostsLabeledToIgnoreUpgrade map[string]bool
}
@@ -109,10 +110,10 @@ const (
networkAddon = "network"
)
func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) error {
func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
}
// Deploy Etcd Plane
@@ -126,15 +127,19 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
log.Infof(ctx, "[etcd] External etcd connection string has been specified, skipping etcd plane")
} else {
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, c.Services.Etcd, c.Certificates); err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
return "", fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
}
// Deploy Control plane
cpNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build cp node plan map
var notReadyHosts []*hosts.Host
for _, cpHost := range c.ControlPlaneHosts {
cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptionData)
if err := services.CheckNodeReady(kubeClient, cpHost, services.ControlRole); err != nil {
notReadyHosts = append(notReadyHosts, cpHost)
}
}
if !reconcileCluster {
@@ -145,12 +150,18 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
return "", fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
return nil
return "", nil
}
return c.UpgradeControlPlane(ctx, kubeClient, cpNodePlanMap, notReadyHosts)
}
func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, cpNodePlanMap map[string]v3.RKEConfigNodePlan, notReadyHosts []*hosts.Host) (string, error) {
inactiveHosts := make(map[string]bool)
var controlPlaneHosts []*hosts.Host
var notReadyHostNames []string
for _, host := range c.InactiveHosts {
if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveHosts[host.HostnameOverride] = true
@@ -161,16 +172,35 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
controlPlaneHosts = append(controlPlaneHosts, host)
}
}
if err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts,
for _, host := range notReadyHosts {
notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
}
// attempt upgrade on NotReady hosts without respecting maxUnavailable
logrus.Infof("Attempting upgrade of controlplane components on following hosts in NotReady status: %v", strings.Join(notReadyHostNames, ","))
services.RunControlPlane(ctx, notReadyHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts); err != nil {
return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
c.Certificates)
for _, host := range notReadyHosts {
services.CheckNodeReady(kubeClient, host, services.ControlRole)
}
return nil
// rolling upgrade respecting maxUnavailable
errMsgMaxUnavailableNotFailed, err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes)
if err != nil {
return "", fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
}
return errMsgMaxUnavailableNotFailed, nil
}
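Note: the reshaped flow is easier to see in isolation: first a best-effort pass over NotReady hosts that deliberately ignores the unavailability budget, then a rolling upgrade that tolerates failures only up to maxUnavailable and surfaces tolerated failures as a non-fatal message string. A minimal standalone sketch, with deploy and ready as hypothetical stand-ins for the services calls (not the actual RKE code):

package main

import (
	"fmt"
	"log"
)

func upgradePlane(all, notReady []string, deploy, ready func(string) error, maxUnavailable int) (string, error) {
	// Phase 1: NotReady hosts get one unconditional attempt; errors are
	// logged and readiness is re-checked, but nothing here is fatal.
	for _, h := range notReady {
		if err := deploy(h); err != nil {
			log.Printf("best-effort upgrade of NotReady host %v failed: %v", h, err)
		}
		ready(h) // result intentionally ignored, mirroring the diff
	}
	// Phase 2: rolling upgrade; fail hard only once maxUnavailable hosts
	// have failed, otherwise report the failures without failing the run.
	var failed []string
	for _, h := range all {
		if err := deploy(h); err != nil {
			failed = append(failed, h)
			if len(failed) >= maxUnavailable {
				return "", fmt.Errorf("hosts %v failed, maxUnavailable (%d) hit: %v", failed, maxUnavailable, err)
			}
		}
	}
	if len(failed) > 0 {
		return fmt.Sprintf("failed to upgrade hosts: %v", failed), nil
	}
	return "", nil
}

func main() {
	msg, err := upgradePlane(
		[]string{"node1", "node2"}, []string{"node3"},
		func(string) error { return nil },
		func(string) error { return nil },
		1)
	fmt.Println(msg, err)
}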
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
@@ -182,12 +212,16 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
// Deploy Worker plane
workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build worker node plan map
var notReadyHosts []*hosts.Host
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
for _, host := range allHosts {
workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData)
if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole); err != nil {
notReadyHosts = append(notReadyHosts, host)
}
if !host.IsEtcd {
// separating hosts with only worker role so they undergo upgrade in maxUnavailable batches
workerOnlyHosts = append(workerOnlyHosts, host)
@@ -211,12 +245,32 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
return "", nil
}
return c.UpgradeWorkerPlane(ctx, kubeClient, workerNodePlanMap, notReadyHosts, etcdAndWorkerHosts, workerOnlyHosts)
}
func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, workerNodePlanMap map[string]v3.RKEConfigNodePlan, notReadyHosts, etcdAndWorkerHosts, workerOnlyHosts []*hosts.Host) (string, error) {
inactiveHosts := make(map[string]bool)
var notReadyHostNames []string
for _, host := range c.InactiveHosts {
if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveHosts[host.HostnameOverride] = true
}
}
for _, host := range notReadyHosts {
notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
}
// attempt upgrade on NotReady hosts without respecting maxUnavailable
logrus.Infof("Attempting upgrade of worker components on following hosts in NotReady status: %v", strings.Join(notReadyHostNames, ","))
services.RunWorkerPlane(ctx, notReadyHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine)
for _, host := range notReadyHosts {
services.CheckNodeReady(kubeClient, host, services.WorkerRole)
}
errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx, kubeClient, etcdAndWorkerHosts, workerOnlyHosts, inactiveHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,

View File

@@ -79,10 +79,11 @@ const (
DefaultKubeAPIArgAuditLogPathValue = "/var/log/kube-audit/audit-log.json"
DefaultKubeAPIArgAuditPolicyFileValue = "/etc/kubernetes/audit-policy.yaml"
DefaultMaxUnavailable = "10%"
DefaultNodeDrainTimeout = 120
DefaultNodeDrainGracePeriod = -1
DefaultNodeDrainIgnoreDaemonsets = true
DefaultMaxUnavailableWorker = "10%"
DefaultMaxUnavailableControlplane = "1"
DefaultNodeDrainTimeout = 120
DefaultNodeDrainGracePeriod = -1
DefaultNodeDrainIgnoreDaemonsets = true
)
var (
@@ -220,14 +221,16 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
func (c *Cluster) setNodeUpgradeStrategy() {
if c.UpgradeStrategy == nil {
// we need to escape the "%" at the end of "10%" here so it's not interpreted as a format specifier
logrus.Debugf("No input provided for maxUnavailable, setting it to default value of %v", DefaultMaxUnavailable+"%")
logrus.Debugf("No input provided for maxUnavailableWorker, setting it to default value of %v percent", strings.TrimRight(DefaultMaxUnavailableWorker, "%"))
logrus.Debugf("No input provided for maxUnavailableControlplane, setting it to default value of %v", DefaultMaxUnavailableControlplane)
c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
MaxUnavailable: DefaultMaxUnavailable,
MaxUnavailableWorker: DefaultMaxUnavailableWorker,
MaxUnavailableControlplane: DefaultMaxUnavailableControlplane,
}
return
}
setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailable, DefaultMaxUnavailable)
setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailableWorker, DefaultMaxUnavailableWorker)
setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailableControlplane, DefaultMaxUnavailableControlplane)
if !c.UpgradeStrategy.Drain {
return
}
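setDefaultIfEmpty, applied to both new fields above, is presumably a tiny helper along these lines (a sketch under that assumption, not the verbatim RKE helper):

// Apply a default only when the user left the setting empty, so an explicit
// value such as "30%" or "2" always wins over the defaults.
func setDefaultIfEmpty(varName *string, defaultValue string) {
	if len(*varName) == 0 {
		*varName = defaultValue
	}
}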

View File

@@ -67,10 +67,10 @@ func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error {
return ValidateHostCount(c)
}
func (c *Cluster) RemoveHostsLabeledToIgnoreUpgrade(ctx context.Context) {
func (c *Cluster) FindHostsLabeledToIgnoreUpgrade(ctx context.Context) {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
logrus.Errorf("Error generating kube client in RemoveHostsLabeledToIgnoreUpgrade: %v", err)
logrus.Errorf("Error generating kube client in FindHostsLabeledToIgnoreUpgrade: %v", err)
return
}
var nodes *v1.NodeList
@@ -152,6 +152,46 @@ func (c *Cluster) InvertIndexHosts() error {
return nil
}
func (c *Cluster) CalculateMaxUnavailable() (int, int, error) {
var inactiveControlPlaneHosts, inactiveWorkerHosts []string
var workerHosts, controlHosts, maxUnavailableWorker, maxUnavailableControl int
for _, host := range c.InactiveHosts {
if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
}
if !host.IsWorker && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveWorkerHosts = append(inactiveWorkerHosts, host.HostnameOverride)
}
// not breaking out of the loop so we can log all of the inactive hosts
}
for _, host := range c.WorkerHosts {
if c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
workerHosts++
}
// maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade
workerHosts += len(inactiveWorkerHosts)
maxUnavailableWorker, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailableWorker, workerHosts)
if err != nil {
return maxUnavailableWorker, maxUnavailableControl, err
}
for _, host := range c.ControlPlaneHosts {
if c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
controlHosts++
}
controlHosts += len(inactiveControlPlaneHosts)
maxUnavailableControl, err = services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailableControlplane, controlHosts)
if err != nil {
return maxUnavailableWorker, maxUnavailableControl, err
}
return maxUnavailableWorker, maxUnavailableControl, nil
}
func (c *Cluster) getConsolidatedAdmissionConfiguration() (*v1alpha1.AdmissionConfiguration, error) {
var err error
var admissionConfig *v1alpha1.AdmissionConfiguration

View File

@@ -196,40 +196,6 @@ func ValidateHostCount(c *Cluster) error {
return nil
}
func (c *Cluster) ValidateHostCountForUpgradeAndCalculateMaxUnavailable() (int, error) {
var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
var workerOnlyHosts, maxUnavailable int
for _, host := range c.InactiveHosts {
if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
}
if !host.IsEtcd && !host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
}
// not breaking out of the loop so we can log all of the inactive hosts
}
if len(inactiveControlPlaneHosts) >= 1 {
return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
}
for _, host := range c.WorkerHosts {
if host.IsControl || host.IsEtcd || c.HostsLabeledToIgnoreUpgrade[host.Address] {
continue
}
workerOnlyHosts++
}
// maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade
workerOnlyHosts += len(inactiveWorkerOnlyHosts)
maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
if err != nil {
return maxUnavailable, err
}
if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
}
return maxUnavailable, nil
}
func validateDuplicateNodes(c *Cluster) error {
for i := range c.Nodes {
for j := range c.Nodes {

View File

@@ -185,7 +185,7 @@ func rebuildClusterWithRotatedCertificates(ctx context.Context,
}
if isLegacyKubeAPI {
log.Infof(ctx, "[controlplane] Redeploying controlplane to update kubeapi parameters")
if err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil {
if _, err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
}

View File

@@ -117,34 +117,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if currentCluster != nil {
// reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed
/*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade.
This will also cover nodes that were considered inactive first time cluster was provisioned, but are now active during upgrade*/
currentClusterNodes := make(map[string]bool)
for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes {
currentClusterNodes[node.HostnameOverride] = true
}
newNodes := make(map[string]bool)
for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes {
if !currentClusterNodes[node.HostnameOverride] {
newNodes[node.HostnameOverride] = true
}
}
kubeCluster.NewHosts = newNodes
reconcileCluster = true
kubeCluster.RemoveHostsLabeledToIgnoreUpgrade(ctx)
maxUnavailable, err := kubeCluster.ValidateHostCountForUpgradeAndCalculateMaxUnavailable()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailable)
kubeCluster.MaxUnavailableForWorkerNodes = maxUnavailable
}
if !flags.DisablePortCheck {
if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -173,6 +145,34 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if currentCluster != nil {
// reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed
/*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade.
This will also cover nodes that were considered inactive the first time the cluster was provisioned, but are now active during upgrade*/
currentClusterNodes := make(map[string]bool)
for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes {
currentClusterNodes[node.HostnameOverride] = true
}
newNodes := make(map[string]bool)
for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes {
if !currentClusterNodes[node.HostnameOverride] {
newNodes[node.HostnameOverride] = true
}
}
kubeCluster.NewHosts = newNodes
reconcileCluster = true
kubeCluster.FindHostsLabeledToIgnoreUpgrade(ctx)
maxUnavailableWorker, maxUnavailableControl, err := kubeCluster.CalculateMaxUnavailable()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailableWorker)
logrus.Infof("Setting maxUnavailable for control nodes to: %v", maxUnavailableControl)
kubeCluster.MaxUnavailableForWorkerNodes, kubeCluster.MaxUnavailableForControlNodes = maxUnavailableWorker, maxUnavailableControl
}
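The newNodes bookkeeping above is a plain set difference keyed on HostnameOverride; sketched in isolation with node specs reduced to hostnames:

// Nodes present in the desired state but absent from the current state are
// "new": they are skipped by readiness checks and cordoning during upgrade.
func newNodeSet(current, desired []string) map[string]bool {
	existing := make(map[string]bool, len(current))
	for _, name := range current {
		existing[name] = true
	}
	added := make(map[string]bool)
	for _, name := range desired {
		if !existing[name] {
			added[name] = true
		}
	}
	return added
}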
// update APIURL after reconcile
if len(kubeCluster.ControlPlaneHosts) > 0 {
APIURL = fmt.Sprintf("https://%s:6443", kubeCluster.ControlPlaneHosts[0].Address)
@@ -185,7 +185,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster)
errMsgMaxUnavailableNotFailedCtrl, err := kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -205,7 +205,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
errMsgMaxUnavailableNotFailed, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster)
errMsgMaxUnavailableNotFailedWrkr, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -233,8 +233,8 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if errMsgMaxUnavailableNotFailed != "" {
return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailed)
if errMsgMaxUnavailableNotFailedCtrl != "" || errMsgMaxUnavailableNotFailedWrkr != "" {
return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailedCtrl + errMsgMaxUnavailableNotFailedWrkr)
}
log.Infof(ctx, "Finished building Kubernetes cluster successfully")
return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil

View File

@@ -50,12 +50,12 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) error {
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int) (string, error) {
if updateWorkersOnly {
return nil
return "", nil
}
var errMsgMaxUnavailableNotFailed string
var drainHelper drain.Helper
log.Infof(ctx, "[%s] Processing controlplane hosts for upgrade one at a time", ControlRole)
if len(newHosts) > 0 {
var nodes []string
@@ -72,67 +72,133 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client
drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", ControlRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
}
maxUnavailable = resetMaxUnavailable(maxUnavailable, len(inactiveHosts))
hostsFailedToUpgrade, err := processControlPlaneForUpgrade(ctx, kubeClient, controlHosts, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap,
upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err)
if len(hostsFailedToUpgrade) >= maxUnavailable {
return errMsgMaxUnavailableNotFailed, err
}
errMsgMaxUnavailableNotFailed = fmt.Sprintf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err)
}
log.Infof(ctx, "[%s] Successfully upgraded Controller Plane..", ControlRole)
return errMsgMaxUnavailableNotFailed, nil
}
func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper) ([]string, error) {
var errgrp errgroup.Group
var failedHosts []string
var hostsFailedToUpgrade = make(chan string, maxUnavailable)
var hostsFailed sync.Map
currentHostsPool := make(map[string]bool)
for _, host := range controlHosts {
currentHostsPool[host.HostnameOverride] = true
}
// upgrade control plane hosts one at a time for zero downtime upgrades
for _, host := range controlHosts {
log.Infof(ctx, "Processing controlplane host %v", host.HostnameOverride)
if newHosts[host.HostnameOverride] {
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
// upgrade control plane hosts, maxUnavailable nodes at a time, for zero downtime upgrades
hostsQueue := util.GetObjectQueue(controlHosts)
for w := 0; w < maxUnavailable; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
log.Infof(ctx, "Processing controlplane host %v", runHost.HostnameOverride)
if newHosts[runHost.HostnameOverride] {
if err := startNewControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
break
}
continue
}
if err := CheckNodeReady(kubeClient, runHost, ControlRole); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
break
}
nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts, ControlRole)
if err != nil {
errList = append(errList, err)
}
var maxUnavailableHit bool
for _, node := range nodes {
// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
if len(hostsFailedToUpgrade) >= maxUnavailable {
maxUnavailableHit = true
break
}
hostsFailed.Store(node.Labels[k8s.HostnameLabel], true)
hostsFailedToUpgrade <- node.Labels[k8s.HostnameLabel]
errList = append(errList, fmt.Errorf("host %v not ready", node.Labels[k8s.HostnameLabel]))
}
}
if maxUnavailableHit || len(hostsFailedToUpgrade) >= maxUnavailable {
break
}
controlPlaneUpgradable, workerPlaneUpgradable, err := checkHostUpgradable(ctx, runHost, cpNodePlanMap)
if err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
break
}
if !controlPlaneUpgradable && !workerPlaneUpgradable {
log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", runHost.HostnameOverride)
continue
}
if err := upgradeControlHost(ctx, kubeClient, runHost, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
break
}
}
if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
return err
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts)
if err != nil {
return err
}
var maxUnavailableHit bool
var nodeNotReady string
for _, node := range nodes {
// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
maxUnavailableHit = true
nodeNotReady = node.Labels[k8s.HostnameLabel]
break
}
}
if maxUnavailableHit {
return fmt.Errorf("maxUnavailable limit hit for controlplane since node %v is in NotReady state", nodeNotReady)
}
controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
if err != nil {
return err
}
workerPlaneUpgradable, err := isWorkerHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
if err != nil {
return err
}
if !controlPlaneUpgradable && !workerPlaneUpgradable {
log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", host.HostnameOverride)
continue
}
if err := upgradeControlHost(ctx, kubeClient, host, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil {
return err
return util.ErrList(errList)
})
}
err := errgrp.Wait()
close(hostsFailedToUpgrade)
if err != nil {
for host := range hostsFailedToUpgrade {
failedHosts = append(failedHosts, host)
}
}
log.Infof(ctx, "[%s] Successfully upgraded Controller Plane..", ControlRole)
return failedHosts, err
}
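The loop above replaces the old one-host-at-a-time walk with a bounded worker pool: maxUnavailable goroutines drain a shared queue, and each worker stops at its first failure, so the buffered failure channel can never hold more than maxUnavailable names. A compact standalone sketch of the pattern, with hosts reduced to strings and util.GetObjectQueue approximated by a closed channel (not the actual RKE code):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func upgradeConcurrently(hosts []string, upgrade func(string) error, maxUnavailable int) ([]string, error) {
	queue := make(chan string, len(hosts))
	for _, h := range hosts {
		queue <- h
	}
	close(queue) // workers exit once the queue is drained

	failedCh := make(chan string, maxUnavailable) // mirrors hostsFailedToUpgrade
	var errgrp errgroup.Group
	for w := 0; w < maxUnavailable; w++ {
		errgrp.Go(func() error {
			for h := range queue {
				if err := upgrade(h); err != nil {
					failedCh <- h
					// a failed worker stops pulling more hosts, which is
					// what keeps total failures bounded by maxUnavailable
					return fmt.Errorf("host %v: %v", h, err)
				}
			}
			return nil
		})
	}
	err := errgrp.Wait()
	close(failedCh)
	var failed []string
	for h := range failedCh {
		failed = append(failed, h)
	}
	return failed, err
}

func main() {
	failed, err := upgradeConcurrently([]string{"a", "b", "c", "d"}, func(h string) error {
		if h == "c" {
			return fmt.Errorf("docker not responding")
		}
		return nil
	}, 2)
	fmt.Println(failed, err)
}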
func startNewControlHost(ctx context.Context, runHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry,
cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI) error {
if err := doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, alpineImage, certMap); err != nil {
return err
}
if err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
return err
}
return nil
}
func checkHostUpgradable(ctx context.Context, runHost *hosts.Host, cpNodePlanMap map[string]v3.RKEConfigNodePlan) (bool, bool, error) {
var controlPlaneUpgradable, workerPlaneUpgradable bool
controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, runHost, cpNodePlanMap[runHost.Address].Processes)
if err != nil {
return controlPlaneUpgradable, workerPlaneUpgradable, err
}
workerPlaneUpgradable, err = isWorkerHostUpgradable(ctx, runHost, cpNodePlanMap[runHost.Address].Processes)
if err != nil {
return controlPlaneUpgradable, workerPlaneUpgradable, err
}
return controlPlaneUpgradable, workerPlaneUpgradable, nil
}
func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error {
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
return err
}
@@ -149,7 +215,7 @@ func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, h
}
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
if err := CheckNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {

View File

@@ -12,7 +12,6 @@ import (
)
func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeAPIProcess v3.Process, alpineImage string, certMap map[string]pki.CertificatePKI) error {
imageCfg, hostCfg, healthCheckURL := GetProcessConfig(kubeAPIProcess, host)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil {
return err

View File

@@ -11,13 +11,14 @@ import (
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
k8sutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)
func checkNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error {
func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error {
for retries := 0; retries < k8s.MaxRetries; retries++ {
logrus.Debugf("[%s] Now checking status of node %v", component, runHost.HostnameOverride)
logrus.Infof("[%s] Now checking status of node %v", component, runHost.HostnameOverride)
k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
if err != nil {
return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err)
@@ -60,12 +61,13 @@ func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpg
return drainHelper
}
func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) ([]v1.Node, error) {
func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool, component string) ([]v1.Node, error) {
var nodeList []v1.Node
nodes, err := k8s.GetNodeList(kubeClient)
if err != nil {
return nodeList, err
}
logrus.Infof("[%s] Getting list of nodes for upgrade", component)
for _, node := range nodes.Items {
if _, ok := hostsFailed.Load(node.Labels[k8s.HostnameLabel]); ok {
continue
@@ -84,3 +86,36 @@ func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.M
}
return nodeList, nil
}
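getNodeListForUpgrade trims the node list before any NotReady counting; its filtering, reduced to hostnames, works roughly as below (a sketch — the real function operates on v1.Node objects via the HostnameLabel):

import "sync"

// Skip hosts that already failed in this run, hosts newly added to the
// cluster, and hosts known to be inactive; only the remainder is considered
// when deciding whether the unavailability budget has been exhausted.
func nodesForUpgrade(all []string, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) []string {
	var out []string
	for _, name := range all {
		if _, failed := hostsFailed.Load(name); failed {
			continue
		}
		if newHosts[name] || inactiveHosts[name] {
			continue
		}
		out = append(out, name)
	}
	return out
}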
func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) {
// if maxUnavailable is given in percent, round down
maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
logrus.Debugf("Provided value for maxUnavailable: %v", maxUnavailableParsed)
maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false)
if err != nil {
logrus.Errorf("Unable to parse max_unavailable, should be a number or percentage of nodes, error: %v", err)
return 0, err
}
if maxUnavailable == 0 {
// In case there is only one node and rounding down maxUnavailable percentage led to 0
maxUnavailable = 1
}
logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable)
return maxUnavailable, nil
}
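For reference, the intstr helpers accept either an integer or a percentage string, and the false argument rounds percentages down — which is why the 0-to-1 bump above exists. A runnable example using the standard k8s.io/apimachinery API:

package main

import (
	"fmt"

	k8sutil "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	for _, tc := range []struct {
		val   string
		hosts int
	}{
		{"10%", 25}, // default worker value: floor(2.5) = 2
		{"10%", 3},  // rounds down to 0, which CalculateMaxUnavailable bumps to 1
		{"1", 5},    // default controlplane value: plain integer, 1
	} {
		parsed := k8sutil.Parse(tc.val)
		n, err := k8sutil.GetValueFromIntOrPercent(&parsed, tc.hosts, false)
		if err != nil {
			panic(err)
		}
		fmt.Printf("maxUnavailable %q of %d hosts => %d\n", tc.val, tc.hosts, n)
	}
}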
func resetMaxUnavailable(maxUnavailable, lenInactiveHosts int) int {
if maxUnavailable > WorkerThreads {
/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets.
Because of this, RKE switched to using worker pools; 50 worker threads have been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
So the user-configurable maxUnavailable will be respected only as long as it's less than 50, and is capped at 50 otherwise */
maxUnavailable = WorkerThreads
logrus.Info("Resetting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
}
if lenInactiveHosts > 0 {
maxUnavailable -= lenInactiveHosts
logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, lenInactiveHosts)
}
return maxUnavailable
}
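For example, a maxUnavailable that parses to 60 on a large cluster is first capped at WorkerThreads (50); if three hosts are then found inactive, the effective budget becomes 47, since unreachable hosts already consume unavailability slots before the upgrade starts.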

View File

@@ -17,7 +17,6 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)
@@ -56,19 +55,7 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int) (string, error) {
log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
var errMsgMaxUnavailableNotFailed string
if maxUnavailable > WorkerThreads {
/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets
Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
So the user configurable maxUnavailable will be respected only as long as it's less than 50 and capped at 50 */
maxUnavailable = WorkerThreads
logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
}
if len(inactiveHosts) > 0 {
maxUnavailable -= len(inactiveHosts)
logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, len(inactiveHosts))
}
maxUnavailable = resetMaxUnavailable(maxUnavailable, len(inactiveHosts))
updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
if len(mixedRolesHosts) > 0 {
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
@@ -95,23 +82,6 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku
return errMsgMaxUnavailableNotFailed, nil
}
func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) {
// if maxUnavailable is given in percent, round down
maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
logrus.Debugf("Provided value for maxUnavailable: %v", maxUnavailableParsed)
maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false)
if err != nil {
logrus.Errorf("Unable to parse max_unavailable, should be a number or percentage of nodes, error: %v", err)
return 0, err
}
if maxUnavailable == 0 {
// In case there is only one node and rounding down maxUnvailable percentage led to 0
maxUnavailable = 1
}
logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable)
return maxUnavailable, nil
}
func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) {
for _, h := range allHosts {
_, err := k8s.GetNode(kubeClient, h.HostnameOverride)
@@ -159,7 +129,13 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts)
if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
errList = append(errList, err)
hostsFailed.Store(runHost.HostnameOverride, true)
hostsFailedToUpgrade <- runHost.HostnameOverride
break
}
nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts, WorkerRole)
if err != nil {
errList = append(errList, err)
}
@@ -214,9 +190,6 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool,
alpineImage string) error {
if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil {
return err
}
// cordon and drain
if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil {
return err
@@ -226,7 +199,7 @@ func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, ru
return err
}
// consider upgrade done when kubeclient lists node as ready
if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil {
if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
return err
}
// uncordon node
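The listing is truncated here, but the per-host flow upgradeWorkerHost implements is fixed: cordon, optionally drain, redeploy worker components, wait until the node reports Ready, then uncordon. A self-contained sketch of that lifecycle, with the helpers injected as hypothetical stand-ins:

// Per-host worker upgrade lifecycle mirroring upgradeWorkerHost above;
// cordon, drainNode, redeploy, and waitReady are hypothetical stand-ins
// passed as parameters so the sketch stays self-contained.
func upgradeOneHost(host string, drainEnabled bool,
	cordon func(host string, unschedulable bool) error,
	drainNode, redeploy, waitReady func(host string) error) error {
	// cordon (and optionally drain) so workloads move off the node first
	if err := cordon(host, true); err != nil {
		return err
	}
	if drainEnabled {
		if err := drainNode(host); err != nil {
			return err
		}
	}
	// restart worker components with the new plan
	if err := redeploy(host); err != nil {
		return err
	}
	// consider the upgrade done only when the node reports Ready again
	if err := waitReady(host); err != nil {
		return err
	}
	// uncordon: return the node to the schedulable pool
	return cordon(host, false)
}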