diff --git a/cluster/cluster.go b/cluster/cluster.go index 61bef66a..9f88bf0e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "net" + "reflect" "strings" + "time" "github.com/rancher/rke/authz" "github.com/rancher/rke/cloudprovider" @@ -18,6 +20,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" + "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/cert" @@ -53,6 +56,7 @@ const ( UpdateStateTimeout = 30 GetStateTimeout = 30 KubernetesClientTimeOut = 30 + SyncWorkers = 10 NoneAuthorizationMode = "none" LocalNodeAddress = "127.0.0.1" LocalNodeHostname = "localhost" @@ -335,23 +339,69 @@ func (c *Cluster) SyncLabelsAndTaints(ctx context.Context, currentCluster *Clust if err != nil { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } - for _, host := range hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) { - if err := k8s.SetAddressesAnnotations(k8sClient, host.HostnameOverride, host.InternalAddress, host.Address); err != nil { - return err - } - if err := k8s.SyncLabels(k8sClient, host.HostnameOverride, host.ToAddLabels, host.ToDelLabels); err != nil { - return err - } - // Taints are not being added by user - if err := k8s.SyncTaints(k8sClient, host.HostnameOverride, host.ToAddTaints, host.ToDelTaints); err != nil { - return err - } + hostList := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) + var errgrp errgroup.Group + hostQueue := make(chan *hosts.Host, len(hostList)) + for _, host := range hostList { + hostQueue <- host + } + close(hostQueue) + + for i := 0; i < SyncWorkers; i++ { + w := i + errgrp.Go(func() error { + var errs []error + for host := range hostQueue { + logrus.Debugf("worker [%d] starting sync for node [%s]", w, host.HostnameOverride) + if err := setNodeAnnotationsLabelsTaints(k8sClient, host); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return fmt.Errorf("%v", errs) + } + return nil + }) + } + if err := errgrp.Wait(); err != nil { + return err } log.Infof(ctx, "[sync] Successfully synced nodes Labels and Taints") } return nil } +func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts.Host) error { + node := &v1.Node{} + var err error + for retries := 0; retries <= 5; retries++ { + node, err = k8s.GetNode(k8sClient, host.HostnameOverride) + if err != nil { + logrus.Debugf("[hosts] Can't find node by name [%s], retrying..", host.HostnameOverride) + time.Sleep(2 * time.Second) + continue + } + + oldNode := node.DeepCopy() + k8s.SetNodeAddressesAnnotations(node, host.InternalAddress, host.Address) + k8s.SyncNodeLabels(node, host.ToAddLabels, host.ToDelLabels) + k8s.SyncNodeTaints(node, host.ToAddTaints, host.ToDelTaints) + + if reflect.DeepEqual(oldNode, node) { + logrus.Debugf("skipping syncing labels for node [%s]", node.Name) + return nil + } + _, err = k8sClient.CoreV1().Nodes().Update(node) + if err != nil { + logrus.Debugf("Error syncing labels for node [%s]: %v", node.Name, err) + time.Sleep(5 * time.Second) + continue + } + return nil + } + return err +} + func (c *Cluster) PrePullK8sImages(ctx context.Context) error { log.Infof(ctx, "Pre-pulling kubernetes images") var errgrp errgroup.Group diff --git a/k8s/node.go b/k8s/node.go index e7e777a5..66a666e0 100644 --- a/k8s/node.go +++ b/k8s/node.go @@ -2,11 +2,9 @@ package k8s import ( "fmt" - "reflect" "strings" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -128,49 +126,7 @@ func RemoveTaintFromNodeByKey(k8sClient *kubernetes.Clientset, nodeName, taintKe return nil } -func SyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error { - updated := false - var err error - for retries := 0; retries <= 5; retries++ { - if err = doSyncLabels(k8sClient, nodeName, toAddLabels, toDelLabels); err != nil { - time.Sleep(5 * time.Second) - continue - } - updated = true - break - } - if !updated { - return fmt.Errorf("Timeout waiting for labels to be synced for node [%s]: %v", nodeName, err) - } - return nil -} - -func SyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error { - updated := false - var err error - for retries := 0; retries <= 5; retries++ { - if err = doSyncTaints(k8sClient, nodeName, toAddTaints, toDelTaints); err != nil { - time.Sleep(5 * time.Second) - continue - } - updated = true - break - } - if !updated { - return fmt.Errorf("Timeout waiting for node [%s] to be updated with new set of taints: %v", nodeName, err) - } - return nil -} - -func doSyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, toDelLabels map[string]string) error { - node, err := GetNode(k8sClient, nodeName) - if err != nil { - if apierrors.IsNotFound(err) { - logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName) - return nil - } - return err - } +func SyncNodeLabels(node *v1.Node, toAddLabels, toDelLabels map[string]string) { oldLabels := map[string]string{} if node.Labels == nil { node.Labels = map[string]string{} @@ -190,27 +146,9 @@ func doSyncLabels(k8sClient *kubernetes.Clientset, nodeName string, toAddLabels, for key, value := range toAddLabels { node.Labels[key] = value } - if reflect.DeepEqual(oldLabels, node.Labels) { - logrus.Debugf("Labels are not changed for node [%s]", node.Name) - return nil - } - _, err = k8sClient.CoreV1().Nodes().Update(node) - if err != nil { - logrus.Debugf("Error syncing labels for node [%s]: %v", node.Name, err) - return err - } - return nil } -func doSyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, toDelTaints []string) error { - node, err := GetNode(k8sClient, nodeName) - if err != nil { - if apierrors.IsNotFound(err) { - logrus.Debugf("[hosts] Can't find node by name [%s]", nodeName) - return nil - } - return err - } +func SyncNodeTaints(node *v1.Node, toAddTaints, toDelTaints []string) { // Add taints to node for _, taintStr := range toAddTaints { if isTaintExist(toTaint(taintStr), node.Spec.Taints) { @@ -222,14 +160,6 @@ func doSyncTaints(k8sClient *kubernetes.Clientset, nodeName string, toAddTaints, for _, taintStr := range toDelTaints { node.Spec.Taints = delTaintFromList(node.Spec.Taints, toTaint(taintStr)) } - - //node.Spec.Taints - _, err = k8sClient.CoreV1().Nodes().Update(node) - if err != nil { - logrus.Debugf("Error updating node [%s] with new set of taints: %v", node.Name, err) - return err - } - return nil } func isTaintExist(taint v1.Taint, taintList []v1.Taint) bool { @@ -254,31 +184,14 @@ func toTaint(taintStr string) v1.Taint { } } -func SetAddressesAnnotations(k8sClient *kubernetes.Clientset, nodeName, internalAddress, externalAddress string) error { - var listErr error - for retries := 0; retries <= 5; retries++ { - node, err := GetNode(k8sClient, nodeName) - if err != nil { - listErr = errors.Wrapf(err, "Failed to get kubernetes node [%s]", nodeName) - time.Sleep(time.Second * 5) - continue - } - currentExternalAnnotation := node.Annotations[ExternalAddressAnnotation] - currentInternalAnnotation := node.Annotations[ExternalAddressAnnotation] - if currentExternalAnnotation == externalAddress && currentInternalAnnotation == internalAddress { - return nil - } - node.Annotations[ExternalAddressAnnotation] = externalAddress - node.Annotations[InternalAddressAnnotation] = internalAddress - _, err = k8sClient.CoreV1().Nodes().Update(node) - if err != nil { - listErr = errors.Wrapf(err, "Error updating node [%s] with address annotations: %v", nodeName, err) - time.Sleep(time.Second * 5) - continue - } - return nil +func SetNodeAddressesAnnotations(node *v1.Node, internalAddress, externalAddress string) { + currentExternalAnnotation := node.Annotations[ExternalAddressAnnotation] + currentInternalAnnotation := node.Annotations[ExternalAddressAnnotation] + if currentExternalAnnotation == externalAddress && currentInternalAnnotation == internalAddress { + return } - return listErr + node.Annotations[ExternalAddressAnnotation] = externalAddress + node.Annotations[InternalAddressAnnotation] = internalAddress } func delTaintFromList(l []v1.Taint, t v1.Taint) []v1.Taint {