mirror of
https://github.com/rancher/rke.git
synced 2025-08-16 22:06:59 +00:00
Refactor taints and labels sync to improve performance
This commit is contained in:
parent
45d79aa359
commit
9c85b5b451
@ -4,7 +4,9 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/rancher/rke/authz"
|
"github.com/rancher/rke/authz"
|
||||||
"github.com/rancher/rke/cloudprovider"
|
"github.com/rancher/rke/cloudprovider"
|
||||||
@ -18,6 +20,7 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
"k8s.io/client-go/util/cert"
|
"k8s.io/client-go/util/cert"
|
||||||
@ -53,6 +56,7 @@ const (
|
|||||||
UpdateStateTimeout = 30
|
UpdateStateTimeout = 30
|
||||||
GetStateTimeout = 30
|
GetStateTimeout = 30
|
||||||
KubernetesClientTimeOut = 30
|
KubernetesClientTimeOut = 30
|
||||||
|
SyncWorkers = 10
|
||||||
NoneAuthorizationMode = "none"
|
NoneAuthorizationMode = "none"
|
||||||
LocalNodeAddress = "127.0.0.1"
|
LocalNodeAddress = "127.0.0.1"
|
||||||
LocalNodeHostname = "localhost"
|
LocalNodeHostname = "localhost"
|
||||||
@ -335,23 +339,69 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
|
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
|
||||||
}
|
}
|
||||||
for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) {
|
hostList := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
|
||||||
if err := k8s.SetAddressesAnnotations(k8sClient, host.HostnameOverride, host.InternalAddress, host.Address); err != nil {
|
var errgrp errgroup.Group
|
||||||
return err
|
hostQueue := make(chan *hosts.Host, len(hostList))
|
||||||
}
|
for _, host := range hostList {
|
||||||
if err := k8s.SyncLabels(k8sClient, host.HostnameOverride, host.ToAddLabels, host.ToDelLabels); err != nil {
|
hostQueue <- host
|
||||||
return err
|
}
|
||||||
}
|
close(hostQueue)
|
||||||
// Taints are not being added by user
|
|
||||||
if err := k8s.SyncTaints(k8sClient, host.HostnameOverride, host.ToAddTaints, host.ToDelTaints); err != nil {
|
for i := 0; i < SyncWorkers; i++ {
|
||||||
return err
|
w := i
|
||||||
}
|
errgrp.Go(func() error {
|
||||||
|
var errs []error
|
||||||
|
for host := range hostQueue {
|
||||||
|
logrus.Debugf("worker [%d] starting sync for node [%s]", w, host.HostnameOverride)
|
||||||
|
if err := setNodeAnnotationsLabelsTaints(k8sClient, host); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return fmt.Errorf("%v", errs)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := errgrp.Wait(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
log.Infof(ctx, "[sync] Successfully synced nodes Labels and Taints")
|
log.Infof(ctx, "[sync] Successfully synced nodes Labels and Taints")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host) error {
|
||||||
|
node := &v1.Node{}
|
||||||
|
var err error
|
||||||
|
for retries := 0; retries <= 5; retries++ {
|
||||||
|
node, err = k8s.GetNode(k8sClient, host.HostnameOverride)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("[hosts] Can't find node by name [%s], retrying..", host.HostnameOverride)
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
oldNode := node.DeepCopy()
|
||||||
|
k8s.SetNodeAddressesAnnotations(node, host.InternalAddress, host.Address)
|
||||||
|
k8s.SyncNodeLabels(node, host.ToAddLabels, host.ToDelLabels)
|
||||||
|
k8s.SyncNodeTaints(node, host.ToAddTaints, host.ToDelTaints)
|
||||||
|
|
||||||
|
if reflect.DeepEqual(oldNode, node) {
|
||||||
|
logrus.Debugf("skipping syncing labels for node [%s]", node.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err = k8sClient.CoreV1().Nodes().Update(node)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("Error syncing labels for node [%s]: %v", node.Name, err)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cluster) PrePullK8sImages(ctx context.Context) error {
|
func (c *Cluster) PrePullK8sImages(ctx context.Context) error {
|
||||||
log.Infof(ctx, "Pre-pulling kubernetes images")
|
log.Infof(ctx, "Pre-pulling kubernetes images")
|
||||||
var errgrp errgroup.Group
|
var errgrp errgroup.Group
|
||||||
|
105
k8s/node.go
105
k8s/node.go
@ -2,11 +2,9 @@ package k8s
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -128,49 +126,7 @@ func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKe
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error {
|
func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) {
|
||||||
updated := false
|
|
||||||
var err error
|
|
||||||
for retries := 0; retries <= 5; retries++ {
|
|
||||||
if err = doSyncLabels(k8sClient, nodeName, toAddLabels, toDelLabels); err != nil {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updated = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !updated {
|
|
||||||
return fmt.Errorf("Timeout waiting for labels to be synced for node [%s]: %v", nodeName, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func SyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error {
|
|
||||||
updated := false
|
|
||||||
var err error
|
|
||||||
for retries := 0; retries <= 5; retries++ {
|
|
||||||
if err = doSyncTaints(k8sClient, nodeName, toAddTaints, toDelTaints); err != nil {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updated = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !updated {
|
|
||||||
return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", nodeName, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func doSyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error {
|
|
||||||
node, err := GetNode(k8sClient, nodeName)
|
|
||||||
if err != nil {
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
oldLabels := map[string]string{}
|
oldLabels := map[string]string{}
|
||||||
if node.Labels == nil {
|
if node.Labels == nil {
|
||||||
node.Labels = map[string]string{}
|
node.Labels = map[string]string{}
|
||||||
@ -190,27 +146,9 @@ func doSyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels,
|
|||||||
for key, value := range toAddLabels {
|
for key, value := range toAddLabels {
|
||||||
node.Labels[key] = value
|
node.Labels[key] = value
|
||||||
}
|
}
|
||||||
if reflect.DeepEqual(oldLabels, node.Labels) {
|
|
||||||
logrus.Debugf("Labels are not changed for node [%s]", node.Name)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
_, err = k8sClient.CoreV1().Nodes().Update(node)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Debugf("Error syncing labels for node [%s]: %v", node.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func doSyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error {
|
func SyncNodeTaints(node *v1.Node, toAddTaints, toDelTaints []string) {
|
||||||
node, err := GetNode(k8sClient, nodeName)
|
|
||||||
if err != nil {
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Add taints to node
|
// Add taints to node
|
||||||
for _, taintStr := range toAddTaints {
|
for _, taintStr := range toAddTaints {
|
||||||
if isTaintExist(toTaint(taintStr), node.Spec.Taints) {
|
if isTaintExist(toTaint(taintStr), node.Spec.Taints) {
|
||||||
@ -222,14 +160,6 @@ func doSyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints,
|
|||||||
for _, taintStr := range toDelTaints {
|
for _, taintStr := range toDelTaints {
|
||||||
node.Spec.Taints = delTaintFromList(node.Spec.Taints, toTaint(taintStr))
|
node.Spec.Taints = delTaintFromList(node.Spec.Taints, toTaint(taintStr))
|
||||||
}
|
}
|
||||||
|
|
||||||
//node.Spec.Taints
|
|
||||||
_, err = k8sClient.CoreV1().Nodes().Update(node)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func isTaintExist(taint v1.Taint, taintList []v1.Taint) bool {
|
func isTaintExist(taint v1.Taint, taintList []v1.Taint) bool {
|
||||||
@ -254,31 +184,14 @@ func toTaint(taintStr string) v1.Taint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetAddressesAnnotations(k8sClient *kubernetes.Clientset, nodeName, internalAddress, externalAddress string) error {
|
func SetNodeAddressesAnnotations(node *v1.Node, internalAddress, externalAddress string) {
|
||||||
var listErr error
|
currentExternalAnnotation := node.Annotations[ExternalAddressAnnotation]
|
||||||
for retries := 0; retries <= 5; retries++ {
|
currentInternalAnnotation := node.Annotations[ExternalAddressAnnotation]
|
||||||
node, err := GetNode(k8sClient, nodeName)
|
if currentExternalAnnotation == externalAddress && currentInternalAnnotation == internalAddress {
|
||||||
if err != nil {
|
return
|
||||||
listErr = errors.Wrapf(err, "Failed to get kubernetes node [%s]", nodeName)
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
currentExternalAnnotation := node.Annotations[ExternalAddressAnnotation]
|
|
||||||
currentInternalAnnotation := node.Annotations[ExternalAddressAnnotation]
|
|
||||||
if currentExternalAnnotation == externalAddress && currentInternalAnnotation == internalAddress {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
node.Annotations[ExternalAddressAnnotation] = externalAddress
|
|
||||||
node.Annotations[InternalAddressAnnotation] = internalAddress
|
|
||||||
_, err = k8sClient.CoreV1().Nodes().Update(node)
|
|
||||||
if err != nil {
|
|
||||||
listErr = errors.Wrapf(err, "Error updating node [%s] with address annotations: %v", nodeName, err)
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return listErr
|
node.Annotations[ExternalAddressAnnotation] = externalAddress
|
||||||
|
node.Annotations[InternalAddressAnnotation] = internalAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
func delTaintFromList(l []v1.Taint, t v1.Taint) []v1.Taint {
|
func delTaintFromList(l []v1.Taint, t v1.Taint) []v1.Taint {
|
||||||
|
Loading…
Reference in New Issue
Block a user