1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-16 15:10:12 +00:00

Switch all concurrent tasks to use worker pool

This commit is contained in:
moelsayed
2018-10-18 00:26:54 +02:00
committed by Alena Prokharchyk
parent 5163f2a00f
commit 90c426d73e
9 changed files with 213 additions and 73 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/templates"
"github.com/rancher/rke/util"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
@@ -282,16 +283,24 @@ func (c *Cluster) deployTCPPortListeners(ctx context.Context, currentCluster *Cl
return nil
}
func (c *Cluster) deployListenerOnPlane(ctx context.Context, portList []string, holstPlane []*hosts.Host, containerName string) error {
func (c *Cluster) deployListenerOnPlane(ctx context.Context, portList []string, hostPlane []*hosts.Host, containerName string) error {
var errgrp errgroup.Group
for _, host := range holstPlane {
runHost := host
hostsQueue := util.GetObjectQueue(hostPlane)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return c.deployListener(ctx, runHost, portList, containerName)
var errList []error
for host := range hostsQueue {
err := c.deployListener(ctx, host.(*hosts.Host), portList, containerName)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
return errgrp.Wait()
}
func (c *Cluster) deployListener(ctx context.Context, host *hosts.Host, portList []string, containerName string) error {
imageCfg := &container.Config{
Image: c.SystemImages.Alpine,
@@ -342,24 +351,41 @@ func (c *Cluster) removeTCPPortListeners(ctx context.Context) error {
func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, containerName string) error {
var errgrp errgroup.Group
for _, host := range hostPlane {
runHost := host
hostsQueue := util.GetObjectQueue(hostPlane)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return docker.DoRemoveContainer(ctx, runHost.DClient, containerName, runHost.Address)
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
err := docker.DoRemoveContainer(ctx, runHost.DClient, containerName, runHost.Address)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
return errgrp.Wait()
}
func (c *Cluster) runServicePortChecks(ctx context.Context) error {
var errgrp errgroup.Group
// check etcd <-> etcd
// one etcd host is a pass
if len(c.EtcdHosts) > 1 {
log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
for _, host := range c.EtcdHosts {
runHost := host
hostsQueue := util.GetObjectQueue(c.EtcdHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
var errList []error
for host := range hostsQueue {
err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
@@ -368,10 +394,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check control -> etcd connectivity
log.Infof(ctx, "[network] Running control plane -> etcd port checks")
for _, host := range c.ControlPlaneHosts {
runHost := host
hostsQueue := util.GetObjectQueue(c.ControlPlaneHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, EtcdClientPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
var errList []error
for host := range hostsQueue {
err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdClientPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
@@ -379,10 +412,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check controle plane -> Workers
log.Infof(ctx, "[network] Running control plane -> worker port checks")
for _, host := range c.ControlPlaneHosts {
runHost := host
hostsQueue = util.GetObjectQueue(c.ControlPlaneHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
var errList []error
for host := range hostsQueue {
err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
@@ -390,10 +430,17 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
}
// check workers -> control plane
log.Infof(ctx, "[network] Running workers -> control plane port checks")
for _, host := range c.WorkerHosts {
runHost := host
hostsQueue = util.GetObjectQueue(c.WorkerHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
var errList []error
for host := range hostsQueue {
err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
if err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
return errgrp.Wait()