mirror of https://github.com/rancher/rke.git synced 2025-05-09 16:56:57 +00:00
rke/cluster/reconcile.go
Jiaqi Luo 2e767c8c28 This PR fixes the issue that kube-apiserver does not restart on all CP nodes after the Pod Security Admission Configuration is changed.
A new environment variable, RKE_ADMISSION_CONFIG_CHECKSUM, whose value is the checksum of the admission configuration file's content, is added to the env list set in the `kube-apiserver` container configuration, so any change to the admission configuration file results in a change to the container's configuration. RKE detects the change during reconciliation and therefore restarts the kube-apiserver container on all CP nodes. The upgrade cadence is governed by the cluster's upgrade strategy.

This PR also drops the unnecessary appending of the env var to the cluster object, which shows up in the cluster.rkestate file.

Signed-off-by: Jiaqi Luo <6218999+jiaqiluo@users.noreply.github.com>
2024-04-09 14:24:17 -07:00
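
As a rough illustration of the approach described above (not the actual RKE implementation: the helper name and file path below are hypothetical, and the choice of SHA-256 is only an assumption), a checksum env var of this kind could be derived roughly like this:

package main

import (
	"crypto/sha256"
	"fmt"
	"os"
)

// admissionConfigChecksumEnv hashes the admission configuration file and
// returns an env entry whose value changes whenever the file content changes,
// which in turn changes the kube-apiserver container configuration.
func admissionConfigChecksumEnv(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	return fmt.Sprintf("RKE_ADMISSION_CONFIG_CHECKSUM=%x", sum), nil
}

func main() {
	// "admission.yaml" is a placeholder path, not the path RKE uses.
	env, err := admissionConfigChecksumEnv("admission.yaml")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// During reconciliation, a changed value would surface as a changed
	// container config and trigger a kube-apiserver restart on the CP nodes.
	fmt.Println(env)
}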


package cluster
import (
"context"
"fmt"
"time"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/pki/cert"
"github.com/rancher/rke/services"
v3 "github.com/rancher/rke/types"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)
const (
unschedulableEtcdTaint = "node-role.kubernetes.io/etcd=true:NoExecute"
unschedulableControlTaint = "node-role.kubernetes.io/controlplane=true:NoSchedule"
unschedulableEtcdTaintKey = "node-role.kubernetes.io/etcd=:NoExecute"
unschedulableControlTaintKey = "node-role.kubernetes.io/controlplane=:NoSchedule"
EtcdPlaneNodesReplacedErr = "Etcd plane nodes are replaced. Stopping provisioning. Please restore your cluster from backup."
)
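// ReconcileCluster brings the cluster described by kubeCluster in line with
// currentCluster: it syncs labels and node roles, reconciles the etcd, worker
// and controlplane hosts, and restarts components whose certificates changed
// when ForceDeployCerts is set.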
func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster, flags ExternalFlags, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
logrus.Tracef("[reconcile] currentCluster: %+v\n", currentCluster)
log.Infof(ctx, "[reconcile] Reconciling cluster state")
kubeCluster.UpdateWorkersOnly = flags.UpdateOnly
if currentCluster == nil {
log.Infof(ctx, "[reconcile] This is newly generated cluster")
kubeCluster.UpdateWorkersOnly = false
return nil
}
// If certificates are not present, this is broken state and should error out
if len(currentCluster.Certificates) == 0 {
return fmt.Errorf("Certificates are not present in cluster state, recover rkestate file or certificate information in cluster state")
}
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
// sync node labels to define the toDelete labels
syncLabels(ctx, currentCluster, kubeCluster)
syncNodeRoles(ctx, currentCluster, kubeCluster)
if err := reconcileEtcd(ctx, currentCluster, kubeCluster, kubeClient, svcOptionData); err != nil {
return fmt.Errorf("Failed to reconcile etcd plane: %v", err)
}
if err := reconcileWorker(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
return err
}
if err := reconcileControl(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
return err
}
if kubeCluster.ForceDeployCerts {
if err := restartComponentsWhenCertChanges(ctx, currentCluster, kubeCluster); err != nil {
return err
}
}
log.Infof(ctx, "[reconcile] Reconciled cluster state successfully")
return nil
}
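// reconcileWorker removes worker nodes that are no longer in the configuration
// and marks newly added worker hosts so that the reserved unschedulable
// etcd/controlplane taints get removed from them.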
func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
// workers are deleted first to avoid issues when a worker and controller run on the same host
logrus.Debugf("[reconcile] Check worker hosts to be deleted")
wpToDelete := hosts.GetToDeleteHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts, kubeCluster.InactiveHosts, false)
for _, toDeleteHost := range wpToDelete {
toDeleteHost.IsWorker = false
if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsControl || toDeleteHost.IsEtcd, kubeCluster.CloudProvider.Name); err != nil {
return fmt.Errorf("Failed to delete worker node [%s] from cluster: %v", toDeleteHost.Address, err)
}
// attempting to clean services/files on the host
if err := reconcileHost(ctx, toDeleteHost, true, false, currentCluster); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
continue
}
}
// attempt to remove unschedulable taint
toAddHosts := hosts.GetToAddHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts)
for _, host := range toAddHosts {
host.UpdateWorker = true
if host.IsEtcd {
host.ToDelTaints = append(host.ToDelTaints, unschedulableEtcdTaint)
}
if host.IsControl {
host.ToDelTaints = append(host.ToDelTaints, unschedulableControlTaint)
}
}
return nil
}
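// reconcileControl removes controlplane nodes that are no longer in the
// configuration, handling the host behind the local kubeconfig last, and
// rebuilds the local admin kubeconfig so the cluster state can still be saved.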
func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
logrus.Debugf("[reconcile] Check Control plane hosts to be deleted")
selfDeleteAddress, err := getLocalConfigAddress(kubeCluster.LocalKubeConfigPath)
if err != nil {
return err
}
cpToDelete := hosts.GetToDeleteHosts(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts, kubeCluster.InactiveHosts, false)
// move the current host in local kubeconfig to the end of the list
for i, toDeleteHost := range cpToDelete {
if toDeleteHost.Address == selfDeleteAddress {
cpToDelete = append(cpToDelete[:i], cpToDelete[i+1:]...)
cpToDelete = append(cpToDelete, toDeleteHost)
}
}
if len(cpToDelete) == len(currentCluster.ControlPlaneHosts) {
log.Infof(ctx, "[reconcile] Deleting all current controlplane nodes, skipping deleting from k8s cluster")
// rebuilding local admin config to enable saving cluster state
return rebuildLocalAdminConfig(ctx, kubeCluster)
}
for _, toDeleteHost := range cpToDelete {
if err := cleanControlNode(ctx, kubeCluster, currentCluster, toDeleteHost); err != nil {
return err
}
}
// rebuilding local admin config to enable saving cluster state
return rebuildLocalAdminConfig(ctx, kubeCluster)
}
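// reconcileHost dials the host (retrying a few times) and then removes the
// worker, etcd, or controlplane services and their files from it, depending on
// the worker/etcd flags.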
func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd bool, cluster *Cluster) error {
var retryErr error
retries := 3
sleepSeconds := 3
for i := 0; i < retries; i++ {
if retryErr = toDeleteHost.TunnelUp(ctx, cluster.DockerDialerFactory, cluster.getPrefixPath(toDeleteHost.OS()), cluster.Version); retryErr != nil {
logrus.Debugf("Failed to dial the host %s trying again in %d seconds", toDeleteHost.Address, sleepSeconds)
time.Sleep(time.Second * time.Duration(sleepSeconds))
toDeleteHost.DClient = nil
continue
}
break
}
if retryErr != nil {
return fmt.Errorf("Not able to reach the host: %v", retryErr)
}
if worker {
if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove worker plane: %v", err)
}
if err := toDeleteHost.CleanUpWorkerHost(ctx, cluster.SystemImages.Alpine, cluster.PrivateRegistriesMap, cluster.Version); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err)
}
} else if etcd {
if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove etcd plane: %v", err)
}
if err := toDeleteHost.CleanUpEtcdHost(ctx, cluster.SystemImages.Alpine, cluster.PrivateRegistriesMap, cluster.Version); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err)
}
} else {
if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove control plane: %v", err)
}
if err := toDeleteHost.CleanUpControlHost(ctx, cluster.SystemImages.Alpine, cluster.PrivateRegistriesMap, cluster.Version); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err)
}
}
return nil
}
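// reconcileEtcd compares the current and desired etcd hosts, errors out if the
// whole etcd plane was replaced, and otherwise deletes obsolete etcd members
// before adding the new ones.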
func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts, kubeCluster.InactiveHosts, false)
etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
clientCert := cert.EncodeCertPEM(currentCluster.Certificates[pki.KubeNodeCertName].Certificate)
clientKey := cert.EncodePrivateKeyPEM(currentCluster.Certificates[pki.KubeNodeCertName].Key)
// check if the whole etcd plane is replaced
if isEtcdPlaneReplaced(ctx, currentCluster, kubeCluster) {
return fmt.Errorf("%v", EtcdPlaneNodesReplacedErr)
}
// check if a node changed its public IP but kept its internal address
for i := range etcdToDelete {
for j := range etcdToAdd {
if etcdToDelete[i].InternalAddress == etcdToAdd[j].InternalAddress {
etcdToDelete[i].Address = etcdToAdd[j].Address
break
}
}
}
// handle etcd member delete
if err := deleteEtcdMembers(ctx, currentCluster, kubeCluster, kubeClient, svcOptionData, clientCert, clientKey, etcdToDelete); err != nil {
return err
}
// handle etcd member add
return addEtcdMembers(ctx, currentCluster, kubeCluster, kubeClient, svcOptionData, clientCert, clientKey, etcdToAdd)
}
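// addEtcdMembers joins each new etcd host to the existing etcd cluster (unless
// it is already a member) and reloads the etcd plane so the new member starts
// correctly before the other nodes are restarted.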
func addEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, svcOptionData map[string]*v3.KubernetesServicesOptions, clientCert, clientKey []byte, etcdToAdd []*hosts.Host) error {
log.Infof(ctx, "[reconcile] Check etcd hosts to be added")
for _, etcdHost := range etcdToAdd {
kubeCluster.UpdateWorkersOnly = false
etcdHost.ToAddEtcdMember = true
}
for _, etcdHost := range etcdToAdd {
// Check if the host is already part of the cluster -- this covers clusters with lost quorum
isEtcdMember, err := services.IsEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
currentCluster.Version, clientCert, clientKey)
if err != nil {
return err
}
if !isEtcdMember {
if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
currentCluster.Version, clientCert, clientKey); err != nil {
return err
}
}
etcdHost.ToAddEtcdMember = false
kubeCluster.setReadyEtcdHosts()
etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
for _, etcdReadyHost := range kubeCluster.EtcdReadyHosts {
svcOptions, err := kubeCluster.GetKubernetesServicesOptions(etcdReadyHost.DockerInfo.OSType, svcOptionData)
if err != nil {
return err
}
etcdNodePlanMap[etcdReadyHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdReadyHost, svcOptions)
}
// this will start the newly added etcd node and make sure it started correctly before restarting the other nodes
// https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md#add-a-new-member
if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdReadyHosts, etcdHost, currentCluster.LocalConnDialerFactory, clientCert, clientKey, currentCluster.PrivateRegistriesMap, etcdNodePlanMap, kubeCluster.SystemImages.Alpine, kubeCluster.Version); err != nil {
return err
}
}
return nil
}
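// deleteEtcdMembers removes obsolete members from the etcd cluster, deletes
// the corresponding Kubernetes nodes (unless they still hold another role),
// and cleans up services and files on the hosts; failures are logged and skipped.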
func deleteEtcdMembers(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset, svcOptionData map[string]*v3.KubernetesServicesOptions, clientCert, clientKey []byte, etcdToDelete []*hosts.Host) error {
log.Infof(ctx, "[reconcile] Check etcd hosts to be deleted")
etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
for _, etcdMapHost := range kubeCluster.EtcdHosts {
svcOptions, err := kubeCluster.GetKubernetesServicesOptions(etcdMapHost.DockerInfo.OSType, svcOptionData)
if err != nil {
return err
}
etcdNodePlanMap[etcdMapHost.Address] = BuildRKEConfigNodePlan(ctx, kubeCluster, etcdMapHost, svcOptions)
}
for _, etcdHost := range etcdToDelete {
etcdHost.IsEtcd = false
if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory,
currentCluster.Version, clientCert, clientKey, etcdNodePlanMap); err != nil {
log.Warnf(ctx, "[reconcile] %v", err)
continue
}
if err := hosts.DeleteNode(ctx, etcdHost, kubeClient, etcdHost.IsControl || etcdHost.IsWorker, kubeCluster.CloudProvider.Name); err != nil {
log.Warnf(ctx, "Failed to delete etcd node [%s] from cluster: %v", etcdHost.Address, err)
continue
}
// attempting to clean services/files on the host
if err := reconcileHost(ctx, etcdHost, false, true, currentCluster); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err)
continue
}
}
return nil
}
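// syncLabels marks labels that exist in the current cluster state but not in
// the configuration for deletion on the matching hosts.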
func syncLabels(ctx context.Context, currentCluster, kubeCluster *Cluster) {
currentHosts := hosts.GetUniqueHostList(currentCluster.EtcdHosts, currentCluster.ControlPlaneHosts, currentCluster.WorkerHosts)
configHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
for _, host := range configHosts {
for _, currentHost := range currentHosts {
if host.Address == currentHost.Address {
for k, v := range currentHost.Labels {
if _, ok := host.Labels[k]; !ok {
host.ToDelLabels[k] = v
}
}
break
}
}
}
}
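// syncNodeRoles copies the configured etcd/controlplane/worker roles onto the
// matching hosts in the current cluster state.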
func syncNodeRoles(ctx context.Context, currentCluster, kubeCluster *Cluster) {
currentHosts := hosts.GetUniqueHostList(currentCluster.EtcdHosts, currentCluster.ControlPlaneHosts, currentCluster.WorkerHosts)
configHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
for _, host := range configHosts {
for _, currentHost := range currentHosts {
if host.Address == currentHost.Address {
currentHost.IsWorker = host.IsWorker
currentHost.IsEtcd = host.IsEtcd
currentHost.IsControl = host.IsControl
break
}
}
}
}
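// setReadyEtcdHosts collects the etcd hosts that are already members of the
// etcd cluster and flags them as belonging to an existing cluster.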
func (c *Cluster) setReadyEtcdHosts() {
c.EtcdReadyHosts = []*hosts.Host{}
for _, host := range c.EtcdHosts {
if !host.ToAddEtcdMember {
c.EtcdReadyHosts = append(c.EtcdReadyHosts, host)
host.ExistingEtcdCluster = true
}
}
}
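// cleanControlNode removes a controlplane node from the Kubernetes cluster,
// unless the host is still present in the new configuration (i.e. it is being
// replaced), and then cleans up the services and files on it.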
func cleanControlNode(ctx context.Context, kubeCluster, currentCluster *Cluster, toDeleteHost *hosts.Host) error {
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
// if I am deleting a node that's already in the config, it's probably being replaced and I shouldn't remove it from k8s
if !hosts.IsNodeInList(toDeleteHost, kubeCluster.ControlPlaneHosts) {
if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsWorker || toDeleteHost.IsEtcd, kubeCluster.CloudProvider.Name); err != nil {
return fmt.Errorf("Failed to delete controlplane node [%s] from cluster: %v", toDeleteHost.Address, err)
}
}
// attempting to clean services/files on the host
if err := reconcileHost(ctx, toDeleteHost, false, false, currentCluster); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
}
return nil
}
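// restartComponentsWhenCertChanges compares certificates between the current
// and desired cluster state and restarts the components that consume any
// certificate that changed, including etcd containers with changed node certs.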
func restartComponentsWhenCertChanges(ctx context.Context, currentCluster, kubeCluster *Cluster) error {
AllCertsMap := map[string]bool{
pki.KubeAPICertName: false,
pki.RequestHeaderCACertName: false,
pki.CACertName: false,
pki.ServiceAccountTokenKeyName: false,
pki.APIProxyClientCertName: false,
pki.KubeControllerCertName: false,
pki.KubeSchedulerCertName: false,
pki.KubeProxyCertName: false,
pki.KubeNodeCertName: false,
}
checkCertificateChanges(ctx, currentCluster, kubeCluster, AllCertsMap)
// check Restart Function
allHosts := hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
AllCertsFuncMap := map[string][]services.RestartFunc{
pki.CACertName: []services.RestartFunc{services.RestartKubeAPI, services.RestartKubeController, services.RestartKubelet},
pki.KubeAPICertName: []services.RestartFunc{services.RestartKubeAPI, services.RestartKubeController},
pki.RequestHeaderCACertName: []services.RestartFunc{services.RestartKubeAPI},
pki.ServiceAccountTokenKeyName: []services.RestartFunc{services.RestartKubeAPI, services.RestartKubeController},
pki.APIProxyClientCertName: []services.RestartFunc{services.RestartKubeAPI},
pki.KubeControllerCertName: []services.RestartFunc{services.RestartKubeController},
pki.KubeSchedulerCertName: []services.RestartFunc{services.RestartScheduler},
pki.KubeProxyCertName: []services.RestartFunc{services.RestartKubeproxy},
pki.KubeNodeCertName: []services.RestartFunc{services.RestartKubelet},
}
for certName, changed := range AllCertsMap {
if changed {
for _, host := range allHosts {
runRestartFuncs(ctx, AllCertsFuncMap, certName, host)
}
}
}
for _, host := range kubeCluster.EtcdHosts {
etcdCertName := pki.GetCrtNameForHost(host, pki.EtcdCertName)
certMap := map[string]bool{
etcdCertName: false,
}
checkCertificateChanges(ctx, currentCluster, kubeCluster, certMap)
if certMap[etcdCertName] || AllCertsMap[pki.CACertName] {
if err := docker.DoRestartContainer(ctx, host.DClient, services.EtcdContainerName, host.HostnameOverride); err != nil {
return err
}
}
}
return nil
}
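// runRestartFuncs runs every restart function registered for the given
// certificate name against the given host.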
func runRestartFuncs(ctx context.Context, certFuncMap map[string][]services.RestartFunc, certName string, host *hosts.Host) error {
for _, restartFunc := range certFuncMap[certName] {
if err := restartFunc(ctx, host); err != nil {
return err
}
}
return nil
}
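// checkCertificateChanges marks a certificate as changed when its PEM-encoded
// certificate differs, or when its key differs for certificates other than the
// CA and request-header CA.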
func checkCertificateChanges(ctx context.Context, currentCluster, kubeCluster *Cluster, certMap map[string]bool) {
for certName := range certMap {
if currentCluster.Certificates[certName].CertificatePEM != kubeCluster.Certificates[certName].CertificatePEM {
certMap[certName] = true
continue
}
if !(certName == pki.RequestHeaderCACertName || certName == pki.CACertName) {
if currentCluster.Certificates[certName].KeyPEM != kubeCluster.Certificates[certName].KeyPEM {
certMap[certName] = true
}
}
}
}
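// isEtcdPlaneReplaced returns true when every etcd host from the current state
// is gone or inactive, or when the configured etcd hosts share no member with
// the current ones, i.e. the whole etcd plane was replaced.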
func isEtcdPlaneReplaced(ctx context.Context, currentCluster, kubeCluster *Cluster) bool {
numCurrentEtcdHosts := len(currentCluster.EtcdHosts)
// We had and have no etcd hosts, nothing was replaced
if numCurrentEtcdHosts == 0 && len(kubeCluster.EtcdHosts) == 0 {
return false
}
// old etcd nodes are down, we added new ones
numEtcdToDeleteInactive := len(hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts, kubeCluster.InactiveHosts, true))
if numEtcdToDeleteInactive == numCurrentEtcdHosts {
return true
}
// one or more etcd nodes are removed from cluster.yaml and replaced
if len(hosts.GetHostListIntersect(kubeCluster.EtcdHosts, currentCluster.EtcdHosts)) == 0 {
return true
}
return false
}
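// syncTaints diffs the taints configured on each host against the taints in
// the current cluster state and records which taints must be added to or
// deleted from the host.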
func syncTaints(ctx context.Context, currentCluster, kubeCluster *Cluster) {
var currentHosts, expectedHosts []*hosts.Host
var currentTaints, expectedTaints map[string]map[string]string
// handling taints in configuration
if currentCluster != nil {
currentHosts = hosts.GetUniqueHostList(currentCluster.EtcdHosts, currentCluster.ControlPlaneHosts, currentCluster.WorkerHosts)
currentTaints = getHostsTaintsMap(currentHosts)
}
expectedHosts = hosts.GetUniqueHostList(kubeCluster.EtcdHosts, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
expectedTaints = getHostsTaintsMap(expectedHosts)
for _, host := range expectedHosts {
var toAddTaints, toDelTaints []string
currentSet := currentTaints[host.Address]
expectedSet := expectedTaints[host.Address]
for key, expected := range expectedSet {
current, ok := currentSet[key]
// create or update taints on the host;
// an update is performed by deleting the old taint and adding the new one.
if expected != current {
toAddTaints = append(toAddTaints, expected)
if ok { // if found but the values are different, the current taint will be deleted.
toDelTaints = append(toDelTaints, current)
}
}
}
for key, current := range currentSet {
_, ok := expectedSet[key]
if !ok { // remove the taints which can't be found in the expected taints
toDelTaints = append(toDelTaints, current)
}
}
host.ToAddTaints = append(host.ToAddTaints, toAddTaints...)
host.ToDelTaints = append(host.ToDelTaints, toDelTaints...)
}
}
// getHostsTaintsMap returns, per host address, the set of taints keyed by their unique key & effect combination; reserved and duplicated taints are skipped
func getHostsTaintsMap(list []*hosts.Host) map[string]map[string]string {
rtn := make(map[string]map[string]string)
for _, item := range list {
set := make(map[string]string)
for _, taint := range item.RKEConfigNode.Taints {
key := getTaintKey(taint)
value := getTaintValue(taint)
if key == unschedulableEtcdTaintKey ||
key == unschedulableControlTaintKey {
logrus.Warnf("taint %s is reserved, ignore this taint", value)
continue
}
if _, ok := set[key]; ok {
logrus.Warnf("duplicated taint %s in host %s, ignore this taint", value, item.Address)
continue
}
set[key] = value
}
if len(set) > 0 {
rtn[item.Address] = set
}
}
return rtn
}
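// getTaintKey identifies a taint by key and effect only (ignoring the value),
// while getTaintValue renders the full key=value:effect string.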
func getTaintKey(taint v3.RKETaint) string {
return fmt.Sprintf("%s=:%s", taint.Key, taint.Effect)
}
func getTaintValue(taint v3.RKETaint) string {
return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect)
}