Mirror of https://github.com/rancher/rke.git (synced 2025-08-10 19:22:41 +00:00)
Accept label to ignore nodes during upgrade
RKE does a cluster scan to find unreachable hosts; if that number equals or exceeds maxUnavailable, the upgrade won't proceed. This commit introduces a label users can apply to their nodes so those nodes are not counted as unavailable and are excluded from the upgrade. This commit also includes a couple of bug fixes.
parent 92714e5523
commit 968a399f26
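
The mechanism, in brief: hosts carrying the ignore label are dropped from the pool the upgrade considers, so they are neither upgraded nor counted toward maxUnavailable. Below is a minimal, self-contained Go sketch of that counting rule; the helper name and sample addresses are illustrative only, not RKE's actual code.

package main

import "fmt"

// countUnavailable is a hypothetical helper: it counts inactive hosts against
// maxUnavailable, skipping any host address that was labeled to be ignored.
func countUnavailable(inactiveHosts []string, ignored map[string]bool) int {
    count := 0
    for _, addr := range inactiveHosts {
        if ignored[addr] {
            continue // labeled nodes are excluded from the unavailable count
        }
        count++
    }
    return count
}

func main() {
    ignored := map[string]bool{"10.0.0.3": true}
    inactive := []string{"10.0.0.2", "10.0.0.3"}
    maxUnavailable := 2
    if countUnavailable(inactive, ignored) >= maxUnavailable {
        fmt.Println("upgrade would not proceed")
    } else {
        fmt.Println("upgrade can proceed")
    }
}
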
@@ -66,6 +66,8 @@ type Cluster struct {
 	WorkerHosts                  []*hosts.Host
 	EncryptionConfig             encryptionConfig
 	NewHosts                     map[string]bool
+	MaxUnavailableForWorkerNodes int
+	HostsLabeledToIgnoreUpgrade  map[string]bool
 }

 type encryptionConfig struct {
@@ -139,20 +141,32 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
 		}
 		return nil
 	}
-	if err := services.UpgradeControlPlane(ctx, kubeClient, c.ControlPlaneHosts,
+	inactiveHosts := make(map[string]bool)
+	var controlPlaneHosts []*hosts.Host
+	for _, host := range c.InactiveHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			inactiveHosts[host.HostnameOverride] = true
+		}
+	}
+	for _, host := range c.ControlPlaneHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			controlPlaneHosts = append(controlPlaneHosts, host)
+		}
+	}
+	if err := services.UpgradeControlPlaneNodes(ctx, kubeClient, controlPlaneHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap,
 		cpNodePlanMap,
 		c.UpdateWorkersOnly,
 		c.SystemImages.Alpine,
-		c.Certificates, c.UpgradeStrategy, c.NewHosts); err != nil {
+		c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts); err != nil {
 		return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
 	}
 	return nil
 }

 func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
-	var workerOnlyHosts, multipleRolesHosts []*hosts.Host
+	var workerOnlyHosts, etcdAndWorkerHosts []*hosts.Host
 	kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
 	if err != nil {
 		return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
@@ -161,12 +175,18 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
 	workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
 	// Build cp node plan map
 	allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
-	for _, workerHost := range allHosts {
-		workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
-		if !workerHost.IsControl && !workerHost.IsEtcd {
-			workerOnlyHosts = append(workerOnlyHosts, workerHost)
+	for _, host := range allHosts {
+		workerNodePlanMap[host.Address] = BuildRKEConfigNodePlan(ctx, c, host, host.DockerInfo, svcOptionData)
+		if host.IsControl || c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			continue
+		}
+		if !host.IsEtcd {
+			// separating hosts with only worker role so they undergo upgrade in maxUnavailable batches
+			workerOnlyHosts = append(workerOnlyHosts, host)
 		} else {
-			multipleRolesHosts = append(multipleRolesHosts, workerHost)
+			// separating nodes with etcd role, since at this point worker components in controlplane nodes are already upgraded by `UpgradeControlPlaneNodes`
+			// and these nodes will undergo upgrade of worker components sequentially
+			etcdAndWorkerHosts = append(etcdAndWorkerHosts, host)
 		}
 	}

@@ -182,13 +202,20 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[strin
 		}
 		return "", nil
 	}
-	errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, c.InactiveHosts,
+	inactiveHosts := make(map[string]bool)
+	for _, host := range c.InactiveHosts {
+		if !c.HostsLabeledToIgnoreUpgrade[host.Address] {
+			inactiveHosts[host.HostnameOverride] = true
+		}
+	}
+	errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx, kubeClient, etcdAndWorkerHosts, workerOnlyHosts, inactiveHosts,
 		c.LocalConnDialerFactory,
 		c.PrivateRegistriesMap,
 		workerNodePlanMap,
 		c.Certificates,
 		c.UpdateWorkersOnly,
-		c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts)
+		c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts, c.MaxUnavailableForWorkerNodes)
 	if err != nil {
 		return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
 	}
@@ -473,6 +500,7 @@ func InitClusterObject(ctx context.Context, rkeConfig *v3.RancherKubernetesEngin
 		EncryptionConfig: encryptionConfig{
 			EncryptionProviderFile: encryptConfig,
 		},
+		HostsLabeledToIgnoreUpgrade: make(map[string]bool),
 	}
 	if metadata.K8sVersionToRKESystemImages == nil {
 		metadata.InitMetadata(ctx)
@@ -200,7 +200,7 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e

 func (c *Cluster) setNodeUpgradeStrategy() {
 	if c.UpgradeStrategy == nil {
-		logrus.Info("No input provided for maxUnavailable, setting it to default value of 10%")
+		logrus.Infof("No input provided for maxUnavailable, setting it to default value of %v", DefaultMaxUnavailable)
 		c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
 			MaxUnavailable: DefaultMaxUnavailable,
 		}
@@ -4,9 +4,11 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"

 	"github.com/docker/docker/api/types"
 	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
@@ -14,6 +16,8 @@ import (
 	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"
 	"sigs.k8s.io/yaml"
 )
@@ -61,7 +65,36 @@ func (c *Cluster) TunnelHosts(ctx context.Context, flags ExternalFlags) error {
 		c.RancherKubernetesEngineConfig.Nodes = removeFromRKENodes(host.RKEConfigNode, c.RancherKubernetesEngineConfig.Nodes)
 	}
 	return ValidateHostCount(c)
+}
+
+func (c *Cluster) RemoveHostsLabeledToIgnoreUpgrade(ctx context.Context) {
+	kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
+	if err != nil {
+		logrus.Errorf("Error generating kube client in RemoveHostsLabeledToIgnoreUpgrade: %v", err)
+		return
+	}
+	var nodes *v1.NodeList
+	for retries := 0; retries < k8s.MaxRetries; retries++ {
+		nodes, err = kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
+		if err == nil {
+			break
+		}
+		time.Sleep(time.Second * k8s.RetryInterval)
+	}
+	if err != nil {
+		log.Infof(ctx, "Error listing nodes but continuing upgrade: %v", err)
+		return
+	}
+	if nodes == nil {
+		return
+	}
+	for _, node := range nodes.Items {
+		if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
+			host := hosts.Host{RKEConfigNode: v3.RKEConfigNode{Address: node.Annotations[k8s.ExternalAddressAnnotation]}}
+			c.HostsLabeledToIgnoreUpgrade[host.Address] = true
+		}
+	}
+	return
 }

 func (c *Cluster) InvertIndexHosts() error {
@@ -196,37 +196,38 @@ func ValidateHostCount(c *Cluster) error {
 	return nil
 }

-func (c *Cluster) ValidateHostCountForUpgrade() error {
+func (c *Cluster) ValidateHostCountForUpgradeAndCalculateMaxUnavailable() (int, error) {
 	var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
-	var workerOnlyHosts int
+	var workerOnlyHosts, maxUnavailable int

 	for _, host := range c.InactiveHosts {
-		if host.IsControl {
+		if host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
 		}
-		if !host.IsEtcd && !host.IsControl {
+		if !host.IsEtcd && !host.IsControl && !c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
 		}
 		// not breaking out of the loop so we can log all of the inactive hosts
 	}
 	if len(inactiveControlPlaneHosts) >= 1 {
-		return fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
+		return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
 	}

 	for _, host := range c.WorkerHosts {
-		if host.IsControl || host.IsEtcd {
+		if host.IsControl || host.IsEtcd || c.HostsLabeledToIgnoreUpgrade[host.Address] {
 			continue
 		}
 		workerOnlyHosts++
 	}
+	// maxUnavailable should be calculated against all hosts provided in cluster.yml except the ones labelled to be ignored for upgrade
+	workerOnlyHosts += len(inactiveWorkerOnlyHosts)
 	maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
 	if err != nil {
-		return err
+		return maxUnavailable, err
 	}
 	if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
-		return fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
+		return maxUnavailable, fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
 	}
-	return nil
+	return maxUnavailable, nil
 }

 func validateDuplicateNodes(c *Cluster) error {
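
The maxUnavailable value returned here comes from services.CalculateMaxUnavailable, which accepts either an absolute count or a percentage of the worker-only hosts (the default is 10% per setNodeUpgradeStrategy above) and never resolves to less than 1. The RKE implementation is not reproduced here; the following is a simplified, self-contained sketch of that resolution logic:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// calculateMaxUnavailable is a simplified stand-in for services.CalculateMaxUnavailable:
// it accepts either an absolute number ("3") or a percentage ("10%") and resolves it
// against the number of worker-only hosts, never returning less than 1.
func calculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) {
    if strings.HasSuffix(maxUnavailableVal, "%") {
        pct, err := strconv.Atoi(strings.TrimSuffix(maxUnavailableVal, "%"))
        if err != nil {
            return 0, fmt.Errorf("invalid maxUnavailable percentage %q: %v", maxUnavailableVal, err)
        }
        maxUnavailable := numHosts * pct / 100 // rounds down
        if maxUnavailable < 1 {
            // e.g. a single node with the default 10% would otherwise round to 0
            maxUnavailable = 1
        }
        return maxUnavailable, nil
    }
    return strconv.Atoi(maxUnavailableVal)
}

func main() {
    n, _ := calculateMaxUnavailable("10%", 7) // 0.7 rounds down, then is floored to 1
    fmt.Println(n)
}
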

cmd/up.go (21 lines changed)

@@ -113,11 +113,6 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
-
-	err = kubeCluster.ValidateHostCountForUpgrade()
-	if err != nil {
-		return APIURL, caCrt, clientCert, clientKey, nil, err
-	}
 	currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -140,6 +135,14 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 		}
 		kubeCluster.NewHosts = newNodes
 		reconcileCluster = true
+
+		kubeCluster.RemoveHostsLabeledToIgnoreUpgrade(ctx)
+		maxUnavailable, err := kubeCluster.ValidateHostCountForUpgradeAndCalculateMaxUnavailable()
+		if err != nil {
+			return APIURL, caCrt, clientCert, clientKey, nil, err
+		}
+		logrus.Infof("Setting maxUnavailable for worker nodes to: %v", maxUnavailable)
+		kubeCluster.MaxUnavailableForWorkerNodes = maxUnavailable
 	}

 	if !flags.DisablePortCheck {
@@ -244,10 +247,16 @@ func checkAllIncluded(cluster *cluster.Cluster) error {

 	var names []string
 	for _, host := range cluster.InactiveHosts {
+		if cluster.HostsLabeledToIgnoreUpgrade[host.Address] {
+			continue
+		}
 		names = append(names, host.Address)
 	}

-	return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
+	if len(names) > 0 {
+		return fmt.Errorf("Provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(names, ","))
+	}
+	return nil
 }

 func clusterUpFromCli(ctx *cli.Context) error {

k8s/node.go (13 lines changed)

@@ -14,12 +14,13 @@ import (
 )

 const (
 	HostnameLabel                = "kubernetes.io/hostname"
 	InternalAddressAnnotation    = "rke.cattle.io/internal-ip"
 	ExternalAddressAnnotation    = "rke.cattle.io/external-ip"
-	AWSCloudProvider             = "aws"
-	MaxRetries                   = 5
-	RetryInterval                = 5
+	IgnoreHostDuringUpgradeLabel = "rke.cattle.io/ignore-during-upgrade"
+	AWSCloudProvider             = "aws"
+	MaxRetries                   = 5
+	RetryInterval                = 5
 )

 func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {
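
A node opts out of the upgrade by carrying this label with the value "true" (typically applied with kubectl label node <name> rke.cattle.io/ignore-during-upgrade=true). The check performed by RemoveHostsLabeledToIgnoreUpgrade and getNodeListForUpgrade in this commit reduces to the following standalone sketch (example code, not part of RKE):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// IgnoreHostDuringUpgradeLabel mirrors the constant added to k8s/node.go above.
const IgnoreHostDuringUpgradeLabel = "rke.cattle.io/ignore-during-upgrade"

// isIgnoredDuringUpgrade reports whether a node has opted out of the upgrade,
// matching the label check used elsewhere in this commit.
func isIgnoredDuringUpgrade(node v1.Node) bool {
    val, ok := node.Labels[IgnoreHostDuringUpgradeLabel]
    return ok && val == "true"
}

func main() {
    node := v1.Node{ObjectMeta: metav1.ObjectMeta{
        Name:   "worker-1",
        Labels: map[string]string{IgnoreHostDuringUpgradeLabel: "true"},
    }}
    fmt.Println(isIgnoredDuringUpgrade(node)) // prints: true
}
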
@@ -2,6 +2,7 @@ package services

 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync"

@@ -47,13 +48,15 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
 	return nil
 }

-func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) error {
+func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
+	prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
+	upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) error {
 	if updateWorkersOnly {
 		return nil
 	}
 	var drainHelper drain.Helper

-	log.Infof(ctx, "[%s] Processing control plane components for upgrade one at a time", ControlRole)
+	log.Infof(ctx, "[%s] Processing controlplane hosts for upgrade one at a time", ControlRole)
 	if len(newHosts) > 0 {
 		var nodes []string
 		for _, host := range controlHosts {
@@ -67,6 +70,12 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
 	}
 	if upgradeStrategy.Drain {
 		drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
+		log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", ControlRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
+	}
+
+	currentHostsPool := make(map[string]bool)
+	for _, host := range controlHosts {
+		currentHostsPool[host.HostnameOverride] = true
 	}
 	// upgrade control plane hosts one at a time for zero downtime upgrades
 	for _, host := range controlHosts {
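
drainHelper here is the upstream kubectl drain.Helper; the fields echoed in the new log line (Force, IgnoreAllDaemonSets, DeleteLocalData, Timeout, GracePeriodSeconds) are filled in by getDrainHelper from the cluster's upgrade strategy. A rough sketch of such a helper follows, with placeholder literals rather than the real strategy-driven values:

package upgrade

import (
    "os"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/kubectl/pkg/drain"
)

// newDrainHelper sketches what getDrainHelper might build; the literal values
// below are placeholders, since the real helper copies them from the
// cluster's node upgrade strategy.
func newDrainHelper(kubeClient *kubernetes.Clientset) drain.Helper {
    return drain.Helper{
        Client:              kubeClient,      // API client used to evict/delete pods
        Force:               true,            // evict pods not managed by a controller
        IgnoreAllDaemonSets: true,            // leave daemonset pods in place
        DeleteLocalData:     false,           // refuse to drain pods using emptyDir by default
        GracePeriodSeconds:  -1,              // -1 means use each pod's own grace period
        Timeout:             2 * time.Minute, // give up draining after this long
        Out:                 os.Stdout,
        ErrOut:              os.Stderr,
    }
}
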
@@ -75,16 +84,19 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
 			if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
 				return err
 			}
+			if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
+				return err
+			}
 			continue
 		}
-		nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, false)
+		nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, inactiveHosts)
 		if err != nil {
 			return err
 		}
 		var maxUnavailableHit bool
 		for _, node := range nodes {
 			// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
-			if !k8s.IsNodeReady(node) {
+			if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
 				maxUnavailableHit = true
 				break
 			}
@@ -93,27 +105,19 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
 			return err
 		}

-		upgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
+		controlPlaneUpgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
 		if err != nil {
 			return err
 		}
-		if !upgradable {
-			log.Infof(ctx, "Upgrade not required for controlplane components of host %v", host.HostnameOverride)
+		workerPlaneUpgradable, err := isWorkerHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
+		if err != nil {
+			return err
+		}
+		if !controlPlaneUpgradable && !workerPlaneUpgradable {
+			log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", host.HostnameOverride)
 			continue
 		}
-		if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
-			return err
-		}
-		if err := cordonAndDrainNode(kubeClient, host, upgradeStrategy.Drain, drainHelper, ControlRole); err != nil {
-			return err
-		}
-		if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
-			return err
-		}
-		if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
-			return err
-		}
-		if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
+		if err := upgradeControlHost(ctx, kubeClient, host, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable); err != nil {
 			return err
 		}
 	}
@@ -121,6 +125,37 @@ func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset,
 	return nil
 }

+func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
+	localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
+	alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool) error {
+	if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
+		return err
+	}
+	if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
+		return err
+	}
+	if controlPlaneUpgradable {
+		log.Infof(ctx, "Upgrading controlplane components for control host %v", host.HostnameOverride)
+		if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
+			return err
+		}
+	}
+	if workerPlaneUpgradable {
+		log.Infof(ctx, "Upgrading workerplane components for control host %v", host.HostnameOverride)
+		if err := doDeployWorkerPlaneHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
+			return err
+		}
+	}
+
+	if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
+		return err
+	}
+	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
+		return err
+	}
+	return nil
+}
+
 func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
 	log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole)
 	var errgrp errgroup.Group
@@ -60,21 +60,24 @@ func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpg
 	return drainHelper
 }

-func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts map[string]bool, isUpgradeForWorkerPlane bool) ([]v1.Node, error) {
+func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts, inactiveHosts map[string]bool) ([]v1.Node, error) {
 	var nodeList []v1.Node
 	nodes, err := k8s.GetNodeList(kubeClient)
 	if err != nil {
 		return nodeList, err
 	}
 	for _, node := range nodes.Items {
-		if isUpgradeForWorkerPlane {
-			// exclude hosts that are already included in failed hosts list
-			if _, ok := hostsFailed.Load(node.Name); ok {
-				continue
-			}
+		if _, ok := hostsFailed.Load(node.Labels[k8s.HostnameLabel]); ok {
+			continue
 		}
 		// exclude hosts that are newly added to the cluster since they can take time to come up
-		if newHosts[node.Name] {
+		if newHosts[node.Labels[k8s.HostnameLabel]] {
+			continue
+		}
+		if inactiveHosts[node.Labels[k8s.HostnameLabel]] {
+			continue
+		}
+		if val, ok := node.Labels[k8s.IgnoreHostDuringUpgradeLabel]; ok && val == "true" {
 			continue
 		}
 		nodeList = append(nodeList, node)
@@ -53,13 +53,9 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 	return nil
 }

-func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) (string, error) {
+func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int) (string, error) {
 	log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
 	var errMsgMaxUnavailableNotFailed string
-	maxUnavailable, err := CalculateMaxUnavailable(upgradeStrategy.MaxUnavailable, len(workerOnlyHosts))
-	if err != nil {
-		return errMsgMaxUnavailableNotFailed, err
-	}
 	if maxUnavailable > WorkerThreads {
 		/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets
 		Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
|
|||||||
logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
|
logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
|
||||||
}
|
}
|
||||||
|
|
||||||
maxUnavailable -= len(inactiveHosts)
|
if len(inactiveHosts) > 0 {
|
||||||
|
maxUnavailable -= len(inactiveHosts)
|
||||||
|
logrus.Infof("Resetting maxUnavailable to %v since %v host(s) are found to be inactive/unavailable prior to upgrade", maxUnavailable, len(inactiveHosts))
|
||||||
|
}
|
||||||
|
|
||||||
updateNewHostsList(kubeClient, append(multipleRolesHosts, workerOnlyHosts...), newHosts)
|
updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
|
||||||
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd/controlplane roles one at a time")
|
if len(mixedRolesHosts) > 0 {
|
||||||
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts)
|
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
|
||||||
|
}
|
||||||
|
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
|
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
|
||||||
return errMsgMaxUnavailableNotFailed, err
|
return errMsgMaxUnavailableNotFailed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
|
if len(workerOnlyHosts) > 0 {
|
||||||
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts)
|
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
|
||||||
|
}
|
||||||
|
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
|
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
|
||||||
if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
|
if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
|
||||||
@@ -105,7 +108,7 @@ func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error
 		// In case there is only one node and rounding down maxUnvailable percentage led to 0
 		maxUnavailable = 1
 	}
-	logrus.Infof("%v worker nodes can be unavailable at a time", maxUnavailable)
+	logrus.Debugf("Parsed value of maxUnavailable: %v", maxUnavailable)
 	return maxUnavailable, nil
 }

@@ -119,8 +122,9 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host
 	}
 }

-func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
-	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) ([]string, error) {
+func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
+	prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
+	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool) ([]string, error) {
 	var errgrp errgroup.Group
 	var drainHelper drain.Helper
 	var failedHosts []string
@@ -130,6 +134,12 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 	hostsQueue := util.GetObjectQueue(allHosts)
 	if upgradeStrategy.Drain {
 		drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
+		log.Infof(ctx, "[%s] Parameters provided to drain command: %#v", WorkerRole, fmt.Sprintf("Force: %v, IgnoreAllDaemonSets: %v, DeleteLocalData: %v, Timeout: %v, GracePeriodSeconds: %v", drainHelper.Force, drainHelper.IgnoreAllDaemonSets, drainHelper.DeleteLocalData, drainHelper.Timeout, drainHelper.GracePeriodSeconds))
+	}
+	currentHostsPool := make(map[string]bool)
+	for _, host := range allHosts {
+		currentHostsPool[host.HostnameOverride] = true
 	}
 	/* Each worker thread starts a goroutine that reads the hostsQueue channel in a for loop
 	Using same number of worker threads as maxUnavailable ensures only maxUnavailable number of nodes are being processed at a time
@@ -149,14 +159,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 				}
 				continue
 			}
-			nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, true)
+			nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, inactiveHosts)
 			if err != nil {
 				errList = append(errList, err)
 			}
 			var maxUnavailableHit bool
 			for _, node := range nodes {
 				// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
-				if !k8s.IsNodeReady(node) {
+				if !k8s.IsNodeReady(node) && currentHostsPool[node.Labels[k8s.HostnameLabel]] {
 					if len(hostsFailedToUpgrade) >= maxUnavailable {
 						maxUnavailableHit = true
 						break
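
The worker-pool pattern referenced in the comments above (a hostsQueue channel drained by maxUnavailable worker goroutines in an errgroup) can be reduced to the following standalone sketch; the host names and the upgradeHost stand-in are illustrative only, and the real code uses util.GetObjectQueue plus per-host failure tracking.

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

// upgradeHost stands in for the per-host work done by processWorkerPlaneForUpgrade.
func upgradeHost(host string) error {
    fmt.Println("upgrading", host)
    return nil
}

func main() {
    hosts := []string{"worker-1", "worker-2", "worker-3", "worker-4"}
    maxUnavailable := 2

    // Queue the hosts on a buffered channel, then start maxUnavailable workers that
    // drain it; this caps how many nodes are upgraded (and hence unavailable) at once.
    hostsQueue := make(chan string, len(hosts))
    for _, h := range hosts {
        hostsQueue <- h
    }
    close(hostsQueue)

    var errgrp errgroup.Group
    for w := 0; w < maxUnavailable; w++ {
        errgrp.Go(func() error {
            for host := range hostsQueue {
                if err := upgradeHost(host); err != nil {
                    return err
                }
            }
            return nil
        })
    }
    if err := errgrp.Wait(); err != nil {
        fmt.Println("upgrade failed:", err)
    }
}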