diff --git a/cluster/certificates.go b/cluster/certificates.go index bca05317..e75ad84e 100644 --- a/cluster/certificates.go +++ b/cluster/certificates.go @@ -12,6 +12,7 @@ import ( "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" + "github.com/rancher/rke/util" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "k8s.io/client-go/kubernetes" @@ -240,11 +241,17 @@ func saveCertToKubernetes(kubeClient *kubernetes.Clientset, crtName string, crt func deployBackupCertificates(ctx context.Context, backupHosts []*hosts.Host, kubeCluster *Cluster) error { var errgrp errgroup.Group - - for _, host := range backupHosts { - runHost := host + hostsQueue := util.GetObjectQueue(backupHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return pki.DeployCertificatesOnHost(ctx, runHost, kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := pki.DeployCertificatesOnHost(ctx, host.(*hosts.Host), kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } return errgrp.Wait() @@ -282,15 +289,22 @@ func fetchCertificatesFromEtcd(ctx context.Context, kubeCluster *Cluster) ([]byt } func (c *Cluster) SaveBackupCertificateBundle(ctx context.Context) error { - backupHosts := c.getBackupHosts() var errgrp errgroup.Group - for _, host := range backupHosts { - runHost := host + hostsQueue := util.GetObjectQueue(c.getBackupHosts()) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return pki.SaveBackupBundleOnHost(ctx, runHost, c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := pki.SaveBackupBundleOnHost(ctx, host.(*hosts.Host), 
c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap)
+				if err != nil {
+					errList = append(errList, err)
+				}
+			}
+			return util.ErrList(errList)
 		})
 	}
+
 	return errgrp.Wait()
 }
 
@@ -298,16 +312,30 @@ func (c *Cluster) ExtractBackupCertificateBundle(ctx context.Context) error {
 	backupHosts := c.getBackupHosts()
 	var errgrp errgroup.Group
 	errList := []string{}
-	for _, host := range backupHosts {
-		runHost := host
+
+	hostsQueue := util.GetObjectQueue(backupHosts)
+	// Workers hand failures back through errCh rather than appending to errList
+	// themselves: unsynchronized appends to one slice from several goroutines are
+	// a data race. Each host yields at most one message, so sends never block.
+	errCh := make(chan string, len(backupHosts))
+	for w := 0; w < WorkerThreads; w++ {
 		errgrp.Go(func() error {
-			if err := pki.ExtractBackupBundleOnHost(ctx, runHost, c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap); err != nil {
-				errList = append(errList, fmt.Errorf(
-					"Failed to extract certificate bundle on host [%s], please make sure etcd bundle exist in /opt/rke/etcd-snapshots/pki.bundle.tar.gz: %v", runHost.Address, err).Error())
+			for host := range hostsQueue {
+				runHost := host.(*hosts.Host)
+				if err := pki.ExtractBackupBundleOnHost(ctx, runHost, c.SystemImages.Alpine, services.EtcdSnapshotPath, c.PrivateRegistriesMap); err != nil {
+					errCh <- fmt.Errorf(
+						"Failed to extract certificate bundle on host [%s], please make sure etcd bundle exist in /opt/rke/etcd-snapshots/pki.bundle.tar.gz: %v", runHost.Address, err).Error()
+				}
 			}
 			return nil
 		})
 	}
+
 	errgrp.Wait()
+	// Every worker has returned; collect the reported failures on this goroutine.
+	close(errCh)
+	for msg := range errCh {
+		errList = append(errList, msg)
+	}
 	if len(errList) == len(backupHosts) {
 		return fmt.Errorf(strings.Join(errList, ","))
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 9f88bf0e..019cb64e 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -16,6 +16,7 @@ import (
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
+	"github.com/rancher/rke/util"
 	"github.com/rancher/types/apis/management.cattle.io/v3"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
@@ -65,6 +66,8 @@ const (
 	ControlPlane = "controlPlane"
 	WorkerPlane = "workerPlan"
 	EtcdPlane = "etcd"
+
+	WorkerThreads = util.WorkerThreads
 )
 
 func (c *Cluster) 
DeployControlPlane(ctx context.Context) error { @@ -405,13 +408,22 @@ func setNodeAnnotationsLabelsTaints(k8sClient *kubernetes.Clientset, host *hosts func (c *Cluster) PrePullK8sImages(ctx context.Context) error { log.Infof(ctx, "Pre-pulling kubernetes images") var errgrp errgroup.Group - hosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) - for _, host := range hosts { - runHost := host + hostList := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) + hostsQueue := util.GetObjectQueue(hostList) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return docker.UseLocalOrPull(ctx, runHost.DClient, runHost.Address, c.SystemImages.Kubernetes, "pre-deploy", c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + err := docker.UseLocalOrPull(ctx, runHost.DClient, runHost.Address, c.SystemImages.Kubernetes, "pre-deploy", c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } + if err := errgrp.Wait(); err != nil { return err } diff --git a/cluster/hosts.go b/cluster/hosts.go index d7dae17e..7050fccf 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -11,6 +11,7 @@ import ( "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -118,13 +119,20 @@ func (c *Cluster) InvertIndexHosts() error { func (c *Cluster) SetUpHosts(ctx context.Context) error { if c.Authentication.Strategy == X509AuthenticationProvider { log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes") - hosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) + hostList := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts) var errgrp errgroup.Group - for _, host := 
range hosts { - runHost := host + hostsQueue := util.GetObjectQueue(hostList) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return pki.DeployCertificatesOnPlaneHost(ctx, runHost, c.RancherKubernetesEngineConfig, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := pki.DeployCertificatesOnPlaneHost(ctx, host.(*hosts.Host), c.RancherKubernetesEngineConfig, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { @@ -136,7 +144,7 @@ func (c *Cluster) SetUpHosts(ctx context.Context) error { } log.Infof(ctx, "[certificates] Successfully deployed kubernetes certificates to Cluster nodes") if c.CloudProvider.Name != "" { - if err := deployCloudProviderConfig(ctx, hosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, c.CloudConfigFile); err != nil { + if err := deployCloudProviderConfig(ctx, hostList, c.SystemImages.Alpine, c.PrivateRegistriesMap, c.CloudConfigFile); err != nil { return err } log.Infof(ctx, "[%s] Successfully deployed kubernetes cloud config to Cluster nodes", CloudConfigServiceName) diff --git a/cluster/logs.go b/cluster/logs.go index 829a1ec1..88f3e559 100644 --- a/cluster/logs.go +++ b/cluster/logs.go @@ -4,6 +4,7 @@ import ( "context" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/util" "golang.org/x/sync/errgroup" ) @@ -12,13 +13,17 @@ func (c *Cluster) CleanDeadLogs(ctx context.Context) error { var errgrp errgroup.Group - for _, host := range hostList { - if !host.UpdateWorker { - continue - } - runHost := host + hostsQueue := util.GetObjectQueue(hostList) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return hosts.DoRunLogCleaner(ctx, runHost, c.SystemImages.Alpine, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := 
hosts.DoRunLogCleaner(ctx, host.(*hosts.Host), c.SystemImages.Alpine, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } return errgrp.Wait() diff --git a/cluster/network.go b/cluster/network.go index d2ecd969..30d456a5 100644 --- a/cluster/network.go +++ b/cluster/network.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/templates" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -282,16 +283,24 @@ func (c *Cluster) deployTCPPortListeners(ctx context.Context, currentCluster *Cl return nil } -func (c *Cluster) deployListenerOnPlane(ctx context.Context, portList []string, holstPlane []*hosts.Host, containerName string) error { +func (c *Cluster) deployListenerOnPlane(ctx context.Context, portList []string, hostPlane []*hosts.Host, containerName string) error { var errgrp errgroup.Group - for _, host := range holstPlane { - runHost := host + hostsQueue := util.GetObjectQueue(hostPlane) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return c.deployListener(ctx, runHost, portList, containerName) + var errList []error + for host := range hostsQueue { + err := c.deployListener(ctx, host.(*hosts.Host), portList, containerName) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } return errgrp.Wait() } + func (c *Cluster) deployListener(ctx context.Context, host *hosts.Host, portList []string, containerName string) error { imageCfg := &container.Config{ Image: c.SystemImages.Alpine, @@ -342,24 +351,41 @@ func (c *Cluster) removeTCPPortListeners(ctx context.Context) error { func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, containerName string) error { var errgrp errgroup.Group - for _, host := range hostPlane { - runHost := host + + hostsQueue := 
util.GetObjectQueue(hostPlane) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return docker.DoRemoveContainer(ctx, runHost.DClient, containerName, runHost.Address) + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + err := docker.DoRemoveContainer(ctx, runHost.DClient, containerName, runHost.Address) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } return errgrp.Wait() } + func (c *Cluster) runServicePortChecks(ctx context.Context) error { var errgrp errgroup.Group // check etcd <-> etcd // one etcd host is a pass if len(c.EtcdHosts) > 1 { log.Infof(ctx, "[network] Running etcd <-> etcd port checks") - for _, host := range c.EtcdHosts { - runHost := host + hostsQueue := util.GetObjectQueue(c.EtcdHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { @@ -368,10 +394,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { } // check control -> etcd connectivity log.Infof(ctx, "[network] Running control plane -> etcd port checks") - for _, host := range c.ControlPlaneHosts { - runHost := host + hostsQueue := util.GetObjectQueue(c.ControlPlaneHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdClientPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdClientPortList, c.EtcdHosts, 
c.SystemImages.Alpine, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { @@ -379,10 +412,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { } // check controle plane -> Workers log.Infof(ctx, "[network] Running control plane -> worker port checks") - for _, host := range c.ControlPlaneHosts { - runHost := host + hostsQueue = util.GetObjectQueue(c.ControlPlaneHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return checkPlaneTCPPortsFromHost(ctx, runHost, WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { @@ -390,10 +430,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { } // check workers -> control plane log.Infof(ctx, "[network] Running workers -> control plane port checks") - for _, host := range c.WorkerHosts { - runHost := host + hostsQueue = util.GetObjectQueue(c.WorkerHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return checkPlaneTCPPortsFromHost(ctx, runHost, ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + var errList []error + for host := range hostsQueue { + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } return errgrp.Wait() diff --git a/services/controlplane.go b/services/controlplane.go index 2c23b03d..52ed21c6 100644 --- a/services/controlplane.go +++ 
b/services/controlplane.go @@ -6,20 +6,30 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "golang.org/x/sync/errgroup" ) func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, cpNodePlanMap map[string]v3.RKEConfigNodePlan, updateWorkersOnly bool, alpineImage string, certMap map[string]pki.CertificatePKI) error { + if updateWorkersOnly { + return nil + } log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole) var errgrp errgroup.Group - for _, host := range controlHosts { - runHost := host - if updateWorkersOnly { - continue - } + + hostsQueue := util.GetObjectQueue(controlHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, alpineImage, certMap) + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + err := doDeployControlHost(ctx, runHost, localConnDialerFactory, prsMap, cpNodePlanMap[runHost.Address].Processes, alpineImage, certMap) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { diff --git a/services/services.go b/services/services.go index 3051a3cb..589176f4 100644 --- a/services/services.go +++ b/services/services.go @@ -9,6 +9,7 @@ import ( "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" ) @@ -40,6 +41,8 @@ const ( KubeControllerPort = 10252 KubeletPort = 10250 KubeproxyPort = 10256 + + WorkerThreads = util.WorkerThreads ) func runSidekick(ctx context.Context, host *hosts.Host, prsMap 
map[string]v3.PrivateRegistry, sidecarProcess v3.Process) error { diff --git a/services/workerplane.go b/services/workerplane.go index 51dd7f0f..8b1db3a2 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -6,6 +6,7 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "golang.org/x/sync/errgroup" ) @@ -18,29 +19,22 @@ const ( func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error { log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole) var errgrp errgroup.Group - for _, host := range allHosts { - if updateWorkersOnly { - if !host.UpdateWorker { - continue - } - } - if !host.IsWorker { - if host.IsEtcd { - // Add unschedulable taint - host.ToAddTaints = append(host.ToAddTaints, unschedulableEtcdTaint) - } - if host.IsControl { - // Add unschedulable taint - host.ToAddTaints = append(host.ToAddTaints, unschedulableControlTaint) - } - } - runHost := host - // maps are not thread safe - hostProcessMap := copyProcessMap(workerNodePlanMap[runHost.Address].Processes) + + hostsQueue := util.GetObjectQueue(allHosts) + for w := 0; w < WorkerThreads; w++ { errgrp.Go(func() error { - return doDeployWorkerPlane(ctx, runHost, localConnDialerFactory, prsMap, hostProcessMap, certMap, alpineImage) + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, workerNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage) + if err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) }) } + if err := errgrp.Wait(); err != nil { return err } @@ 
-48,6 +42,27 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 	return nil
 }
 
+// doDeployWorkerPlaneHost deploys the worker plane on a single host. When
+// updateWorkersOnly is set, hosts not flagged UpdateWorker are skipped. Hosts
+// without the worker role get the matching unschedulable taint (etcd and/or
+// control plane) appended to ToAddTaints before doDeployWorkerPlane is called.
+func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
+	if updateWorkersOnly && !host.UpdateWorker {
+		return nil
+	}
+	if !host.IsWorker {
+		if host.IsEtcd {
+			// Add unschedulable taint
+			host.ToAddTaints = append(host.ToAddTaints, unschedulableEtcdTaint)
+		}
+		if host.IsControl {
+			// Add unschedulable taint
+			host.ToAddTaints = append(host.ToAddTaints, unschedulableControlTaint)
+		}
+	}
+	return doDeployWorkerPlane(ctx, host, localConnDialerFactory, prsMap, processMap, certMap, alpineImage)
+}
+
 func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force bool) error {
 	log.Infof(ctx, "[%s] Tearing down Worker Plane..", WorkerRole)
 	for _, host := range workerHosts {
diff --git a/util/util.go b/util/util.go
index 1b5f5001..e1027dbd 100644
--- a/util/util.go
+++ b/util/util.go
@@ -1,11 +1,18 @@
 package util
 
 import (
+	"fmt"
+	"reflect"
 	"strings"
 
 	"github.com/coreos/go-semver/semver"
 )
 
+const (
+	// WorkerThreads is the number of worker goroutines started per parallel host operation.
+	WorkerThreads = 50
+)
+
 func StrToSemVer(version string) (*semver.Version, error) {
 	v, err := semver.NewVersion(strings.TrimPrefix(version, "v"))
 	if err != nil {
@@ -13,3 +20,34 @@ func StrToSemVer(version string) (*semver.Version, error) {
 	}
 	return v, nil
 }
+
+// GetObjectQueue copies the elements of the slice (or array) l into a buffered
+// channel sized to hold all of them, closes the channel and returns it, so any
+// number of workers can drain it with range without further coordination.
+// Elements are delivered as interface{} and must be type-asserted by the
+// receiver. A nil l yields an empty, already-closed queue.
+func GetObjectQueue(l interface{}) chan interface{} {
+	s := reflect.ValueOf(l)
+	// reflect.ValueOf(nil) is the zero Value, on which Len panics.
+	if !s.IsValid() {
+		c := make(chan interface{})
+		close(c)
+		return c
+	}
+	c := make(chan interface{}, s.Len())
+
+	for i := 0; i < s.Len(); i++ {
+		c <- s.Index(i).Interface()
+	}
+	close(c)
+	return c
+}
+
+// ErrList folds a list of errors into a single error whose message is the %v
+// formatting of the whole slice. It returns nil when e is empty.
+func ErrList(e []error) error {
+	if len(e) > 0 {
+		return fmt.Errorf("%v", e)
+	}
+	return nil
+}