diff --git a/cluster/cluster.go b/cluster/cluster.go index 157cca2c..cda6f970 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -69,6 +69,7 @@ type Cluster struct { EncryptionConfig encryptionConfig NewHosts map[string]bool MaxUnavailableForWorkerNodes int + MaxUnavailableForControlNodes int HostsLabeledToIgnoreUpgrade map[string]bool } @@ -109,10 +110,10 @@ const ( networkAddon = "network" ) -func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) error { +func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) { kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { - return fmt.Errorf("failed to initialize new kubernetes client: %v", err) + return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err) } // Deploy Etcd Plane @@ -126,15 +127,19 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri log.Infof(ctx, "[etcd] External etcd connection string has been specified, skipping etcd plane") } else { if err := services.RunEtcdPlane(ctx, c.EtcdHosts, etcdNodePlanMap, c.LocalConnDialerFactory, c.PrivateRegistriesMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, c.Services.Etcd, c.Certificates); err != nil { - return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err) + return "", fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err) } } // Deploy Control plane cpNodePlanMap := make(map[string]v3.RKEConfigNodePlan) // Build cp node plan map + var notReadyHosts []*hosts.Host for _, cpHost := range c.ControlPlaneHosts { cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptionData) + if err := services.CheckNodeReady(kubeClient, cpHost, services.ControlRole); err != nil { + notReadyHosts = append(notReadyHosts, cpHost) + } } if !reconcileCluster { @@ -145,12 +150,18 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri c.UpdateWorkersOnly, c.SystemImages.Alpine, c.Certificates); err != nil { - return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err) + return "", fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err) } - return nil + return "", nil } + return c.UpgradeControlPlane(ctx, kubeClient, cpNodePlanMap, notReadyHosts) +} + +func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, cpNodePlanMap map[string]v3.RKEConfigNodePlan, notReadyHosts []*hosts.Host) (string, error) { inactiveHosts := make(map[string]bool) var controlPlaneHosts []*hosts.Host + var notReadyHostNames []string + for _, host := range c.InactiveHosts { if !c.HostsLabeledToIgnoreUpgrade[host.Address] { inactiveHosts[host.HostnameOverride] = true @@ -161,16 +172,35 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri controlPlaneHosts = append(controlPlaneHosts, host) } } - if err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts, + + for _, host := range notReadyHosts { + notReadyHostNames = append(notReadyHostNames, host.HostnameOverride) + } + // attempt upgrade on NotReady hosts without respecting maxUnavailable + logrus.Infof("Attempting upgrade of controlplane components on following hosts in NotReady status: %v", strings.Join(notReadyHostNames, ",")) + + services.RunControlPlane(ctx, notReadyHosts, c.LocalConnDialerFactory, 
c.PrivateRegistriesMap, cpNodePlanMap, c.UpdateWorkersOnly, c.SystemImages.Alpine, - c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts); err != nil { - return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err) + c.Certificates) + for _, host := range notReadyHosts { + services.CheckNodeReady(kubeClient, host, services.ControlRole) } - return nil + // rolling upgrade respecting maxUnavailable + errMsgMaxUnavailableNotFailed, err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts, + c.LocalConnDialerFactory, + c.PrivateRegistriesMap, + cpNodePlanMap, + c.UpdateWorkersOnly, + c.SystemImages.Alpine, + c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes) + if err != nil { + return "", fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err) + } + return errMsgMaxUnavailableNotFailed, nil } func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) { @@ -182,12 +212,16 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin // Deploy Worker plane workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan) // Build cp node plan map + var notReadyHosts []*hosts.Host allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) for _, host := range allHosts { workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData) if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] { continue } + if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole); err != nil { + notReadyHosts = append(notReadyHosts, host) + } if !host.IsEtcd { // separating hosts with only worker role so they undergo upgrade in maxUnavailable batches workerOnlyHosts = append(workerOnlyHosts, host) @@ -211,12 +245,32 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin return "", nil } + return c.UpgradeWorkerPlane(ctx, kubeClient, workerNodePlanMap, notReadyHosts, etcdAndWorkerHosts, workerOnlyHosts) +} + +func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, workerNodePlanMap map[string]v3.RKEConfigNodePlan, notReadyHosts, etcdAndWorkerHosts, workerOnlyHosts []*hosts.Host) (string, error) { inactiveHosts := make(map[string]bool) + var notReadyHostNames []string for _, host := range c.InactiveHosts { if !c.HostsLabeledToIgnoreUpgrade[host.Address] { inactiveHosts[host.HostnameOverride] = true } } + for _, host := range notReadyHosts { + notReadyHostNames = append(notReadyHostNames, host.HostnameOverride) + } + // attempt upgrade on NotReady hosts without respecting maxUnavailable + logrus.Infof("Attempting upgrade of worker components on following hosts in NotReady status: %v", strings.Join(notReadyHostNames, ",")) + services.RunWorkerPlane(ctx, notReadyHosts, + c.LocalConnDialerFactory, + c.PrivateRegistriesMap, + workerNodePlanMap, + c.Certificates, + c.UpdateWorkersOnly, + c.SystemImages.Alpine) + for _, host := range notReadyHosts { + services.CheckNodeReady(kubeClient, host, services.WorkerRole) + } errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx, kubeClient, etcdAndWorkerHosts, workerOnlyHosts, inactiveHosts, c.LocalConnDialerFactory, c.PrivateRegistriesMap, diff --git a/cluster/defaults.go b/cluster/defaults.go index f5ad8b68..c747f594 100644 --- a/cluster/defaults.go +++ b/cluster/defaults.go @@ -79,10 
+79,11 @@ const ( DefaultKubeAPIArgAuditLogPathValue = "/var/log/kube-audit/audit-log.json" DefaultKubeAPIArgAuditPolicyFileValue = "/etc/kubernetes/audit-policy.yaml" - DefaultMaxUnavailable = "10%" - DefaultNodeDrainTimeout = 120 - DefaultNodeDrainGracePeriod = -1 - DefaultNodeDrainIgnoreDaemonsets = true + DefaultMaxUnavailableWorker = "10%" + DefaultMaxUnavailableControlplane = "1" + DefaultNodeDrainTimeout = 120 + DefaultNodeDrainGracePeriod = -1 + DefaultNodeDrainIgnoreDaemonsets = true ) var ( @@ -220,14 +221,16 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e func (c *Cluster) setNodeUpgradeStrategy() { if c.UpgradeStrategy == nil { - // we need to escape the "%" at the end of "10%" here so its not interpreted - logrus.Debugf("No input provided for maxUnavailable, setting it to default value of %v", DefaultMaxUnavailable+"%") + logrus.Debugf("No input provided for maxUnavailableWorker, setting it to default value of %v percent", strings.TrimRight(DefaultMaxUnavailableWorker, "%")) + logrus.Debugf("No input provided for maxUnavailableControlplane, setting it to default value of %v", DefaultMaxUnavailableControlplane) c.UpgradeStrategy = &v3.NodeUpgradeStrategy{ - MaxUnavailable: DefaultMaxUnavailable, + MaxUnavailableWorker: DefaultMaxUnavailableWorker, + MaxUnavailableControlplane: DefaultMaxUnavailableControlplane, } return } - setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailable, DefaultMaxUnavailable) + setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailableWorker, DefaultMaxUnavailableWorker) + setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailableControlplane, DefaultMaxUnavailableControlplane) if !c.UpgradeStrategy.Drain { return } diff --git a/cluster/hosts.go b/cluster/hosts.go index 4ccd4dde..58847740 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -67,10 +67,10 @@ func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error { return ValidateHostCount(c) } -func (c *Cluster) RemoveHostsLabeledToIgnoreUpgrade(ctx context.Context) { +func (c *Cluster) FindHostsLabeledToIgnoreUpgrade(ctx context.Context) { kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) if err != nil { - logrus.Errorf("Error generating kube client in RemoveHostsLabeledToIgnoreUpgrade: %v", err) + logrus.Errorf("Error generating kube client in FindHostsLabeledToIgnoreUpgrade: %v", err) return } var nodes *v1.NodeList @@ -152,6 +152,46 @@ func (c *Cluster) InvertIndexHosts() error { return nil } +func (c *Cluster) CalculateMaxUnavailable() (int, int, error) { + var inactiveControlPlaneHosts, inactiveWorkerHosts []string + var workerHosts, controlHosts, maxUnavailableWorker, maxUnavailableControl int + + for _, host := range c.InactiveHosts { + if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] { + inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride) + } + if !host.IsWorker && !c.HostsLabeledToIgnoreUpgrade[host.Address] { + inactiveWorkerHosts = append(inactiveWorkerHosts, host.HostnameOverride) + } + // not breaking out of the loop so we can log all of the inactive hosts + } + + for _, host := range c.WorkerHosts { + if c.HostsLabeledToIgnoreUpgrade[host.Address] { + continue + } + workerHosts++ + } + // maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade + workerHosts += len(inactiveWorkerHosts) + maxUnavailableWorker, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailableWorker, 
workerHosts) + if err != nil { + return maxUnavailableWorker, maxUnavailableControl, err + } + for _, host := range c.ControlPlaneHosts { + if c.HostsLabeledToIgnoreUpgrade[host.Address] { + continue + } + controlHosts++ + } + controlHosts += len(inactiveControlPlaneHosts) + maxUnavailableControl, err = services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailableControlplane, controlHosts) + if err != nil { + return maxUnavailableWorker, maxUnavailableControl, err + } + return maxUnavailableWorker, maxUnavailableControl, nil +} + func (c *Cluster) getConsolidatedAdmissionConfiguration() (*v1alpha1.AdmissionConfiguration, error) { var err error var admissionConfig *v1alpha1.AdmissionConfiguration diff --git a/cluster/validation.go b/cluster/validation.go index ca290f08..9561d349 100644 --- a/cluster/validation.go +++ b/cluster/validation.go @@ -196,40 +196,6 @@ func ValidateHostCount(c *Cluster) error { return nil } -func (c *Cluster) ValidateHostCountForUpgradeAndCalculateMaxUnavailable() (int, error) { - var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string - var workerOnlyHosts, maxUnavailable int - - for _, host := range c.InactiveHosts { - if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] { - inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride) - } - if !host.IsEtcd && !host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] { - inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride) - } - // not breaking out of the loop so we can log all of the inactive hosts - } - if len(inactiveControlPlaneHosts) >= 1 { - return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ",")) - } - for _, host := range c.WorkerHosts { - if host.IsControl || host.IsEtcd || c.HostsLabeledToIgnoreUpgrade[host.Address] { - continue - } - workerOnlyHosts++ - } - // maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade - workerOnlyHosts += len(inactiveWorkerOnlyHosts) - maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts) - if err != nil { - return maxUnavailable, err - } - if len(inactiveWorkerOnlyHosts) >= maxUnavailable { - return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ",")) - } - return maxUnavailable, nil -} - func validateDuplicateNodes(c *Cluster) error { for i := range c.Nodes { for j := range c.Nodes { diff --git a/cmd/cert.go b/cmd/cert.go index c2901923..c1e04a8d 100644 --- a/cmd/cert.go +++ b/cmd/cert.go @@ -185,7 +185,7 @@ func rebuildClusterWithRotatedCertificates(ctx context.Context, } if isLegacyKubeAPI { log.Infof(ctx, "[controlplane] Redeploying controlplane to update kubeapi parameters") - if err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil { + if _, err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err } } diff --git a/cmd/up.go b/cmd/up.go index ceb6dca2..47abdae5 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -117,34 +117,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c if err != nil { return APIURL, caCrt, clientCert, 
clientKey, nil, err } - - if currentCluster != nil { - // reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed - /*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade. - This will also cover nodes that were considered inactive first time cluster was provisioned, but are now active during upgrade*/ - currentClusterNodes := make(map[string]bool) - for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes { - currentClusterNodes[node.HostnameOverride] = true - } - - newNodes := make(map[string]bool) - for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes { - if !currentClusterNodes[node.HostnameOverride] { - newNodes[node.HostnameOverride] = true - } - } - kubeCluster.NewHosts = newNodes - reconcileCluster = true - - kubeCluster.RemoveHostsLabeledToIgnoreUpgrade(ctx) - maxUnavailable, err := kubeCluster.ValidateHostCountForUpgradeAndCalculateMaxUnavailable() - if err != nil { - return APIURL, caCrt, clientCert, clientKey, nil, err - } - logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailable) - kubeCluster.MaxUnavailableForWorkerNodes = maxUnavailable - } - if !flags.DisablePortCheck { if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err @@ -173,6 +145,34 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c return APIURL, caCrt, clientCert, clientKey, nil, err } + if currentCluster != nil { + // reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed + /*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade. 
+ This will also cover nodes that were considered inactive first time cluster was provisioned, but are now active during upgrade*/ + currentClusterNodes := make(map[string]bool) + for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes { + currentClusterNodes[node.HostnameOverride] = true + } + + newNodes := make(map[string]bool) + for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes { + if !currentClusterNodes[node.HostnameOverride] { + newNodes[node.HostnameOverride] = true + } + } + kubeCluster.NewHosts = newNodes + reconcileCluster = true + + kubeCluster.FindHostsLabeledToIgnoreUpgrade(ctx) + maxUnavailableWorker, maxUnavailableControl, err := kubeCluster.CalculateMaxUnavailable() + if err != nil { + return APIURL, caCrt, clientCert, clientKey, nil, err + } + logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailableWorker) + logrus.Infof("Setting maxUnavailable for control nodes to: %v", maxUnavailableControl) + kubeCluster.MaxUnavailableForWorkerNodes, kubeCluster.MaxUnavailableForControlNodes = maxUnavailableWorker, maxUnavailableControl + } + // update APIURL after reconcile if len(kubeCluster.ControlPlaneHosts) > 0 { APIURL = fmt.Sprintf("https://%s:6443", kubeCluster.ControlPlaneHosts[0].Address) @@ -185,7 +185,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c return APIURL, caCrt, clientCert, clientKey, nil, err } - err = kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster) + errMsgMaxUnavailableNotFailedCtrl, err := kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster) if err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err } @@ -205,7 +205,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c return APIURL, caCrt, clientCert, clientKey, nil, err } - errMsgMaxUnavailableNotFailed, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster) + errMsgMaxUnavailableNotFailedWrkr, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster) if err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err } @@ -233,8 +233,8 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c return APIURL, caCrt, clientCert, clientKey, nil, err } - if errMsgMaxUnavailableNotFailed != "" { - return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailed) + if errMsgMaxUnavailableNotFailedCtrl != "" || errMsgMaxUnavailableNotFailedWrkr != "" { + return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailedCtrl + errMsgMaxUnavailableNotFailedWrkr) } log.Infof(ctx, "Finished building Kubernetes cluster successfully") return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil diff --git a/services/controlplane.go b/services/controlplane.go index 9c0d687d..645609f4 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -50,12 +50,12 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, - upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) error { + upgradeStrategy 
*v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int) (string, error) { if updateWorkersOnly { - return nil + return "", nil } + var errMsgMaxUnavailableNotFailed string var drainHelper drain.Helper - log.Infof(ctx, "[%s] Processing controlplane hosts for upgrade one at a time", ControlRole) if len(newHosts) > 0 { var nodes []string @@ -72,67 +72,133 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client drainHelper = getDrainHelper(kubeClient, *upgradeStrategy) log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", ControlRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds)) } + maxUnavailable = resetMaxUnavailable(maxUnavailable, len(inactiveHosts)) + hostsFailedToUpgrade, err := processControlPlaneForUpgrade(ctx, kubeClient, controlHosts, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, + upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper) + if err != nil { + logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err) + if len(hostsFailedToUpgrade) >= maxUnavailable { + return errMsgMaxUnavailableNotFailed, err + } + errMsgMaxUnavailableNotFailed = fmt.Sprintf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err) + } + log.Infof(ctx, "[%s] Successfully upgraded Controller Plane..", ControlRole) + return errMsgMaxUnavailableNotFailed, nil +} + +func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, + prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, + upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper) ([]string, error) { + var errgrp errgroup.Group + var failedHosts []string + var hostsFailedToUpgrade = make(chan string, maxUnavailable) + var hostsFailed sync.Map currentHostsPool := make(map[string]bool) for _, host := range controlHosts { currentHostsPool[host.HostnameOverride] = true } - // upgrade control plane hosts one at a time for zero downtime upgrades - for _, host := range controlHosts { - log.Infof(ctx, "Processing controlplane host %v", host.HostnameOverride) - if newHosts[host.HostnameOverride] { - if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil { - return err + // upgrade control plane hosts maxUnavailable nodes at a time for zero downtime upgrades + hostsQueue := util.GetObjectQueue(controlHosts) + for w := 0; w < maxUnavailable; w++ { + errgrp.Go(func() error { + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + log.Infof(ctx, "Processing controlplane host %v", runHost.HostnameOverride) + if newHosts[runHost.HostnameOverride] { + if err := startNewControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap); err != nil { + errList = append(errList, err) + hostsFailedToUpgrade <- runHost.HostnameOverride + hostsFailed.Store(runHost.HostnameOverride, true) + break + } + continue + } + if err :=
CheckNodeReady(kubeClient, runHost, ControlRole); err != nil { + errList = append(errList, err) + hostsFailedToUpgrade <- runHost.HostnameOverride + hostsFailed.Store(runHost.HostnameOverride, true) + break + } + nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts, ControlRole) + if err != nil { + errList = append(errList, err) + } + var maxUnavailableHit bool + for _, node := range nodes { + // in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade + if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] { + if len(hostsFailedToUpgrade) >= maxUnavailable { + maxUnavailableHit = true + break + } + hostsFailed.Store(node.Labels[k8s.HostnameLabel], true) + hostsFailedToUpgrade <- node.Labels[k8s.HostnameLabel] + errList = append(errList, fmt.Errorf("host %v not ready", node.Labels[k8s.HostnameLabel])) + } + } + if maxUnavailableHit || len(hostsFailedToUpgrade) >= maxUnavailable { + break + } + controlPlaneUpgradable, workerPlaneUpgradable, err := checkHostUpgradable(ctx, runHost, cpNodePlanMap) + if err != nil { + errList = append(errList, err) + hostsFailedToUpgrade <- runHost.HostnameOverride + hostsFailed.Store(runHost.HostnameOverride, true) + break + } + if !controlPlaneUpgradable && !workerPlaneUpgradable { + log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", runHost.HostnameOverride) + continue + } + if err := upgradeControlHost(ctx, kubeClient, runHost, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil { + errList = append(errList, err) + hostsFailedToUpgrade <- runHost.HostnameOverride + hostsFailed.Store(runHost.HostnameOverride, true) + break + } } - if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil { - return err - } - continue - } - nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts) - if err != nil { - return err - } - var maxUnavailableHit bool - var nodeNotReady string - for _, node := range nodes { - // in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade - if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] { - maxUnavailableHit = true - nodeNotReady = node.Labels[k8s.HostnameLabel] - break - } - } - if maxUnavailableHit { - return fmt.Errorf("maxUnavailable limit hit for controlplane since node %v is in NotReady state", nodeNotReady) - } - - controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes) - if err != nil { - return err - } - workerPlaneUpgradable, err := isWorkerHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes) - if err != nil { - return err - } - if !controlPlaneUpgradable && !workerPlaneUpgradable { - log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", host.HostnameOverride) - continue - } - if err := upgradeControlHost(ctx, kubeClient, host, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil { - return err + return util.ErrList(errList) + }) + } + err := errgrp.Wait() + close(hostsFailedToUpgrade) + if err != nil { + for host := range 
hostsFailedToUpgrade { + failedHosts = append(failedHosts, host) } } - log.Infof(ctx, "[%s] Successfully upgraded Controller Plane..", ControlRole) + return failedHosts, err +} + +func startNewControlHost(ctx context.Context, runHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, + cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI) error { + if err := doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, alpineImage, certMap); err != nil { + return err + } + if err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil { + return err + } return nil } +func checkHostUpgradable(ctx context.Context, runHost *hosts.Host, cpNodePlanMap map[string]v3.RKEConfigNodePlan) (bool, bool, error) { + var controlPlaneUpgradable, workerPlaneUpgradable bool + controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, runHost, cpNodePlanMap[runHost.Address].Processes) + if err != nil { + return controlPlaneUpgradable, workerPlaneUpgradable, err + } + workerPlaneUpgradable, err = isWorkerHostUpgradable(ctx, runHost, cpNodePlanMap[runHost.Address].Processes) + if err != nil { + return controlPlaneUpgradable, workerPlaneUpgradable, err + } + return controlPlaneUpgradable, workerPlaneUpgradable, nil +} + func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error { - if err := checkNodeReady(kubeClient, host, ControlRole); err != nil { - return err - } if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil { return err } @@ -149,7 +215,7 @@ func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, h } } - if err := checkNodeReady(kubeClient, host, ControlRole); err != nil { + if err := CheckNodeReady(kubeClient, host, ControlRole); err != nil { return err } if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil { diff --git a/services/kubeapi.go b/services/kubeapi.go index 914e8315..cfc359b4 100644 --- a/services/kubeapi.go +++ b/services/kubeapi.go @@ -12,7 +12,6 @@ import ( ) func runKubeAPI(ctx context.Context, host *hosts.Host, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, kubeAPIProcess v3.Process, alpineImage string, certMap map[string]pki.CertificatePKI) error { - imageCfg, hostCfg, healthCheckURL := GetProcessConfig(kubeAPIProcess, host) if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil { return err diff --git a/services/node_util.go b/services/node_util.go index c52513db..abd44f46 100644 --- a/services/node_util.go +++ b/services/node_util.go @@ -11,13 +11,14 @@ import ( v3 "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + k8sutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/drain" ) -func checkNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error { 
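+// CheckNodeReady reports, via its returned error, whether the given node has reached Ready state; it is exported so the cluster package can detect NotReady hosts before and during upgrades.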
+func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error { for retries := 0; retries < k8s.MaxRetries; retries++ { - logrus.Debugf("[%s] Now checking status of node %v", component, runHost.HostnameOverride) + logrus.Infof("[%s] Now checking status of node %v", component, runHost.HostnameOverride) k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride) if err != nil { return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err) @@ -60,12 +61,13 @@ func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpg return drainHelper } -func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) ([]v1.Node, error) { +func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool, component string) ([]v1.Node, error) { var nodeList []v1.Node nodes, err := k8s.GetNodeList(kubeClient) if err != nil { return nodeList, err } + logrus.Infof("[%s] Getting list of nodes for upgrade", component) for _, node := range nodes.Items { if _, ok := hostsFailed.Load(node.Labels[k8s.HostnameLabel]); ok { continue @@ -84,3 +86,36 @@ func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.M } return nodeList, nil } + +func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) { + // if maxUnavailable is given in percent, round down + maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal) + logrus.Debugf("Provided value for maxUnavailable: %v", maxUnavailableParsed) + maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false) + if err != nil { + logrus.Errorf("Unable to parse max_unavailable, should be a number or percentage of nodes, error: %v", err) + return 0, err + } + if maxUnavailable == 0 { + // In case there is only one node and rounding down maxUnavailable percentage led to 0 + maxUnavailable = 1 + } + logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable) + return maxUnavailable, nil +} + +func resetMaxUnavailable(maxUnavailable, lenInactiveHosts int) int { + if maxUnavailable > WorkerThreads { + /* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets + Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
+ So the user configurable maxUnavailable will be respected only as long as it's less than 50 and capped at 50 */ + maxUnavailable = WorkerThreads + logrus.Info("Resetting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel") + } + + if lenInactiveHosts > 0 { + maxUnavailable -= lenInactiveHosts + logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, lenInactiveHosts) + } + return maxUnavailable +} diff --git a/services/workerplane.go b/services/workerplane.go index 3d2c3227..06038e4e 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -17,7 +17,6 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" apierrors "k8s.io/apimachinery/pkg/api/errors" - k8sutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/drain" ) @@ -56,19 +55,7 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int) (string, error) { log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole) var errMsgMaxUnavailableNotFailed string - if maxUnavailable > WorkerThreads { - /* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets - Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel. 
- So the user configurable maxUnavailable will be respected only as long as it's less than 50 and capped at 50 */ - maxUnavailable = WorkerThreads - logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel") - } - - if len(inactiveHosts) > 0 { - maxUnavailable -= len(inactiveHosts) - logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, len(inactiveHosts)) - } - + maxUnavailable = resetMaxUnavailable(maxUnavailable, len(inactiveHosts)) updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts) if len(mixedRolesHosts) > 0 { log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time") @@ -95,23 +82,6 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku return errMsgMaxUnavailableNotFailed, nil } -func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) { - // if maxUnavailable is given in percent, round down - maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal) - logrus.Debugf("Provided value for maxUnavailable: %v", maxUnavailableParsed) - maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false) - if err != nil { - logrus.Errorf("Unable to parse max_unavailable, should be a number or percentage of nodes, error: %v", err) - return 0, err - } - if maxUnavailable == 0 { - // In case there is only one node and rounding down maxUnvailable percentage led to 0 - maxUnavailable = 1 - } - logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable) - return maxUnavailable, nil -} - func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) { for _, h := range allHosts { _, err := k8s.GetNode(kubeClient, h.HostnameOverride) @@ -159,7 +129,13 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl } continue } - nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts) + if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil { + errList = append(errList, err) + hostsFailed.Store(runHost.HostnameOverride, true) + hostsFailedToUpgrade <- runHost.HostnameOverride + break + } + nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts, WorkerRole) if err != nil { errList = append(errList, err) } @@ -214,9 +190,6 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error { - if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil { - return err - } // cordon and drain if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil { return err @@ -226,7 +199,7 @@ func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, ru return err } // consider upgrade done when kubeclient lists node as ready - if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil { + if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil { return err } // uncordon node