From 828f6e44f563f2a48b48d69a7ae6f20ad03c13a4 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Fri, 29 Dec 2017 05:33:20 +0200 Subject: [PATCH] Add concurrency to worker deployments --- services/workerplane.go | 71 ++++++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/services/workerplane.go b/services/workerplane.go index dd7d2d70..ded9d49c 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -4,43 +4,32 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error { logrus.Infof("[%s] Building up Worker Plane..", WorkerRole) + var errgrp errgroup.Group + + // Deploy worker components on control hosts for _, host := range controlHosts { - // run sidekick - if err := runSidekick(host, sidekickImage); err != nil { - return err - } - // run kubelet - // only one master for now - if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { - return err - } - if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil { - return err - } + controlHost := host + errgrp.Go(func() error { + return doDeployWorkerPlane(controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) + }) } + if err := errgrp.Wait(); err != nil { + return err + } + // Deploy worker components on worker hosts for _, host := range workerHosts { - // run nginx proxy - if !host.IsControl { - if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil { - return err - } - } - // run sidekick - if err := runSidekick(host, sidekickImage); err != nil { - return err - } - // run kubelet - if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { - return err - } - // run kubeproxy - if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil { - return err - } + workerHost := host + errgrp.Go(func() error { + return doDeployWorkerPlane(workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) + }) + } + if err := errgrp.Wait(); err != nil { + return err } logrus.Infof("[%s] Successfully started Worker Plane..", WorkerRole) return nil @@ -72,3 +61,25 @@ func RemoveWorkerPlane(workerHosts []*hosts.Host, force bool) error { return nil } + +func doDeployWorkerPlane(host *hosts.Host, + workerServices v3.RKEConfigServices, + nginxProxyImage, sidekickImage string, + healthcheckDialerFactory hosts.DialerFactory, + controlHosts []*hosts.Host) error { + // run nginx proxy + if !host.IsControl { + if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil { + return err + } + } + // run sidekick + if err := runSidekick(host, sidekickImage); err != nil { + return err + } + // run kubelet + if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { + return err + } + return runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory) +}