mirror of https://github.com/rancher/rke.git synced 2025-07-15 08:02:56 +00:00

Merge pull request #3482 from jiaqiluo/support-127

Commit: f329e90778
Author: Jiaqi Luo, 2024-01-23 16:41:54 -07:00 (committed by GitHub)
Signature: GPG Key ID B5690EEEBB952194 (no known key found for this signature in database)
19 changed files with 2146 additions and 1505 deletions


@@ -41,6 +41,26 @@
     {
       "linters": "revive",
       "text": "should be of the form"
+    },
+    {
+      "linters": "revive",
+      "text": "unused-parameter"
+    },
+    {
+      "linters": "revive",
+      "text": "redefines-builtin-id"
+    },
+    {
+      "linters": "revive",
+      "text": "superfluous-else"
+    },
+    {
+      "linters": "revive",
+      "text": "empty-block"
+    },
+    {
+      "linters": "revive",
+      "text": "if-return: redundant if"
     }
   ]
 }


@@ -11,9 +11,9 @@ RUN apt-get update && \
 ENV GOLANG_ARCH_amd64=amd64 GOLANG_ARCH_arm=armv6l GOLANG_ARCH_arm64=arm64 GOLANG_ARCH=GOLANG_ARCH_${ARCH} \
     GOPATH=/go PATH=/go/bin:/usr/local/go/bin:${PATH} SHELL=/bin/bash
-RUN wget -O - https://storage.googleapis.com/golang/go1.19.3.linux-${!GOLANG_ARCH}.tar.gz | tar -xzf - -C /usr/local
-RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.46.2
+RUN wget -O - https://storage.googleapis.com/golang/go1.20.4.linux-${!GOLANG_ARCH}.tar.gz | tar -xzf - -C /usr/local
+RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.52.2
 ENV DOCKER_URL_amd64=https://get.docker.com/builds/Linux/x86_64/docker-1.10.3 \
     DOCKER_URL_arm=https://github.com/rancher/docker/releases/download/v1.10.3-ros1/docker-1.10.3_arm \


@@ -492,7 +492,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str
 	if err != nil {
 		return &addonError{fmt.Sprintf("%v", err), isCritical}
 	}
-	node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride)
+	node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.ControlPlaneHosts[0].InternalAddress, c.CloudProvider.Name)
 	if err != nil {
 		return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical}
 	}
@@ -513,7 +513,7 @@ func (c *Cluster) doAddonDelete(ctx context.Context, resourceName string, isCrit
 	if err != nil {
 		return &addonError{fmt.Sprintf("%v", err), isCritical}
 	}
-	node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride)
+	node, err := k8s.GetNode(k8sClient, c.ControlPlaneHosts[0].HostnameOverride, c.ControlPlaneHosts[0].InternalAddress, c.CloudProvider.Name)
 	if err != nil {
 		return &addonError{fmt.Sprintf("Failed to get Node [%s]: %v", c.ControlPlaneHosts[0].HostnameOverride, err), isCritical}
 	}


@@ -189,7 +189,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
 			continue
 		}
 		// find existing nodes that are in NotReady state
-		if err := services.CheckNodeReady(kubeClient, host, services.ControlRole); err != nil {
+		if err := services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name); err != nil {
 			logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride)
 			notReadyHosts = append(notReadyHosts, host)
 			notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
@@ -223,7 +223,7 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
 		}
 		// Calling CheckNodeReady wil give some time for nodes to get in Ready state
 		for _, host := range notReadyHosts {
-			err = services.CheckNodeReady(kubeClient, host, services.ControlRole)
+			err = services.CheckNodeReady(kubeClient, host, services.ControlRole, c.CloudProvider.Name)
 			if err != nil {
 				logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err)
 			}
@@ -236,7 +236,8 @@ func (c *Cluster) UpgradeControlPlane(ctx context.Context, kubeClient *kubernete
 		cpNodePlanMap,
 		c.UpdateWorkersOnly,
 		c.SystemImages.Alpine,
-		c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes, c.Version)
+		c.Certificates, c.UpgradeStrategy, c.NewHosts, inactiveHosts, c.MaxUnavailableForControlNodes,
+		c.Version, c.CloudProvider.Name)
 	if err != nil {
 		return "", fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
 	}
@@ -310,7 +311,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
 			continue
 		}
 		// find existing nodes that are in NotReady state
-		if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole); err != nil {
+		if err := services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name); err != nil {
 			logrus.Debugf("Found node %v in NotReady state", host.HostnameOverride)
 			notReadyHosts = append(notReadyHosts, host)
 			notReadyHostNames = append(notReadyHostNames, host.HostnameOverride)
@@ -332,7 +333,7 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
 		}
 		// Calling CheckNodeReady wil give some time for nodes to get in Ready state
 		for _, host := range notReadyHosts {
-			err = services.CheckNodeReady(kubeClient, host, services.WorkerRole)
+			err = services.CheckNodeReady(kubeClient, host, services.WorkerRole, c.CloudProvider.Name)
 			if err != nil {
 				logrus.Errorf("Host %v failed to report Ready status with error: %v", host.HostnameOverride, err)
 			}
@@ -349,7 +350,8 @@ func (c *Cluster) UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes
 		c.UpgradeStrategy,
 		c.NewHosts,
 		c.MaxUnavailableForWorkerNodes,
-		c.Version)
+		c.Version,
+		c.CloudProvider.Name)
 	if err != nil {
 		return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
 	}
@@ -994,7 +996,7 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust
 			var errs []error
 			for host := range hostQueue {
 				logrus.Debugf("worker [%d] starting sync for node [%s]", w, host.HostnameOverride)
-				if err := setNodeAnnotationsLabelsTaints(k8sClient, host); err != nil {
+				if err := setNodeAnnotationsLabelsTaints(k8sClient, host, c.CloudProvider.Name); err != nil {
 					errs = append(errs, err)
 				}
 			}
@@ -1012,17 +1014,16 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust
 	return nil
 }
 
-func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host) error {
+func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host, cloudProviderName string) error {
 	node := &v1.Node{}
 	var err error
 	for retries := 0; retries <= 5; retries++ {
-		node, err = k8s.GetNode(k8sClient, host.HostnameOverride)
+		node, err = k8s.GetNode(k8sClient, host.HostnameOverride, host.InternalAddress, cloudProviderName)
 		if err != nil {
 			logrus.Debugf("[hosts] Can't find node by name [%s], error: %v", host.HostnameOverride, err)
 			time.Sleep(2 * time.Second)
 			continue
 		}
 		oldNode := node.DeepCopy()
 		k8s.SetNodeAddressesAnnotations(node, host.InternalAddress, host.Address)
 		k8s.SyncNodeLabels(node, host.ToAddLabels, host.ToDelLabels)


@@ -9,6 +9,7 @@ import (
 	"github.com/blang/semver"
 	"github.com/rancher/rke/cloudprovider"
+	"github.com/rancher/rke/cloudprovider/aws"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
@@ -1061,11 +1062,25 @@ func (c *Cluster) setCloudProvider() error {
 	if p != nil {
 		c.CloudConfigFile, err = p.GenerateCloudConfigFile()
 		if err != nil {
-			return fmt.Errorf("Failed to parse cloud config file: %v", err)
+			return fmt.Errorf("failed to parse cloud config file: %v", err)
 		}
 		c.CloudProvider.Name = p.GetName()
 		if c.CloudProvider.Name == "" {
-			return fmt.Errorf("Name of the cloud provider is not defined for custom provider")
+			return fmt.Errorf("name of the cloud provider is not defined for custom provider")
+		}
+		if c.CloudProvider.Name == aws.AWSCloudProviderName {
+			clusterVersion, err := getClusterVersion(c.Version)
+			if err != nil {
+				return fmt.Errorf("failed to get cluster version for checking cloud provider: %v", err)
+			}
+			// cloud provider must be external or external-aws for >=1.27
+			defaultExternalAwsRange, err := semver.ParseRange(">=1.27.0-rancher0")
+			if err != nil {
+				return fmt.Errorf("failed to parse semver range for checking cloud provider %v", err)
+			}
+			if defaultExternalAwsRange(clusterVersion) {
+				return fmt.Errorf(fmt.Sprintf("Cloud provider %s is invalid for [%s]", aws.AWSCloudProviderName, c.Version))
+			}
 		}
 	}
 	return nil
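The gate above rejects the in-tree "aws" provider once the cluster version falls in the ">=1.27.0-rancher0" range. A minimal, self-contained sketch of the same blang/semver range check; the version string used here is only an example, not taken from the PR:

package main

import (
	"fmt"

	"github.com/blang/semver"
)

func main() {
	// Range mirrors the one parsed in setCloudProvider above.
	externalOnlyRange := semver.MustParseRange(">=1.27.0-rancher0")

	// Example version string; RKE derives the real value from the cluster's Version field.
	v, err := semver.Parse("1.27.6-rancher1-1")
	if err != nil {
		fmt.Println("failed to parse version:", err)
		return
	}
	if externalOnlyRange(v) {
		// Same outcome as the diff: the in-tree aws provider is refused for 1.27+.
		fmt.Printf("cloud provider %q is invalid for [%s]\n", "aws", v)
	}
}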


@@ -13,6 +13,7 @@ import (
 	"github.com/blang/semver"
 	"github.com/docker/docker/api/types"
+	"github.com/rancher/rke/cloudprovider/aws"
 	"github.com/rancher/rke/docker"
 	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/k8s"
@@ -69,6 +70,7 @@ var (
 	parsedRangeAtLeast123 = semver.MustParseRange(">= 1.23.0-rancher0")
 	parsedRangeAtLeast124 = semver.MustParseRange(">= 1.24.0-rancher0")
 	parsedRangeAtLeast125 = semver.MustParseRange(">= 1.25.0-rancher0")
+	parsedRangeBelow127 = semver.MustParseRange("< 1.27.0-rancher0")
 	parsedRange123 = semver.MustParseRange(">=1.23.0-rancher0 <=1.23.99-rancher-0")
 	parsedRange124 = semver.MustParseRange(">=1.24.0-rancher0 <=1.24.99-rancher-0")
 )
@@ -179,7 +181,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern
 	CommandArgs := map[string]string{
 		"admission-control-config-file": DefaultKubeAPIArgAdmissionControlConfigFileValue,
 		"client-ca-file": pki.GetCertPath(pki.CACertName),
-		"cloud-provider": c.CloudProvider.Name,
+		"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
 		"etcd-cafile": etcdCAClientCert,
 		"etcd-certfile": etcdClientCert,
 		"etcd-keyfile": etcdClientKey,
@@ -344,7 +346,7 @@ func (c *Cluster) BuildKubeAPIProcess(host *hosts.Host, serviceOptions v3.Kubern
 func (c *Cluster) BuildKubeControllerProcess(host *hosts.Host, serviceOptions v3.KubernetesServicesOptions) v3.Process {
 	Command := c.getRKEToolsEntryPoint(host.OS(), "kube-controller-manager")
 	CommandArgs := map[string]string{
-		"cloud-provider": c.CloudProvider.Name,
+		"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
 		"cluster-cidr": c.ClusterCIDR,
 		"kubeconfig": pki.GetConfigPath(pki.KubeControllerCertName),
 		"root-ca-file": pki.GetCertPath(pki.CACertName),
@@ -463,7 +465,7 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern
 	Command := c.getRKEToolsEntryPoint(host.OS(), "kubelet")
 	CommandArgs := map[string]string{
 		"client-ca-file": pki.GetCertPath(pki.CACertName),
-		"cloud-provider": c.CloudProvider.Name,
+		"cloud-provider": getCloudProviderName(c.CloudProvider.Name),
 		"cluster-dns": c.ClusterDNSServer,
 		"cluster-domain": c.ClusterDomain,
 		"fail-swap-on": strconv.FormatBool(kubelet.FailSwapOn),
@@ -495,6 +497,11 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern
 		if host.IsWindows() { // compatible with Windows
 			CommandArgs["cloud-config"] = path.Join(host.PrefixPath, cloudConfigFileName)
 		}
+		if c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
+			// rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17
+			delete(CommandArgs, "hostname-override")
+		}
 	}
 
 	if c.IsKubeletGenerateServingCertificateEnabled() {
@@ -505,12 +512,14 @@ func (c *Cluster) BuildKubeletProcess(host *hosts.Host, serviceOptions v3.Kubern
 	var Binds []string
 
 	if c.IsCRIDockerdEnabled() {
-		CommandArgs["container-runtime"] = "remote"
-		CommandArgs["container-runtime-endpoint"] = "/var/run/dockershim.sock"
 		parsedVersion, err := getClusterVersion(c.Version)
 		if err != nil {
 			logrus.Debugf("Error while parsing cluster version: %s", err)
 		}
+		if parsedRangeBelow127(parsedVersion) {
+			CommandArgs["container-runtime"] = "remote" // This flag has been removed from v1.27 https://v1-26.docs.kubernetes.io/docs/reference/command-line-tools-reference/kubelet/
+		}
+		CommandArgs["container-runtime-endpoint"] = "/var/run/dockershim.sock"
 		// cri-dockerd must be enabled if the cluster version is 1.24 and higher
 		if parsedRangeAtLeast124(parsedVersion) {
 			CommandArgs["container-runtime-endpoint"] = "unix:///var/run/cri-dockerd.sock"
@@ -692,7 +701,8 @@ func (c *Cluster) BuildKubeProxyProcess(host *hosts.Host, serviceOptions v3.Kube
 	} else {
 		CommandArgs["bind-address"] = host.Address
 	}
-	if c.CloudProvider.Name == k8s.AWSCloudProvider && c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
+	if (c.CloudProvider.Name == k8s.ExternalAWSCloudProviderName || c.CloudProvider.Name == aws.AWSCloudProviderName) &&
+		c.CloudProvider.UseInstanceMetadataHostname != nil && *c.CloudProvider.UseInstanceMetadataHostname {
 		// rke-tools will inject hostname-override from ec2 instance metadata to match with the spec.nodeName set by cloud provider https://github.com/rancher/rke-tools/blob/3eab4f07aa97a8aeeaaef55b1b7bbc82e2a3374a/entrypoint.sh#L17
 		delete(CommandArgs, "hostname-override")
 	}
@@ -1286,3 +1296,10 @@ func (c *Cluster) IsCRIDockerdEnabled() bool {
 	}
 	return false
 }
+
+func getCloudProviderName(name string) string {
+	if name == k8s.ExternalAWSCloudProviderName {
+		return "external"
+	}
+	return name
+}


@ -11,6 +11,7 @@ import (
v3 "github.com/rancher/rke/types" v3 "github.com/rancher/rke/types"
"github.com/rancher/rke/util" "github.com/rancher/rke/util"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
) )
func (c *Cluster) ClusterRemove(ctx context.Context) error { func (c *Cluster) ClusterRemove(ctx context.Context) error {
@ -92,7 +93,13 @@ func (c *Cluster) RemoveOldNodes(ctx context.Context) error {
host := &hosts.Host{} host := &hosts.Host{}
host.HostnameOverride = node.Name host.HostnameOverride = node.Name
if !hosts.IsNodeInList(host, uniqueHosts) { if !hosts.IsNodeInList(host, uniqueHosts) {
if err := k8s.DeleteNode(kubeClient, node.Name, c.CloudProvider.Name); err != nil { nodeAddress := ""
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
nodeAddress = addr.Address
}
}
if err := k8s.DeleteNode(kubeClient, node.Name, nodeAddress, c.CloudProvider.Name); err != nil {
log.Warnf(ctx, "Failed to delete old node [%s] from kubernetes") log.Warnf(ctx, "Failed to delete old node [%s] from kubernetes")
} }
} }


@@ -217,6 +217,13 @@ func validateNetworkOptions(c *Cluster) error {
 	if c.Network.Plugin == FlannelNetworkPlugin && c.Network.MTU != 0 {
 		return fmt.Errorf("Network plugin [%s] does not support configuring MTU", FlannelNetworkPlugin)
 	}
+
+	if c.Network.Plugin == WeaveNetworkPlugin {
+		if err := warnWeaveDeprecation(c.Version); err != nil {
+			return fmt.Errorf("Error while printing Weave deprecation message: %w", err)
+		}
+	}
+
 	dualStack := false
 	serviceClusterRanges := strings.Split(c.Services.KubeAPI.ServiceClusterIPRange, ",")
 	if len(serviceClusterRanges) > 1 {
@@ -731,3 +738,19 @@ func getClusterVersion(version string) (semver.Version, error) {
 	}
 	return parsedVersion, nil
 }
+
+// warnWeaveDeprecation prints a deprecation warning if version is higher than 1.27
+func warnWeaveDeprecation(k8sVersion string) error {
+	version, err := util.StrToSemVer(k8sVersion)
+	if err != nil {
+		return fmt.Errorf("error while parsing cluster version [%s]: %w", k8sVersion, err)
+	}
+	version127, err := util.StrToSemVer("v1.27.0")
+	if err != nil {
+		return fmt.Errorf("failed to translate v1.27.0 to semver notation: %w", err)
+	}
+	if !version.LessThan(*version127) {
+		logrus.Warn("Weave CNI plugin is deprecated starting with Kubernetes v1.27 and will be removed in Kubernetes v1.30")
+	}
+	return nil
+}


@@ -10,7 +10,7 @@ import (
 )
 
 const (
-	defaultURL = "https://releases.rancher.com/kontainer-driver-metadata/release-v2.7/data.json"
+	defaultURL = "https://releases.rancher.com/kontainer-driver-metadata/dev-v2.7/data.json"
 	dataFile = "data/data.json"
 )

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long


@@ -381,11 +381,11 @@ func RestartContainer(ctx context.Context, dClient *client.Client, hostname, con
 		return fmt.Errorf("Failed to restart container: docker client is nil for container [%s] on host [%s]", containerName, hostname)
 	}
 	var err error
-	restartTimeout := RestartTimeout * time.Second
+	restartTimeout := RestartTimeout
 	// Retry up to RetryCount times to see if image exists
 	for i := 1; i <= RetryCount; i++ {
 		logrus.Infof("Restarting container [%s] on host [%s], try #%d", containerName, hostname, i)
-		err = dClient.ContainerRestart(ctx, containerName, &restartTimeout)
+		err = dClient.ContainerRestart(ctx, containerName, container.StopOptions{Timeout: &restartTimeout})
 		if err != nil {
 			logrus.Warningf("Can't restart Docker container [%s] for host [%s]: %v", containerName, hostname, err)
 			continue
@@ -400,11 +400,11 @@ func StopContainer(ctx context.Context, dClient *client.Client, hostname string,
 	}
 	var err error
 	// define the stop timeout
-	stopTimeoutDuration := StopTimeout * time.Second
+	stopTimeout := StopTimeout
 	// Retry up to RetryCount times to see if image exists
 	for i := 1; i <= RetryCount; i++ {
-		logrus.Infof("Stopping container [%s] on host [%s] with stopTimeoutDuration [%s], try #%d", containerName, hostname, stopTimeoutDuration, i)
-		err = dClient.ContainerStop(ctx, containerName, &stopTimeoutDuration)
+		logrus.Infof("Stopping container [%s] on host [%s] with stopTimeout [%d], try #%d", containerName, hostname, stopTimeout, i)
+		err = dClient.ContainerStop(ctx, containerName, container.StopOptions{Timeout: &stopTimeout})
 		if err != nil {
 			logrus.Warningf("Can't stop Docker container [%s] for host [%s]: %v", containerName, hostname, err)
 			continue
@@ -453,11 +453,11 @@ func StartContainer(ctx context.Context, dClient *client.Client, hostname string
 	return err
 }
 
-func CreateContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.ContainerCreateCreatedBody, error) {
+func CreateContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.CreateResponse, error) {
 	if dClient == nil {
-		return container.ContainerCreateCreatedBody{}, fmt.Errorf("Failed to create container: docker client is nil for container [%s] on host [%s]", containerName, hostname)
+		return container.CreateResponse{}, fmt.Errorf("Failed to create container: docker client is nil for container [%s] on host [%s]", containerName, hostname)
 	}
-	var created container.ContainerCreateCreatedBody
+	var created container.CreateResponse
 	var err error
 	// Retry up to RetryCount times to see if image exists
 	for i := 1; i <= RetryCount; i++ {
@@ -468,7 +468,7 @@ func CreateContainer(ctx context.Context, dClient *client.Client, hostname strin
 		}
 		return created, nil
 	}
-	return container.ContainerCreateCreatedBody{}, fmt.Errorf("Failed to create Docker container [%s] on host [%s]: %v", containerName, hostname, err)
+	return container.CreateResponse{}, fmt.Errorf("Failed to create Docker container [%s] on host [%s]: %v", containerName, hostname, err)
 }
 
 func InspectContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) (types.ContainerJSON, error) {
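The hunk above follows the Docker SDK bump from v20.10 to v24: stop/restart timeouts move from *time.Duration to container.StopOptions (an *int in seconds), and ContainerCreateCreatedBody becomes container.CreateResponse. A minimal standalone sketch of the newer client calls; the container name "rke-test" and a reachable local daemon are assumptions for the example only:

package main

import (
	"context"
	"fmt"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
)

func main() {
	// Negotiate the API version so the sketch also works against older daemons.
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// StopOptions.Timeout is now *int (seconds), not *time.Duration.
	timeout := 10
	if err := cli.ContainerStop(context.Background(), "rke-test", container.StopOptions{Timeout: &timeout}); err != nil {
		fmt.Println("stop failed:", err)
	}
}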

go.mod (204 changed lines)

@@ -1,154 +1,150 @@
 module github.com/rancher/rke
 
-go 1.19
+go 1.20
 
 replace (
 	github.com/knative/pkg => github.com/rancher/pkg v0.0.0-20190514055449-b30ab9de040e
-	k8s.io/client-go => k8s.io/client-go v0.25.12
+	k8s.io/client-go => k8s.io/client-go v0.27.4
 	sigs.k8s.io/json => sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2
 )
 
 require (
-	github.com/Masterminds/sprig/v3 v3.2.2
-	github.com/apparentlymart/go-cidr v1.0.1
-	github.com/aws/aws-sdk-go v1.38.65
+	github.com/Masterminds/sprig/v3 v3.2.3
+	github.com/apparentlymart/go-cidr v1.1.0
+	github.com/aws/aws-sdk-go v1.44.272
 	github.com/blang/semver v3.5.1+incompatible
-	github.com/coreos/go-semver v0.3.0
+	github.com/coreos/go-semver v0.3.1
 	github.com/docker/distribution v2.8.2+incompatible
-	github.com/docker/docker v20.10.24+incompatible
+	github.com/docker/docker v24.0.2+incompatible
 	github.com/docker/go-connections v0.4.0
 	github.com/ghodss/yaml v1.0.0
 	github.com/go-bindata/go-bindata v3.1.2+incompatible
-	github.com/go-ini/ini v1.37.0
-	github.com/mattn/go-colorable v0.1.8
-	github.com/mcuadros/go-version v0.0.0-20180611085657-6d5863ca60fa
+	github.com/go-ini/ini v1.67.0
+	github.com/mattn/go-colorable v0.1.13
+	github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2
 	github.com/pkg/errors v0.9.1
-	github.com/rancher/norman v0.0.0-20221205184727-32ef2e185b99
-	github.com/sirupsen/logrus v1.8.1
-	github.com/stretchr/testify v1.8.0
-	github.com/urfave/cli v1.22.2
-	go.etcd.io/etcd/client/v2 v2.305.4
-	go.etcd.io/etcd/client/v3 v3.5.4
-	golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
-	golang.org/x/sync v0.1.0
-	google.golang.org/grpc v1.53.0
+	github.com/rancher/norman v0.0.0-20230428185400-96b95e3475e0
+	github.com/sirupsen/logrus v1.9.2
+	github.com/stretchr/testify v1.8.2
+	github.com/urfave/cli v1.22.13
+	go.etcd.io/etcd/client/v2 v2.305.9
+	go.etcd.io/etcd/client/v3 v3.5.9
+	golang.org/x/crypto v0.11.0
+	golang.org/x/sync v0.2.0
+	google.golang.org/grpc v1.55.0
 	gopkg.in/yaml.v2 v2.4.0
-	k8s.io/api v0.25.12
-	k8s.io/apimachinery v0.25.12
-	k8s.io/apiserver v0.25.12
-	k8s.io/client-go v0.25.12
-	k8s.io/gengo v0.0.0-20211129171323-c02415ce4185
-	k8s.io/kubectl v0.25.12
-	k8s.io/kubernetes v1.25.12
-	k8s.io/pod-security-admission v0.25.12
+	k8s.io/api v0.27.4
+	k8s.io/apimachinery v0.27.4
+	k8s.io/apiserver v0.27.4
+	k8s.io/client-go v1.5.2
+	k8s.io/gengo v0.0.0-20230306165830-ab3349d207d4
+	k8s.io/kubectl v0.27.4
+	k8s.io/kubernetes v1.15.0-alpha.0
+	k8s.io/pod-security-admission v0.27.4
 	sigs.k8s.io/yaml v1.3.0
 )
 
 require (
-	github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
+	github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
 	github.com/MakeNowJust/heredoc v1.0.0 // indirect
 	github.com/Masterminds/goutils v1.1.1 // indirect
-	github.com/Masterminds/semver/v3 v3.1.1 // indirect
-	github.com/Microsoft/go-winio v0.5.2 // indirect
-	github.com/Microsoft/hcsshim v0.9.3 // indirect
-	github.com/PuerkitoBio/purell v1.1.1 // indirect
-	github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
+	github.com/Masterminds/semver/v3 v3.2.1 // indirect
+	github.com/Microsoft/go-winio v0.6.1 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/blang/semver/v4 v4.0.0 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/chai2010/gettext-go v1.0.2 // indirect
-	github.com/containerd/cgroups v1.0.4 // indirect
-	github.com/containerd/containerd v1.5.18 // indirect
-	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
-	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
+	github.com/containerd/containerd v1.7.1 // indirect
+	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
+	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/docker/go-units v0.4.0 // indirect
-	github.com/emicklei/go-restful/v3 v3.8.0 // indirect
-	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
-	github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
-	github.com/go-errors/errors v1.0.1 // indirect
-	github.com/go-logr/logr v1.2.3 // indirect
-	github.com/go-openapi/jsonpointer v0.19.5 // indirect
-	github.com/go-openapi/jsonreference v0.19.5 // indirect
-	github.com/go-openapi/swag v0.19.14 // indirect
+	github.com/docker/go-units v0.5.0 // indirect
+	github.com/emicklei/go-restful/v3 v3.10.2 // indirect
+	github.com/evanphx/json-patch v5.6.0+incompatible // indirect
+	github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
+	github.com/go-errors/errors v1.4.2 // indirect
+	github.com/go-logr/logr v1.2.4 // indirect
+	github.com/go-openapi/jsonpointer v0.19.6 // indirect
+	github.com/go-openapi/jsonreference v0.20.2 // indirect
+	github.com/go-openapi/swag v0.22.3 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
-	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
-	github.com/golang/protobuf v1.5.2 // indirect
-	github.com/google/btree v1.0.1 // indirect
-	github.com/google/gnostic v0.5.7-v3refs // indirect
+	github.com/golang/protobuf v1.5.3 // indirect
+	github.com/google/btree v1.1.2 // indirect
+	github.com/google/gnostic v0.6.9 // indirect
 	github.com/google/go-cmp v0.5.9 // indirect
-	github.com/google/gofuzz v1.1.0 // indirect
+	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
 	github.com/google/uuid v1.3.0 // indirect
-	github.com/gopherjs/gopherjs v0.0.0-20191106031601-ce3c9ade29de // indirect
-	github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
-	github.com/huandu/xstrings v1.3.1 // indirect
-	github.com/imdario/mergo v0.3.12 // indirect
-	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
+	github.com/huandu/xstrings v1.4.0 // indirect
+	github.com/imdario/mergo v0.3.16 // indirect
+	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/klauspost/compress v1.16.5 // indirect
 	github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
-	github.com/mailru/easyjson v0.7.6 // indirect
-	github.com/mattn/go-isatty v0.0.12 // indirect
+	github.com/mailru/easyjson v0.7.7 // indirect
+	github.com/mattn/go-isatty v0.0.19 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
-	github.com/mitchellh/copystructure v1.0.0 // indirect
-	github.com/mitchellh/go-wordwrap v1.0.0 // indirect
-	github.com/mitchellh/reflectwalk v1.0.0 // indirect
+	github.com/mitchellh/copystructure v1.2.0 // indirect
+	github.com/mitchellh/go-wordwrap v1.0.1 // indirect
+	github.com/mitchellh/reflectwalk v1.0.2 // indirect
+	github.com/moby/patternmatcher v0.5.0 // indirect
 	github.com/moby/spdystream v0.2.0 // indirect
-	github.com/moby/sys/mount v0.2.0 // indirect
-	github.com/moby/sys/mountinfo v0.6.2 // indirect
-	github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
+	github.com/moby/sys/sequential v0.5.0 // indirect
+	github.com/moby/term v0.5.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
+	github.com/morikuni/aec v1.0.0 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
-	github.com/opencontainers/image-spec v1.0.2 // indirect
-	github.com/opencontainers/runc v1.1.6 // indirect
+	github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
+	github.com/opencontainers/runc v1.1.7 // indirect
 	github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/client_golang v1.12.1 // indirect
-	github.com/prometheus/client_model v0.2.0 // indirect
-	github.com/prometheus/common v0.32.1 // indirect
-	github.com/prometheus/procfs v0.7.3 // indirect
-	github.com/rancher/lasso v0.0.0-20221202205459-e7138f16489c // indirect
-	github.com/rancher/wrangler v1.0.1-0.20221202234327-1cafffeaa9a1 // indirect
-	github.com/russross/blackfriday v1.5.2 // indirect
+	github.com/prometheus/client_golang v1.15.1 // indirect
+	github.com/prometheus/client_model v0.4.0 // indirect
+	github.com/prometheus/common v0.44.0 // indirect
+	github.com/prometheus/procfs v0.10.1 // indirect
+	github.com/rancher/lasso v0.0.0-20230629200414-8a54b32e6792 // indirect
+	github.com/rancher/wrangler v1.1.1 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
-	github.com/shopspring/decimal v1.2.0 // indirect
-	github.com/smartystreets/assertions v1.0.1 // indirect
-	github.com/spf13/cast v1.3.1 // indirect
-	github.com/spf13/cobra v1.4.0 // indirect
+	github.com/shopspring/decimal v1.3.1 // indirect
+	github.com/spf13/cast v1.5.1 // indirect
+	github.com/spf13/cobra v1.7.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
-	github.com/xlab/treeprint v1.1.0 // indirect
-	go.etcd.io/etcd/api/v3 v3.5.4 // indirect
-	go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
-	go.opencensus.io v0.23.0 // indirect
-	go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
-	go.uber.org/atomic v1.7.0 // indirect
-	go.uber.org/multierr v1.6.0 // indirect
-	go.uber.org/zap v1.19.0 // indirect
-	golang.org/x/mod v0.8.0 // indirect
-	golang.org/x/net v0.8.0 // indirect
-	golang.org/x/oauth2 v0.4.0 // indirect
-	golang.org/x/sys v0.6.0 // indirect
-	golang.org/x/term v0.6.0 // indirect
-	golang.org/x/text v0.8.0 // indirect
-	golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
-	golang.org/x/tools v0.6.0 // indirect
+	github.com/xlab/treeprint v1.2.0 // indirect
+	go.etcd.io/etcd/api/v3 v3.5.9 // indirect
+	go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
+	go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
+	go.uber.org/atomic v1.11.0 // indirect
+	go.uber.org/multierr v1.11.0 // indirect
+	go.uber.org/zap v1.24.0 // indirect
+	golang.org/x/mod v0.10.0 // indirect
+	golang.org/x/net v0.12.0 // indirect
+	golang.org/x/oauth2 v0.8.0 // indirect
+	golang.org/x/sys v0.10.0 // indirect
+	golang.org/x/term v0.10.0 // indirect
+	golang.org/x/text v0.11.0 // indirect
+	golang.org/x/time v0.3.0 // indirect
+	golang.org/x/tools v0.9.1 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
-	google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
-	google.golang.org/protobuf v1.28.1 // indirect
+	google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
+	google.golang.org/protobuf v1.30.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
-	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/cli-runtime v0.25.12 // indirect
-	k8s.io/component-base v0.25.12 // indirect
-	k8s.io/klog/v2 v2.70.1 // indirect
-	k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
-	k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
-	sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
-	sigs.k8s.io/kustomize/api v0.12.1 // indirect
-	sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect
+	gotest.tools/v3 v3.4.0 // indirect
+	k8s.io/cli-runtime v0.27.4 // indirect
+	k8s.io/component-base v0.27.4 // indirect
+	k8s.io/klog/v2 v2.100.1 // indirect
+	k8s.io/kube-openapi v0.0.0-20230530175149-33f04d5d6b58 // indirect
+	k8s.io/utils v0.0.0-20230505201702-9f6742963106 // indirect
+	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+	sigs.k8s.io/kustomize/api v0.13.4 // indirect
+	sigs.k8s.io/kustomize/kyaml v0.14.2 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
 )

go.sum (1470 changed lines): file diff suppressed because it is too large.


@@ -191,13 +191,13 @@ func (h *Host) ProcessFilter(processes map[string]v3.Process) map[string]v3.Proc
 	return processes
 }
 
-func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProvider string) error {
+func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool, cloudProviderName string) error {
 	if hasAnotherRole {
 		log.Infof(ctx, "[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address)
 		return nil
 	}
 	log.Infof(ctx, "[hosts] Cordoning host [%s]", toDeleteHost.Address)
-	if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride); err != nil {
+	if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride, toDeleteHost.InternalAddress, cloudProviderName); err != nil {
 		if apierrors.IsNotFound(err) {
 			log.Warnf(ctx, "[hosts] Can't find node by name [%s]", toDeleteHost.Address)
 			return nil
@@ -205,26 +205,17 @@ func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.
 		return err
 	}
-	if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, true); err != nil {
+	if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, toDeleteHost.InternalAddress, cloudProviderName, true); err != nil {
 		return err
 	}
 	log.Infof(ctx, "[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address)
-	if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, cloudProvider); err != nil {
+	if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride, toDeleteHost.InternalAddress, cloudProviderName); err != nil {
 		return err
 	}
 	log.Infof(ctx, "[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address)
 	return nil
 }
 
-func RemoveTaintFromHost(ctx context.Context, host *Host, taintKey string, kubeClient *kubernetes.Clientset) error {
-	log.Infof(ctx, "[hosts] removing taint [%s] from host [%s]", taintKey, host.Address)
-	if err := k8s.RemoveTaintFromNodeByKey(kubeClient, host.HostnameOverride, taintKey); err != nil {
-		return err
-	}
-	log.Infof(ctx, "[hosts] Successfully deleted taint [%s] from host [%s]", taintKey, host.Address)
-	return nil
-}
-
 func GetToDeleteHosts(currentHosts, configHosts, inactiveHosts []*Host, includeInactive bool) []*Host {
 	toDeleteHosts := []*Host{}
 	for _, currentHost := range currentHosts {


@@ -15,18 +15,19 @@ import (
 )
 
 const (
 	HostnameLabel             = "kubernetes.io/hostname"
 	InternalAddressAnnotation = "rke.cattle.io/internal-ip"
 	ExternalAddressAnnotation = "rke.cattle.io/external-ip"
 	AWSCloudProvider          = "aws"
-	MaxRetries                = 5
-	RetryInterval             = 5
+	ExternalAWSCloudProviderName = "external-aws"
+	MaxRetries                   = 5
+	RetryInterval                = 5
 )
 
-func DeleteNode(k8sClient *kubernetes.Clientset, nodeName, cloudProvider string) error {
+func DeleteNode(k8sClient *kubernetes.Clientset, nodeName string, nodeAddress string, cloudProviderName string) error {
 	// If cloud provider is configured, the node name can be set by the cloud provider, which can be different from the original node name
-	if cloudProvider != "" {
-		node, err := GetNode(k8sClient, nodeName)
+	if cloudProviderName != "" {
+		node, err := GetNode(k8sClient, nodeName, nodeAddress, cloudProviderName)
 		if err != nil {
 			return err
 		}
@@ -39,7 +40,7 @@ func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) {
 	return k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
 }
 
-func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
+func GetNode(k8sClient *kubernetes.Clientset, nodeName, nodeAddress, cloudProviderName string) (*v1.Node, error) {
 	var listErr error
 	for retries := 0; retries < MaxRetries; retries++ {
 		logrus.Debugf("Checking node list for node [%v], try #%v", nodeName, retries+1)
@@ -55,6 +56,18 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error)
 			if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) {
 				return &node, nil
 			}
+			if cloudProviderName == ExternalAWSCloudProviderName {
+				if nodeAddress == "" {
+					return nil, fmt.Errorf("failed to find node [%v] with empty nodeAddress, cloud provider: %v", nodeName, cloudProviderName)
+				}
+				logrus.Debugf("Checking internal address for node [%v], cloud provider: %v", nodeName, cloudProviderName)
+				for _, addr := range node.Status.Addresses {
+					if addr.Type == v1.NodeInternalIP && nodeAddress == addr.Address {
+						logrus.Debugf("Found node [%s]: %v", nodeName, nodeAddress)
+						return &node, nil
+					}
+				}
+			}
 		}
 		time.Sleep(time.Second * RetryInterval)
 	}
@@ -64,10 +77,10 @@ func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error)
 	return nil, apierrors.NewNotFound(schema.GroupResource{}, nodeName)
 }
 
-func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned bool) error {
+func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, nodeAddress string, cloudProviderName string, cordoned bool) error {
 	updated := false
 	for retries := 0; retries < MaxRetries; retries++ {
-		node, err := GetNode(k8sClient, nodeName)
+		node, err := GetNode(k8sClient, nodeName, nodeAddress, cloudProviderName)
 		if err != nil {
 			logrus.Debugf("Error getting node %s: %v", nodeName, err)
 			// no need to retry here since GetNode already retries
@@ -102,45 +115,6 @@ func IsNodeReady(node v1.Node) bool {
 	return false
 }
 
-func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKey string) error {
-	updated := false
-	var err error
-	var node *v1.Node
-	for retries := 0; retries <= 5; retries++ {
-		node, err = GetNode(k8sClient, nodeName)
-		if err != nil {
-			if apierrors.IsNotFound(err) {
-				logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
-				return nil
-			}
-			return err
-		}
-		foundTaint := false
-		for i, taint := range node.Spec.Taints {
-			if taint.Key == taintKey {
-				foundTaint = true
-				node.Spec.Taints = append(node.Spec.Taints[:i], node.Spec.Taints[i+1:]...)
-				break
-			}
-		}
-		if !foundTaint {
-			return nil
-		}
-		_, err = k8sClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
-		if err != nil {
-			logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err)
-			time.Sleep(time.Second * 5)
-			continue
-		}
-		updated = true
-		break
-	}
-	if !updated {
-		return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", node.Name, err)
-	}
-	return nil
-}
-
 func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) {
 	oldLabels := map[string]string{}
 	if node.Labels == nil {
@@ -157,6 +131,7 @@ func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) {
 			delete(node.Labels, key)
 		}
 	}
+
 	// ADD Labels
 	for key, value := range toAddLabels {
 		node.Labels[key] = value
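The GetNode change above is the core of this PR: with the external AWS cloud provider, spec.nodeName can come from EC2 instance metadata and no longer match RKE's hostname override, so lookup falls back to matching the node's InternalIP. A standalone client-go sketch of that matching step; the kubeconfig path, the example IP, and the findNodeByInternalIP helper are illustrative assumptions, not RKE code:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// findNodeByInternalIP walks every node's status addresses and matches on the
// InternalIP, mirroring the fallback added to k8s.GetNode in the diff above.
func findNodeByInternalIP(nodes *corev1.NodeList, internalIP string) *corev1.Node {
	for i := range nodes.Items {
		for _, addr := range nodes.Items[i].Status.Addresses {
			if addr.Type == corev1.NodeInternalIP && addr.Address == internalIP {
				return &nodes.Items[i]
			}
		}
	}
	return nil
}

func main() {
	// Example kubeconfig path; adjust for your environment.
	config, err := clientcmd.BuildConfigFromFlags("", "kube_config_cluster.yml")
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	// Example internal address of the host being looked up.
	if node := findNodeByInternalIP(nodes, "10.0.0.12"); node != nil {
		fmt.Println("found node:", node.Name)
	}
}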


@@ -52,7 +52,7 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD
 func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
 	prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
-	upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) {
+	upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) {
 	if updateWorkersOnly {
 		return "", nil
 	}
@@ -83,7 +83,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client
 		inactiveHostErr = fmt.Errorf("provisioning incomplete, host(s) [%s] skipped because they could not be contacted", strings.Join(inactiveHostNames, ","))
 	}
 	hostsFailedToUpgrade, err := processControlPlaneForUpgrade(ctx, kubeClient, controlHosts, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap,
-		upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper, k8sVersion)
+		upgradeStrategy, newHosts, inactiveHosts, maxUnavailable, drainHelper, k8sVersion, cloudProviderName)
 	if err != nil || inactiveHostErr != nil {
 		if len(hostsFailedToUpgrade) > 0 {
 			logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(hostsFailedToUpgrade, ","), err)
@@ -103,7 +103,7 @@ func UpgradeControlPlaneNodes(ctx context.Context, kubeClient *kubernetes.Client
 func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
 	prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI,
-	upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion string) ([]string, error) {
+	upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, maxUnavailable int, drainHelper drain.Helper, k8sVersion, cloudProviderName string) ([]string, error) {
 	var errgrp errgroup.Group
 	var failedHosts []string
 	var hostsFailedToUpgrade = make(chan string, maxUnavailable)
@@ -130,7 +130,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
 				}
 				continue
 			}
-			if err := CheckNodeReady(kubeClient, runHost, ControlRole); err != nil {
+			if err := CheckNodeReady(kubeClient, runHost, ControlRole, cloudProviderName); err != nil {
 				errList = append(errList, err)
 				hostsFailedToUpgrade <- runHost.HostnameOverride
 				hostsFailed.Store(runHost.HostnameOverride, true)
@@ -165,7 +165,7 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
 			}
 			if !controlPlaneUpgradable && !workerPlaneUpgradable {
 				log.Infof(ctx, "Upgrade not required for controlplane and worker components of host %v", runHost.HostnameOverride)
-				if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
+				if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, runHost.InternalAddress, cloudProviderName, false); err != nil {
 					// This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable
 					logrus.Errorf("[controlplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err)
 				}
@@ -173,7 +173,8 @@ func processControlPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.C
 			}
 			shouldDrain := upgradeStrategy.Drain != nil && *upgradeStrategy.Drain
-			if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly, alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion); err != nil {
+			if err := upgradeControlHost(ctx, kubeClient, runHost, shouldDrain, drainHelper, localConnDialerFactory, prsMap, cpNodePlanMap, updateWorkersOnly,
+				alpineImage, certMap, controlPlaneUpgradable, workerPlaneUpgradable, k8sVersion, cloudProviderName); err != nil {
 				errList = append(errList, err)
 				hostsFailedToUpgrade <- runHost.HostnameOverride
 				hostsFailed.Store(runHost.HostnameOverride, true)
@@ -216,8 +217,8 @@ func checkHostUpgradable(ctx context.Context, runHost *hosts.Host, cpNodePlanMap
 func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, host *hosts.Host, drain bool, drainHelper drain.Helper,
 	localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool,
-	alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion string) error {
-	if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole); err != nil {
+	alpineImage string, certMap map[string]pki.CertificatePKI, controlPlaneUpgradable, workerPlaneUpgradable bool, k8sVersion, cloudProviderName string) error {
+	if err := cordonAndDrainNode(kubeClient, host, drain, drainHelper, ControlRole, cloudProviderName); err != nil {
 		return err
 	}
 	if controlPlaneUpgradable {
@@ -233,10 +234,10 @@ func upgradeControlHost(ctx context.Context, kubeClient *kubernetes.Clientset, h
 		}
 	}
 
-	if err := CheckNodeReady(kubeClient, host, ControlRole); err != nil {
+	if err := CheckNodeReady(kubeClient, host, ControlRole, cloudProviderName); err != nil {
 		return err
 	}
-	return k8s.CordonUncordon(kubeClient, host.HostnameOverride, false)
+	return k8s.CordonUncordon(kubeClient, host.HostnameOverride, host.InternalAddress, cloudProviderName, false)
 }
 
 func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {


@@ -17,10 +17,10 @@ import (
 	"k8s.io/kubectl/pkg/drain"
 )
-func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component string) error {
+func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, component, cloudProviderName string) error {
 	for retries := 0; retries < k8s.MaxRetries; retries++ {
 		logrus.Infof("[%s] Now checking status of node %v, try #%v", component, runHost.HostnameOverride, retries+1)
-		k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
+		k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride, runHost.InternalAddress, cloudProviderName)
 		if err != nil {
 			return fmt.Errorf("[%s] Error getting node %v: %v", component, runHost.HostnameOverride, err)
 		}
@@ -33,9 +33,9 @@ func CheckNodeReady(kubeClient *kubernetes.Clientset, runHost *hosts.Host, compo
 	return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
 }
-func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component string) error {
+func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainNode bool, drainHelper drain.Helper, component, cloudProviderName string) error {
 	logrus.Debugf("[%s] Cordoning node %v", component, host.HostnameOverride)
-	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
+	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, host.InternalAddress, cloudProviderName, true); err != nil {
 		return err
 	}
 	if !drainNode {
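k8s.GetNode now also receives the host's internal address and the cloud provider name, but its body is not part of this diff. The sketch below is only an assumption about why the extra arguments help: with an external cloud provider the node can be registered under a provider-assigned name, so a lookup by hostname override may return NotFound and matching on the internal address is a reasonable fallback. The package and function names here (k8ssketch, getNodeSketch) are hypothetical.

package k8ssketch // hypothetical package, illustration only

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// getNodeSketch is an assumption about what a cloud-provider-aware lookup
// could look like; the real k8s.GetNode in this repository may differ.
func getNodeSketch(client *kubernetes.Clientset, nodeName, internalAddress, cloudProviderName string) (*v1.Node, error) {
	// Plain lookup by node name first.
	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err == nil || cloudProviderName == "" {
		return node, err
	}
	// With an external cloud provider the registered node name can differ from
	// the RKE hostname override, so fall back to matching the internal address.
	nodes, listErr := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if listErr != nil {
		return nil, listErr
	}
	for i := range nodes.Items {
		for _, addr := range nodes.Items[i].Status.Addresses {
			if addr.Type == v1.NodeInternalIP && addr.Address == internalAddress {
				return &nodes.Items[i], nil
			}
		}
	}
	// Nothing matched; surface the original lookup error.
	return nil, err
}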

View File

@@ -52,14 +52,17 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 	return nil
 }
-func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts map[string]bool, maxUnavailable int, k8sVersion string) (string, error) {
+func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *kubernetes.Clientset, mixedRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, inactiveHosts map[string]bool, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI,
+	updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy,
+	newHosts map[string]bool, maxUnavailable int, k8sVersion, cloudProviderName string) (string, error) {
 	log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
 	var errMsgMaxUnavailableNotFailed string
-	updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts)
+	updateNewHostsList(kubeClient, append(mixedRolesHosts, workerOnlyHosts...), newHosts, cloudProviderName)
 	if len(mixedRolesHosts) > 0 {
 		log.Infof(ctx, "First checking and processing worker components for upgrades on nodes with etcd role one at a time")
 	}
-	multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, 1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion)
+	multipleRolesHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, mixedRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage,
+		1, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName)
 	if err != nil {
 		logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","), err)
 		return errMsgMaxUnavailableNotFailed, err
@@ -68,7 +71,8 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku
 	if len(workerOnlyHosts) > 0 {
 		log.Infof(ctx, "Now checking and upgrading worker components on nodes with only worker role %v at a time", maxUnavailable)
 	}
-	workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion)
+	workerOnlyHostsFailedToUpgrade, err := processWorkerPlaneForUpgrade(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage,
+		maxUnavailable, upgradeStrategy, newHosts, inactiveHosts, k8sVersion, cloudProviderName)
 	if err != nil {
 		logrus.Errorf("Failed to upgrade hosts: %v with error %v", strings.Join(workerOnlyHostsFailedToUpgrade, ","), err)
 		if len(workerOnlyHostsFailedToUpgrade) >= maxUnavailable {
@@ -81,9 +85,9 @@ func UpgradeWorkerPlaneForWorkerAndEtcdNodes(ctx context.Context, kubeClient *ku
 	return errMsgMaxUnavailableNotFailed, nil
 }
-func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool) {
+func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, newHosts map[string]bool, cloudProviderName string) {
 	for _, h := range allHosts {
-		_, err := k8s.GetNode(kubeClient, h.HostnameOverride)
+		_, err := k8s.GetNode(kubeClient, h.HostnameOverride, h.InternalAddress, cloudProviderName)
 		if err != nil && apierrors.IsNotFound(err) {
 			// this host could have been added to cluster state upon successful controlplane upgrade but isn't a node yet.
 			newHosts[h.HostnameOverride] = true
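The same pattern applies to k8s.CordonUncordon, which now also takes the internal address and cloud provider name before toggling the node's unschedulable flag. Its implementation is outside this diff; the following is a hypothetical sketch (reusing the getNodeSketch helper sketched above) of what the extended call could do, not the repository's code.

// Hypothetical sketch only; the real k8s.CordonUncordon may differ.
// Assumed imports: "context", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
// "k8s.io/client-go/kubernetes".
func cordonUncordonSketch(client *kubernetes.Clientset, nodeName, internalAddress, cloudProviderName string, cordon bool) error {
	node, err := getNodeSketch(client, nodeName, internalAddress, cloudProviderName)
	if err != nil {
		return err
	}
	if node.Spec.Unschedulable == cordon {
		// Already in the desired state; nothing to update.
		return nil
	}
	node.Spec.Unschedulable = cordon
	_, err = client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
	return err
}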
@@ -93,7 +97,7 @@ func updateNewHostsList(kubeClient *kubernetes.Clientset, allHosts []*hosts.Host
 func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory,
 	prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
-	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion string) ([]string, error) {
+	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy, newHosts, inactiveHosts map[string]bool, k8sVersion, cloudProviderName string) ([]string, error) {
 	var errgrp errgroup.Group
 	var drainHelper drain.Helper
 	var failedHosts []string
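As the variable declarations above suggest, processWorkerPlaneForUpgrade drives per-host upgrades concurrently through an errgroup while honoring maxUnavailable and collecting the hosts that failed. The actual queueing logic is not shown in this diff; the fragment below is only an illustrative pattern under that assumption, with a hypothetical function name, not the repository's implementation.

// Illustrative bounded-concurrency pattern (assumed, not this file's code).
// Assumed imports: "sync", "golang.org/x/sync/errgroup".
func upgradeHostsConcurrently(allHosts []*hosts.Host, maxUnavailable int, upgrade func(*hosts.Host) error) []string {
	var (
		errgrp      errgroup.Group
		hostsFailed sync.Map
		sem         = make(chan struct{}, maxUnavailable) // simple semaphore
	)
	for _, h := range allHosts {
		h := h
		sem <- struct{}{}
		errgrp.Go(func() error {
			defer func() { <-sem }()
			if err := upgrade(h); err != nil {
				// Record the failure so the caller can compare against maxUnavailable.
				hostsFailed.Store(h.HostnameOverride, true)
				return err
			}
			return nil
		})
	}
	_ = errgrp.Wait() // individual failures are reported via hostsFailed
	var failed []string
	hostsFailed.Range(func(key, _ interface{}) bool {
		failed = append(failed, key.(string))
		return true
	})
	return failed
}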
@@ -128,7 +132,7 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 			}
 			continue
 		}
-		if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
+		if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil {
 			errList = append(errList, err)
 			hostsFailed.Store(runHost.HostnameOverride, true)
 			hostsFailedToUpgrade <- runHost.HostnameOverride
@@ -163,13 +167,14 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 		}
 		if !upgradable {
 			logrus.Infof("[workerplane] Upgrade not required for worker components of host %v", runHost.HostnameOverride)
-			if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
+			if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, runHost.InternalAddress, cloudProviderName, false); err != nil {
 				// This node didn't undergo an upgrade, so RKE will only log any error after uncordoning it and won't count this in maxUnavailable
 				logrus.Errorf("[workerplane] Failed to uncordon node %v, error: %v", runHost.HostnameOverride, err)
 			}
 			continue
 		}
-		if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, k8sVersion); err != nil {
+		if err := upgradeWorkerHost(ctx, kubeClient, runHost, upgradeStrategy.Drain != nil && *upgradeStrategy.Drain, drainHelper, localConnDialerFactory, prsMap, workerNodePlanMap, certMap,
+			updateWorkersOnly, alpineImage, k8sVersion, cloudProviderName); err != nil {
 			errList = append(errList, err)
 			hostsFailed.Store(runHost.HostnameOverride, true)
 			hostsFailedToUpgrade <- runHost.HostnameOverride
@@ -192,9 +197,9 @@ func processWorkerPlaneForUpgrade(ctx context.Context, kubeClient *kubernetes.Cl
 func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, runHost *hosts.Host, drainFlag bool, drainHelper drain.Helper,
 	localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool,
-	alpineImage, k8sVersion string) error {
+	alpineImage, k8sVersion, cloudProviderName string) error {
 	// cordon and drain
-	if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole); err != nil {
+	if err := cordonAndDrainNode(kubeClient, runHost, drainFlag, drainHelper, WorkerRole, cloudProviderName); err != nil {
 		return err
 	}
 	logrus.Debugf("[workerplane] upgrading host %v", runHost.HostnameOverride)
@@ -202,11 +207,11 @@ func upgradeWorkerHost(ctx context.Context, kubeClient *kubernetes.Clientset, ru
 		return err
 	}
 	// consider upgrade done when kubeclient lists node as ready
-	if err := CheckNodeReady(kubeClient, runHost, WorkerRole); err != nil {
+	if err := CheckNodeReady(kubeClient, runHost, WorkerRole, cloudProviderName); err != nil {
 		return err
 	}
 	// uncordon node
-	return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false)
+	return k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, runHost.InternalAddress, cloudProviderName, false)
 }
 func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage, k8sVersion string) error {
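upgradeWorkerHost mirrors the control-plane sequence: cordon and drain, redeploy the worker plane, wait for readiness, then uncordon. The readiness test itself lives in CheckNodeReady (shown earlier) and ultimately comes down to inspecting the node's Ready condition; the helper below is a minimal illustration of that check and is an assumption, not copied from the repository.

// Minimal illustration of a node readiness check; assumed helper, not the
// repository's own code. Import: v1 "k8s.io/api/core/v1".
func isNodeReady(node *v1.Node) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type == v1.NodeReady {
			// The node is schedulable for workloads only when Ready is True.
			return cond.Status == v1.ConditionTrue
		}
	}
	return false
}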