diff --git a/cluster/remove.go b/cluster/remove.go index a9c957de..08fa9981 100644 --- a/cluster/remove.go +++ b/cluster/remove.go @@ -6,7 +6,9 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" + "golang.org/x/sync/errgroup" ) func (c *Cluster) ClusterRemove(ctx context.Context) error { @@ -18,15 +20,16 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error { if err := services.RemoveWorkerPlane(ctx, c.WorkerHosts, true); err != nil { return err } - // Remove Contol Plane if err := services.RemoveControlPlane(ctx, c.ControlPlaneHosts, true); err != nil { return err } // Remove Etcd Plane - if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts, true); err != nil { - return err + if !externalEtcd { + if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts, true); err != nil { + return err + } } // Clean up all hosts @@ -39,15 +42,23 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error { } func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string, prsMap map[string]v3.PrivateRegistry, externalEtcd bool) error { - allHosts := []*hosts.Host{} - allHosts = append(allHosts, cpHosts...) - allHosts = append(allHosts, workerHosts...) - allHosts = append(allHosts, etcdHosts...) - for _, host := range allHosts { - if err := host.CleanUpAll(ctx, cleanerImage, prsMap, externalEtcd); err != nil { - return err - } + uniqueHosts := hosts.GetUniqueHostList(cpHosts, workerHosts, etcdHosts) + + var errgrp errgroup.Group + hostsQueue := util.GetObjectQueue(uniqueHosts) + for w := 0; w < WorkerThreads; w++ { + errgrp.Go(func() error { + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + if err := runHost.CleanUpAll(ctx, cleanerImage, prsMap, externalEtcd); err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) + }) } - return nil + + return errgrp.Wait() } diff --git a/services/controlplane.go b/services/controlplane.go index 52ed21c6..0e6a7fc9 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -41,41 +41,43 @@ func RunControlPlane(ctx context.Context, controlHosts []*hosts.Host, localConnD func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error { log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole) - for _, host := range controlHosts { - // remove KubeAPI - if err := removeKubeAPI(ctx, host); err != nil { - return err - } - - // remove KubeController - if err := removeKubeController(ctx, host); err != nil { - return nil - } - - // remove scheduler - err := removeScheduler(ctx, host) - if err != nil { - return err - } - - // check if the host already is a worker - if host.IsWorker { - log.Infof(ctx, "[%s] Host [%s] is already a worker host, skipping delete kubelet and kubeproxy.", ControlRole, host.Address) - } else { - // remove KubeAPI - if err := removeKubelet(ctx, host); err != nil { - return err + var errgrp errgroup.Group + hostsQueue := util.GetObjectQueue(controlHosts) + for w := 0; w < WorkerThreads; w++ { + errgrp.Go(func() error { + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + if err := removeKubeAPI(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeKubeController(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeScheduler(ctx, runHost); err != nil { + errList = append(errList, err) + } + // force is true in remove, false in reconcile + if force { + if err := removeKubelet(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeKubeproxy(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeSidekick(ctx, runHost); err != nil { + errList = append(errList, err) + } + } } - // remove KubeController - if err := removeKubeproxy(ctx, host); err != nil { - return nil - } - // remove Sidekick - if err := removeSidekick(ctx, host); err != nil { - return err - } - } + return util.ErrList(errList) + }) } + + if err := errgrp.Wait(); err != nil { + return err + } + log.Infof(ctx, "[%s] Successfully tore down Controller Plane..", ControlRole) return nil } diff --git a/services/etcd.go b/services/etcd.go index 8d4044c9..e8892ed3 100644 --- a/services/etcd.go +++ b/services/etcd.go @@ -15,8 +15,10 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" + "github.com/rancher/rke/util" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const ( @@ -72,27 +74,38 @@ func RunEtcdPlane( func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error { log.Infof(ctx, "[%s] Tearing down etcd plane..", ETCDRole) - for _, host := range etcdHosts { - err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address) - if err != nil { - return err - } - if !host.IsWorker || !host.IsControl || force { - // remove unschedulable kubelet on etcd host - if err := removeKubelet(ctx, host); err != nil { - return err - } - if err := removeKubeproxy(ctx, host); err != nil { - return err - } - if err := removeNginxProxy(ctx, host); err != nil { - return err - } - if err := removeSidekick(ctx, host); err != nil { - return err - } - } + var errgrp errgroup.Group + hostsQueue := util.GetObjectQueue(etcdHosts) + for w := 0; w < WorkerThreads; w++ { + errgrp.Go(func() error { + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + if err := docker.DoRemoveContainer(ctx, runHost.DClient, EtcdContainerName, runHost.Address); err != nil { + errList = append(errList, err) + } + if !runHost.IsWorker || !runHost.IsControl || force { + // remove unschedulable kubelet on etcd host + if err := removeKubelet(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeKubeproxy(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeNginxProxy(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeSidekick(ctx, runHost); err != nil { + errList = append(errList, err) + } + } + } + return util.ErrList(errList) + }) + } + if err := errgrp.Wait(); err != nil { + return err } log.Infof(ctx, "[%s] Successfully tore down etcd plane..", ETCDRole) return nil diff --git a/services/workerplane.go b/services/workerplane.go index 8b1db3a2..800fe57f 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -63,28 +63,39 @@ func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDia func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force bool) error { log.Infof(ctx, "[%s] Tearing down Worker Plane..", WorkerRole) - for _, host := range workerHosts { - // check if the host already is a controlplane - if host.IsControl && !force { - log.Infof(ctx, "[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, host.Address) - return nil - } - - if err := removeKubelet(ctx, host); err != nil { - return err - } - if err := removeKubeproxy(ctx, host); err != nil { - return err - } - if err := removeNginxProxy(ctx, host); err != nil { - return err - } - if err := removeSidekick(ctx, host); err != nil { - return err - } - log.Infof(ctx, "[%s] Successfully tore down Worker Plane..", WorkerRole) + var errgrp errgroup.Group + hostsQueue := util.GetObjectQueue(workerHosts) + for w := 0; w < WorkerThreads; w++ { + errgrp.Go(func() error { + var errList []error + for host := range hostsQueue { + runHost := host.(*hosts.Host) + if runHost.IsControl && !force { + log.Infof(ctx, "[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, runHost.Address) + return nil + } + if err := removeKubelet(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeKubeproxy(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeNginxProxy(ctx, runHost); err != nil { + errList = append(errList, err) + } + if err := removeSidekick(ctx, runHost); err != nil { + errList = append(errList, err) + } + } + return util.ErrList(errList) + }) } + if err := errgrp.Wait(); err != nil { + return err + } + log.Infof(ctx, "[%s] Successfully tore down Worker Plane..", WorkerRole) + return nil }