mirror of https://github.com/rancher/rke.git synced 2025-05-10 01:15:43 +00:00

Change RKE upgrade logic for zero downtime

rajashree 2020-02-04 11:27:52 -08:00
parent 2fc40203ed
commit 11678a3f98
10 changed files with 628 additions and 27 deletions

.gitignore

@@ -8,3 +8,5 @@
kube_config*
/rke
.vscode
cluster.rkestate
cluster.yml

cluster/cluster.go

@@ -65,6 +65,7 @@ type Cluster struct {
v3.RancherKubernetesEngineConfig `yaml:",inline"`
WorkerHosts []*hosts.Host
EncryptionConfig encryptionConfig
NewHosts map[string]bool
}
type encryptionConfig struct {
@@ -98,7 +99,12 @@ const (
SystemNamespace = "kube-system"
)
func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) error {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
}
// Deploy Etcd Plane
etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build etcd node plan map
@@ -120,37 +126,73 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
for _, cpHost := range c.ControlPlaneHosts {
cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptionData)
}
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
if !reconcileCluster {
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
return nil
}
if err := services.UpgradeControlPlane(ctx, kubeClient, c.ControlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
c.Certificates, c.UpgradeStrategy, c.NewHosts); err != nil {
return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
}
return nil
}
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
var workerOnlyHosts, multipleRolesHosts []*hosts.Host
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
}
// Deploy Worker plane
workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build worker node plan map
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
for _, workerHost := range allHosts {
workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
if !workerHost.IsControl && !workerHost.IsEtcd {
workerOnlyHosts = append(workerOnlyHosts, workerHost)
} else {
multipleRolesHosts = append(multipleRolesHosts, workerHost)
}
}
if err := services.RunWorkerPlane(ctx, allHosts,
if !reconcileCluster {
if err := services.RunWorkerPlane(ctx, allHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine); err != nil {
return "", fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
}
return "", nil
}
errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, c.InactiveHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine); err != nil {
return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts)
if err != nil {
return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
}
return nil
return errMsgMaxUnavailableNotFailed, nil
}
func parseAuditLogConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
@@ -328,6 +370,60 @@ func parseIngressExtraVolumeMounts(ingressMap map[string]interface{}, rkeConfig
return nil
}
func parseNodeDrainInput(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
// setting some defaults here because for these fields there's no way of differentiating between user provided null value vs golang setting it to null during unmarshal
if rkeConfig.UpgradeStrategy == nil || rkeConfig.UpgradeStrategy.DrainInput == nil {
return nil
}
var config map[string]interface{}
err := ghodssyaml.Unmarshal([]byte(clusterFile), &config)
if err != nil {
return fmt.Errorf("[parseNodeDrainInput] error unmarshalling: %v", err)
}
upgradeStrategy, err := convert.EncodeToMap(config["upgrade_strategy"])
if err != nil {
return err
}
nodeDrainInputMap, err := convert.EncodeToMap(upgradeStrategy["node_drain_input"])
if err != nil {
return err
}
nodeDrainInputBytes, err := ghodssyaml.Marshal(nodeDrainInputMap)
if err != nil {
return err
}
// this will only have fields that user set and none of the default empty values
var nodeDrainInput v3.NodeDrainInput
if err := ghodssyaml.Unmarshal(nodeDrainInputBytes, &nodeDrainInput); err != nil {
return err
}
var update bool
if _, ok := nodeDrainInputMap["ignore_daemonsets"]; !ok {
// user hasn't provided any input, default to true
nodeDrainInput.IgnoreDaemonSets = DefaultNodeDrainIgnoreDaemonsets
update = true
}
if _, ok := nodeDrainInputMap["timeout"]; !ok {
// user hasn't provided any input, default to 120
nodeDrainInput.Timeout = DefaultNodeDrainTimeout
update = true
}
if providedGracePeriod, ok := nodeDrainInputMap["grace_period"].(float64); !ok {
// user hasn't provided any input, default to -1
nodeDrainInput.GracePeriod = DefaultNodeDrainGracePeriod
update = true
} else {
// TODO: ghodssyaml.Marshal is losing the user provided value for GracePeriod, investigate why, till then assign the provided value explicitly
nodeDrainInput.GracePeriod = int(providedGracePeriod)
}
if update {
rkeConfig.UpgradeStrategy.DrainInput = &nodeDrainInput
}
return nil
}
func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) {
logrus.Debugf("Parsing cluster file [%v]", clusterFile)
var rkeConfig v3.RancherKubernetesEngineConfig
@@ -356,6 +452,9 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
if err := parseIngressConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing ingress config: %v", err)
}
if err := parseNodeDrainInput(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing upgrade strategy and node drain input: %v", err)
}
return &rkeConfig, nil
}
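
The parsing above hinges on one detail: the raw cluster.yml is unmarshalled into a plain map first, so that a user-supplied zero value (for example grace_period: 0) can be told apart from a key the user never wrote, and defaults are applied only to the missing keys. A minimal, self-contained sketch of that detect-then-default pattern, with illustrative struct tags rather than RKE's exact schema:

package main

import (
	"fmt"

	ghodssyaml "github.com/ghodss/yaml"
)

type drainInput struct {
	IgnoreDaemonSets bool `json:"ignore_daemonsets"`
	Timeout          int  `json:"timeout"`
	GracePeriod      int  `json:"grace_period"`
}

func main() {
	raw := []byte("grace_period: 0\n") // user explicitly set 0; the other keys are omitted

	// Unmarshal once into a map to learn which keys the user actually provided.
	var asMap map[string]interface{}
	if err := ghodssyaml.Unmarshal(raw, &asMap); err != nil {
		panic(err)
	}

	// Unmarshal again into the typed struct.
	var in drainInput
	if err := ghodssyaml.Unmarshal(raw, &in); err != nil {
		panic(err)
	}

	// Default only the keys that are absent from the raw input.
	if _, ok := asMap["ignore_daemonsets"]; !ok {
		in.IgnoreDaemonSets = true // DefaultNodeDrainIgnoreDaemonsets
	}
	if _, ok := asMap["timeout"]; !ok {
		in.Timeout = 120 // DefaultNodeDrainTimeout
	}
	if _, ok := asMap["grace_period"]; !ok {
		in.GracePeriod = -1 // DefaultNodeDrainGracePeriod
	}

	fmt.Printf("%+v\n", in) // {IgnoreDaemonSets:true Timeout:120 GracePeriod:0}
}
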

cluster/defaults.go

@@ -76,6 +76,11 @@ const (
KubeAPIArgAuditPolicyFile = "audit-policy-file"
DefaultKubeAPIArgAuditLogPathValue = "/var/log/kube-audit/audit-log.json"
DefaultKubeAPIArgAuditPolicyFileValue = "/etc/kubernetes/audit-policy.yaml"
DefaultMaxUnavailable = "10%"
DefaultNodeDrainTimeout = 120
DefaultNodeDrainGracePeriod = -1
DefaultNodeDrainIgnoreDaemonsets = true
)
type ExternalFlags struct {
@@ -188,10 +193,35 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
c.setClusterServicesDefaults()
c.setClusterNetworkDefaults()
c.setClusterAuthnDefaults()
c.setNodeUpgradeStrategy()
return nil
}
func (c *Cluster) setNodeUpgradeStrategy() {
if c.UpgradeStrategy == nil {
logrus.Info("No input provided for maxUnavailable, setting it to default value of 10%")
c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
MaxUnavailable: DefaultMaxUnavailable,
}
return
}
setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailable, DefaultMaxUnavailable)
if !c.UpgradeStrategy.Drain {
return
}
if c.UpgradeStrategy.DrainInput == nil {
c.UpgradeStrategy.DrainInput = &v3.NodeDrainInput{
IgnoreDaemonSets: DefaultNodeDrainIgnoreDaemonsets,
// default to 120 seems to work better for controlplane nodes
Timeout: DefaultNodeDrainTimeout,
//Period of time in seconds given to each pod to terminate gracefully.
// If negative, the default value specified in the pod will be used
GracePeriod: DefaultNodeDrainGracePeriod,
}
}
}
func (c *Cluster) setClusterServicesDefaults() {
// We don't accept per service images anymore.
c.Services.KubeAPI.Image = c.SystemImages.Kubernetes
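
Taken together with the constants above, a cluster.yml without an upgrade_strategy block ends up with a max_unavailable of 10%, and a cluster that enables drain without a node_drain_input block gets the drain defaults below. A rough sketch of the resulting values, assuming the rancher/types revision this commit builds against:

package main

import (
	"fmt"

	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
)

func main() {
	// Effective defaults from setNodeUpgradeStrategy when no upgrade_strategy is given.
	strategy := &v3.NodeUpgradeStrategy{
		MaxUnavailable: "10%", // DefaultMaxUnavailable
	}

	// When drain is enabled but node_drain_input is omitted:
	strategy.Drain = true
	strategy.DrainInput = &v3.NodeDrainInput{
		IgnoreDaemonSets: true, // DefaultNodeDrainIgnoreDaemonsets
		Timeout:          120,  // DefaultNodeDrainTimeout, in seconds
		GracePeriod:      -1,   // DefaultNodeDrainGracePeriod: use each pod's own grace period
	}

	fmt.Printf("%+v %+v\n", strategy, strategy.DrainInput)
}
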

cluster/validation.go

@@ -3,14 +3,14 @@ package cluster
import (
"context"
"fmt"
"github.com/rancher/rke/metadata"
"k8s.io/api/core/v1"
"strings"
"github.com/rancher/rke/log"
"github.com/rancher/rke/metadata"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/rke/util"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation"
)
@@ -196,6 +196,39 @@ func ValidateHostCount(c *Cluster) error {
return nil
}
func (c *Cluster) ValidateHostCountForUpgrade() error {
var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
var workerOnlyHosts int
for _, host := range c.InactiveHosts {
if host.IsControl {
inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
}
if !host.IsEtcd && !host.IsControl {
inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
}
// not breaking out of the loop so we can log all of the inactive hosts
}
if len(inactiveControlPlaneHosts) >= 1 {
return fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
}
for _, host := range c.WorkerHosts {
if host.IsControl || host.IsEtcd {
continue
}
workerOnlyHosts++
}
maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
if err != nil {
return err
}
if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
return fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
}
return nil
}
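
A concrete example of this check: with 15 worker-only hosts and the default max_unavailable of 10%, the calculation rounds 1.5 down to 1, so even a single inactive worker-only host fails the pre-upgrade validation. A small sketch, assuming this commit's rke module is importable:

package main

import (
	"fmt"

	"github.com/rancher/rke/services"
)

func main() {
	// 15 worker-only hosts with the default "10%": 1.5 rounds down to 1.
	maxUnavailable, err := services.CalculateMaxUnavailable("10%", 15)
	if err != nil {
		panic(err)
	}

	inactiveWorkerOnly := 1
	// One inactive worker-only host already reaches maxUnavailable, so the upgrade is blocked.
	fmt.Println(maxUnavailable, inactiveWorkerOnly >= maxUnavailable) // 1 true
}
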
func validateDuplicateNodes(c *Cluster) error {
for i := range c.Nodes {
for j := range c.Nodes {

cmd/cert.go

@@ -185,7 +185,7 @@ func rebuildClusterWithRotatedCertificates(ctx context.Context,
}
if isLegacyKubeAPI {
log.Infof(ctx, "[controlplane] Redeploying controlplane to update kubeapi parameters")
if err := kubeCluster.DeployControlPlane(ctx, svcOptionData); err != nil {
if err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
}

cmd/up.go

@@ -79,6 +79,7 @@ func UpCommand() cli.Command {
func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags, data map[string]interface{}) (string, string, string, string, map[string]pki.CertificatePKI, error) {
var APIURL, caCrt, clientCert, clientKey string
var reconcileCluster bool
clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
if err != nil {
@@ -113,11 +114,34 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.ValidateHostCountForUpgrade()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if currentCluster != nil {
// reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed
/*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade.
This will also cover nodes that were considered inactive first time cluster was provisioned, but are now active during upgrade*/
currentClusterNodes := make(map[string]bool)
for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes {
currentClusterNodes[node.HostnameOverride] = true
}
newNodes := make(map[string]bool)
for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes {
if !currentClusterNodes[node.HostnameOverride] {
newNodes[node.HostnameOverride] = true
}
}
kubeCluster.NewHosts = newNodes
reconcileCluster = true
}
if !flags.DisablePortCheck {
if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -158,7 +182,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.DeployControlPlane(ctx, svcOptionsData)
err = kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -178,7 +202,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData)
errMsgMaxUnavailableNotFailed, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -206,6 +230,9 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}
if errMsgMaxUnavailableNotFailed != "" {
return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailed)
}
log.Infof(ctx, "Finished building Kubernetes cluster successfully")
return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil
}

k8s/node.go

@@ -18,6 +18,8 @@ const (
InternalAddressAnnotation = "rke.cattle.io/internal-ip"
ExternalAddressAnnotation = "rke.cattle.io/external-ip"
AWSCloudProvider = "aws"
MaxRetries = 5
RetryInterval = 5
)
func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {
@@ -37,26 +39,35 @@ func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) {
}
func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
nodes, err := GetNodeList(k8sClient)
if err != nil {
return nil, err
}
for _, node := range nodes.Items {
if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) {
return &node, nil
var listErr error
for retries := 0; retries < MaxRetries; retries++ {
nodes, err := GetNodeList(k8sClient)
if err != nil {
listErr = err
time.Sleep(time.Second * RetryInterval)
continue
}
for _, node := range nodes.Items {
if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) {
return &node, nil
}
}
time.Sleep(time.Second * RetryInterval)
}
if listErr != nil {
return nil, listErr
}
return nil, apierrors.NewNotFound(schema.GroupResource{}, nodeName)
}
func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned bool) error {
updated := false
for retries := 0; retries <= 5; retries++ {
for retries := 0; retries < MaxRetries; retries++ {
node, err := GetNode(k8sClient, nodeName)
if err != nil {
logrus.Debugf("Error getting node %s: %v", nodeName, err)
time.Sleep(time.Second * 5)
continue
// no need to retry here since GetNode already retries
return err
}
if node.Spec.Unschedulable == cordoned {
logrus.Debugf("Node %s is already cordoned: %v", nodeName, cordoned)
@@ -66,7 +77,7 @@ func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
_, err = k8sClient.CoreV1().Nodes().Update(node)
if err != nil {
logrus.Debugf("Error setting cordoned state for node %s: %v", nodeName, err)
time.Sleep(time.Second * 5)
time.Sleep(time.Second * RetryInterval)
continue
}
updated = true
@@ -80,7 +91,7 @@ func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
func IsNodeReady(node v1.Node) bool {
nodeConditions := node.Status.Conditions
for _, condition := range nodeConditions {
if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
return true
}
}
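
The GetNode change follows a simple bounded-retry shape: keep the last error, wait RetryInterval seconds between attempts, and give up after MaxRetries tries. The same shape extracted as a generic sketch (the helper name is illustrative, not part of the commit):

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	maxRetries    = 5
	retryInterval = 5 * time.Second
)

// retry mirrors the bounded loop used by GetNode: remember the last error,
// sleep between attempts, and stop after maxRetries tries.
func retry(attempt func() error) error {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		if lastErr = attempt(); lastErr == nil {
			return nil
		}
		time.Sleep(retryInterval)
	}
	return lastErr
}

func main() {
	calls := 0
	err := retry(func() error {
		calls++
		if calls < 3 {
			return errors.New("node list not available yet")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
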

services/controlplane.go

@@ -2,13 +2,21 @@ package services
import (
"context"
"strings"
"sync"
"github.com/docker/docker/client"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)
func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI) error {
@@ -39,6 +47,80 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
return nil
}
func UpgradeControlPlane(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) error {
if updateWorkersOnly {
return nil
}
var drainHelper drain.Helper
log.Infof(ctx, "[%s] Processing control plane components for upgrade one at a time", ControlRole)
if len(newHosts) > 0 {
var nodes []string
for _, host := range controlHosts {
if newHosts[host.HostnameOverride] {
nodes = append(nodes, host.HostnameOverride)
}
}
if len(nodes) > 0 {
log.Infof(ctx, "[%s] Adding controlplane nodes %v to the cluster", ControlRole, strings.Join(nodes, ","))
}
}
if upgradeStrategy.Drain {
drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
}
// upgrade control plane hosts one at a time for zero downtime upgrades
for _, host := range controlHosts {
log.Infof(ctx, "Processing controlplane host %v", host.HostnameOverride)
if newHosts[host.HostnameOverride] {
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &sync.Map{}, newHosts, false)
if err != nil {
return err
}
var maxUnavailableHit bool
for _, node := range nodes {
// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
if !k8s.IsNodeReady(node) {
maxUnavailableHit = true
break
}
}
if maxUnavailableHit {
return err
}
upgradable, err := isControlPlaneHostUpgradable(ctx, host, cpNodePlanMap[host.Address].Processes)
if err != nil {
return err
}
if !upgradable {
log.Infof(ctx, "Upgrade not required for controlplane components of host %v", host.HostnameOverride)
continue
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := cordonAndDrainNode(kubeClient, host, upgradeStrategy.Drain, drainHelper, ControlRole); err != nil {
return err
}
if err := doDeployControlHost(ctx, host, localConnDialerFactory, prsMap, cpNodePlanMap[host.Address].Processes, alpineImage, certMap); err != nil {
return err
}
if err := checkNodeReady(kubeClient, host, ControlRole); err != nil {
return err
}
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, false); err != nil {
return err
}
}
log.Infof(ctx, "[%s] Successfully upgraded Controller Plane..", ControlRole)
return nil
}
func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole)
var errgrp errgroup.Group
@@ -139,3 +221,26 @@ func doDeployControlHost(ctx context.Context, host *hosts.Host, localConnDialerF
// run scheduler
return runScheduler(ctx, host, localConnDialerFactory, prsMap, processMap[SchedulerContainerName], alpineImage)
}
func isControlPlaneHostUpgradable(ctx context.Context, host *hosts.Host, processMap map[string]v3.Process) (bool, error) {
for _, service := range []string{SidekickContainerName, KubeAPIContainerName, KubeControllerContainerName, SchedulerContainerName} {
process := processMap[service]
imageCfg, hostCfg, _ := GetProcessConfig(process, host)
upgradable, err := docker.IsContainerUpgradable(ctx, host.DClient, imageCfg, hostCfg, service, host.Address, ControlRole)
if err != nil {
if client.IsErrNotFound(err) {
// doDeployControlHost should be called so this container gets recreated
logrus.Debugf("[%s] Host %v is upgradable because %v needs to run", ControlRole, host.HostnameOverride, service)
return true, nil
}
return false, err
}
if upgradable {
logrus.Debugf("[%s] Host %v is upgradable because %v has changed", ControlRole, host.HostnameOverride, service)
// host upgradable even if a single service is upgradable
return true, nil
}
}
logrus.Debugf("[%s] Host %v is not upgradable", ControlRole, host.HostnameOverride)
return false, nil
}

services/node_util.go (new file, 83 lines)

@@ -0,0 +1,83 @@
package services
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)
func checkNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error {
for retries := 0; retries < k8s.MaxRetries; retries++ {
logrus.Debugf("[%s] Now checking status of node %v", component, runHost.HostnameOverride)
k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
if err != nil {
return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err)
}
logrus.Debugf("[%s] Found node by name %s", component, runHost.HostnameOverride)
if k8s.IsNodeReady(*k8sNode) {
return nil
}
time.Sleep(time.Second * k8s.RetryInterval)
}
return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
}
func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component string) error {
logrus.Debugf("[%s] Cordoning node %v", component, host.HostnameOverride)
if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
return err
}
if !drainNode {
return nil
}
logrus.Debugf("[%s] Draining node %v", component, host.HostnameOverride)
if err := drain.RunNodeDrain(&drainHelper, host.HostnameOverride); err != nil {
return fmt.Errorf("error draining node %v: %v", host.HostnameOverride, err)
}
return nil
}
func getDrainHelper(kubeClient *kubernetes.Clientset, upgradeStrategy v3.NodeUpgradeStrategy) drain.Helper {
drainHelper := drain.Helper{
Client: kubeClient,
Force: upgradeStrategy.DrainInput.Force,
IgnoreAllDaemonSets: upgradeStrategy.DrainInput.IgnoreDaemonSets,
DeleteLocalData: upgradeStrategy.DrainInput.DeleteLocalData,
GracePeriodSeconds: upgradeStrategy.DrainInput.GracePeriod,
Timeout: time.Second * time.Duration(upgradeStrategy.DrainInput.Timeout),
Out: bytes.NewBuffer([]byte{}),
ErrOut: bytes.NewBuffer([]byte{}),
}
return drainHelper
}
func getNodeListForUpgrade(kubeClient *kubernetes.Clientset, hostsFailed *sync.Map, newHosts map[string]bool, isUpgradeForWorkerPlane bool) ([]v1.Node, error) {
var nodeList []v1.Node
nodes, err := k8s.GetNodeList(kubeClient)
if err != nil {
return nodeList, err
}
for _, node := range nodes.Items {
if isUpgradeForWorkerPlane {
// exclude hosts that are already included in failed hosts list
if _, ok := hostsFailed.Load(node.Name); ok {
continue
}
}
// exclude hosts that are newly added to the cluster since they can take time to come up
if newHosts[node.Name] {
continue
}
nodeList = append(nodeList, node)
}
return nodeList, nil
}

services/workerplane.go

@@ -2,13 +2,24 @@ package services
import (
"context"
"fmt"
"strings"
"sync"
"github.com/docker/docker/client"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)
const (
@@ -42,6 +53,179 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
return nil
}
func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) (string, error) {
log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
var errMsgMaxUnavailableNotFailed string
maxUnavailable, err := CalculateMaxUnavailable(upgradeStrategy.MaxUnavailable, len(workerOnlyHosts))
if err != nil {
return errMsgMaxUnavailableNotFailed, err
}
if maxUnavailable > WorkerThreads {
/* upgrading a large number of nodes in parallel leads to a large number of goroutines, which has led to errors regarding too many open sockets
Because of this RKE switched to using workerpools. 50 workerthreads has been sufficient to optimize rke up, upgrading at most 50 nodes in parallel.
So the user configurable maxUnavailable will be respected only as long as it's less than 50 and capped at 50 */
maxUnavailable = WorkerThreads
logrus.Info("Setting maxUnavailable to 50, to avoid issues related to upgrading large number of nodes in parallel")
}
maxUnavailable -= len(inactiveHosts)
updateNewHostsList(kubeClient, append(multipleRolesHosts, workerOnlyHosts...), newHosts)
log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd/controlplane roles one at a time")
multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
return errMsgMaxUnavailableNotFailed, err
}
log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts)
if err != nil {
logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
return errMsgMaxUnavailableNotFailed, err
}
errMsgMaxUnavailableNotFailed = fmt.Sprintf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
}
log.Infof(ctx, "[%s] Successfully upgraded Worker Plane..", WorkerRole)
return errMsgMaxUnavailableNotFailed, nil
}
func CalculateMaxUnavailable(maxUnavailableVal string, numHosts int) (int, error) {
// if maxUnavailable is given in percent, round down
maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
logrus.Debugf("Provided value for maxUnavailable: %v", maxUnavailableParsed)
maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false)
if err != nil {
logrus.Errorf("Unable to parse max_unavailable, should be a number or percentage of nodes, error: %v", err)
return 0, err
}
if maxUnavailable == 0 {
// In case there is only one node and rounding down maxUnvailable percentage led to 0
maxUnavailable = 1
}
logrus.Infof("%v worker nodes can be unavailable at a time", maxUnavailable)
return maxUnavailable, nil
}
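
The percentage handling above leans on the Kubernetes intstr utility: percentages are rounded down, and a result of zero is bumped back to one so a single-node pool can still upgrade. A worked example with a hypothetical pool of 4 worker-only hosts:

package main

import (
	"fmt"

	k8sutil "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// "10%" of 4 worker-only hosts rounds down to 0 ...
	val := k8sutil.Parse("10%")
	maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&val, 4, false)
	if err != nil {
		panic(err)
	}
	if maxUnavailable == 0 {
		maxUnavailable = 1 // ... which CalculateMaxUnavailable bumps back up to 1
	}
	fmt.Println(maxUnavailable) // 1
}
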
func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) {
for _, h := range allHosts {
_, err := k8s.GetNode(kubeClient, h.HostnameOverride)
if err != nil && apierrors.IsNotFound(err) {
// this host could have been added to cluster state upon successful controlplane upgrade but isn't a node yet.
newHosts[h.HostnameOverride] = true
}
}
}
func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool) ([]string, error) {
var errgrp errgroup.Group
var drainHelper drain.Helper
var failedHosts []string
var hostsFailedToUpgrade = make(chan string, maxUnavailable)
var hostsFailed sync.Map
hostsQueue := util.GetObjectQueue(allHosts)
if upgradeStrategy.Drain {
drainHelper = getDrainHelper(kubeClient, *upgradeStrategy)
}
/* Each worker thread starts a goroutine that reads the hostsQueue channel in a for loop
Using same number of worker threads as maxUnavailable ensures only maxUnavailable number of nodes are being processed at a time
Node is done upgrading only after it is listed as ready and uncordoned.*/
for w := 0; w < maxUnavailable; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
logrus.Infof("[workerplane] Processing host %v", runHost.HostnameOverride)
if newHosts[runHost.HostnameOverride] {
if err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, workerNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
errList = append(errList, err)
hostsFailedToUpgrade <- runHost.HostnameOverride
hostsFailed.Store(runHost.HostnameOverride, true)
break
}
continue
}
nodes, err := getNodeListForUpgrade(kubeClient, &hostsFailed, newHosts, true)
if err != nil {
errList = append(errList, err)
}
var maxUnavailableHit bool
for _, node := range nodes {
// in case any previously added nodes or till now unprocessed nodes become unreachable during upgrade
if !k8s.IsNodeReady(node) {
if len(hostsFailedToUpgrade) >= maxUnavailable {
maxUnavailableHit = true
break
}
hostsFailed.Store(node.Labels[k8s.HostnameLabel], true)
hostsFailedToUpgrade <- node.Labels[k8s.HostnameLabel]
errList = append(errList, fmt.Errorf("host %v not ready", node.Labels[k8s.HostnameLabel]))
}
}
if maxUnavailableHit || len(hostsFailedToUpgrade) >= maxUnavailable {
break
}
upgradable, err := isWorkerHostUpgradable(ctx, runHost, workerNodePlanMap[runHost.Address].Processes)
if err != nil {
errList = append(errList, err)
hostsFailed.Store(runHost.HostnameOverride, true)
hostsFailedToUpgrade <- runHost.HostnameOverride
break
}
if !upgradable {
logrus.Infof("[workerplane] Upgrade not required for worker components of host %v", runHost.HostnameOverride)
continue
}
if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage); err != nil {
errList = append(errList, err)
hostsFailed.Store(runHost.HostnameOverride, true)
hostsFailedToUpgrade <- runHost.HostnameOverride
break
}
}
return util.ErrList(errList)
})
}
err := errgrp.Wait()
close(hostsFailedToUpgrade)
if err != nil {
for host := range hostsFailedToUpgrade {
failedHosts = append(failedHosts, host)
}
}
return failedHosts, err
}
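
The fan-out above is an errgroup-plus-channel worker pool: util.GetObjectQueue turns the host slice into a closed channel, and maxUnavailable goroutines drain it, so at most maxUnavailable nodes are processed at any moment while failures are recorded instead of aborting the whole pool. A stripped-down sketch of that pattern with illustrative host names:

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	hostsList := []string{"node1", "node2", "node3", "node4", "node5"}
	maxUnavailable := 2 // number of hosts processed concurrently

	// Equivalent of util.GetObjectQueue: a buffered, pre-closed channel acting as the work queue.
	queue := make(chan string, len(hostsList))
	for _, h := range hostsList {
		queue <- h
	}
	close(queue)

	var processed sync.Map
	var errgrp errgroup.Group
	for w := 0; w < maxUnavailable; w++ {
		errgrp.Go(func() error {
			for host := range queue {
				// upgrade one host here; record the outcome rather than stopping other workers
				processed.Store(host, true)
			}
			return nil
		})
	}
	if err := errgrp.Wait(); err != nil {
		panic(err)
	}
	processed.Range(func(k, v interface{}) bool {
		fmt.Println("upgraded:", k)
		return true
	})
}
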
func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper,
localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool,
alpineImage string) error {
if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil {
return err
}
// cordon and drain
if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil {
return err
}
logrus.Debugf("[workerplane] upgrading host %v", runHost.HostnameOverride)
if err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, workerNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage); err != nil {
return err
}
// consider upgrade done when kubeclient lists node as ready
if err := checkNodeReady(kubeClient, runHost, WorkerRole); err != nil {
return err
}
// uncordon node
if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
return err
}
return nil
}
func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
if updateWorkersOnly {
if !host.UpdateWorker {
@@ -149,3 +333,30 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
}
return runKubeproxy(ctx, host, localConnDialerFactory, prsMap, processMap[KubeproxyContainerName], alpineImage)
}
func isWorkerHostUpgradable(ctx context.Context, host *hosts.Host, processMap map[string]v3.Process) (bool, error) {
for _, service := range []string{NginxProxyContainerName, SidekickContainerName, KubeletContainerName, KubeproxyContainerName} {
process := processMap[service]
imageCfg, hostCfg, _ := GetProcessConfig(process, host)
upgradable, err := docker.IsContainerUpgradable(ctx, host.DClient, imageCfg, hostCfg, service, host.Address, WorkerRole)
if err != nil {
if client.IsErrNotFound(err) {
if service == NginxProxyContainerName && host.IsControl {
// nginxProxy should not exist on control hosts, so no changes needed
continue
}
// doDeployWorkerPlane should be called so this container gets recreated
logrus.Debugf("[%s] Host %v is upgradable because %v needs to run", WorkerRole, host.HostnameOverride, service)
return true, nil
}
return false, err
}
if upgradable {
logrus.Debugf("[%s] Host %v is upgradable because %v has changed", WorkerRole, host.HostnameOverride, service)
// host upgradable even if a single service is upgradable
return true, nil
}
}
logrus.Debugf("[%s] Host %v is not upgradable", WorkerRole, host.HostnameOverride)
return false, nil
}