1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-01 23:16:22 +00:00

Pass private registries list through the function calls

This commit is contained in:
moelsayed
2018-01-31 19:50:55 +02:00
parent cb290d23e3
commit 4159d7f156
17 changed files with 93 additions and 86 deletions

View File

@@ -22,7 +22,7 @@ func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Clust
kubeCluster.Certificates = currentCluster.Certificates kubeCluster.Certificates = currentCluster.Certificates
} else { } else {
log.Infof(ctx, "[certificates] Attempting to recover certificates from backup on host [%s]", kubeCluster.EtcdHosts[0].Address) log.Infof(ctx, "[certificates] Attempting to recover certificates from backup on host [%s]", kubeCluster.EtcdHosts[0].Address)
kubeCluster.Certificates, err = pki.FetchCertificatesFromHost(ctx, kubeCluster.EtcdHosts, kubeCluster.EtcdHosts[0], kubeCluster.SystemImages.Alpine, kubeCluster.LocalKubeConfigPath) kubeCluster.Certificates, err = pki.FetchCertificatesFromHost(ctx, kubeCluster.EtcdHosts, kubeCluster.EtcdHosts[0], kubeCluster.SystemImages.Alpine, kubeCluster.LocalKubeConfigPath, kubeCluster.PrivateRegistriesMap)
if err != nil { if err != nil {
return err return err
} }
@@ -42,7 +42,7 @@ func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Clust
return fmt.Errorf("Failed to generate Kubernetes certificates: %v", err) return fmt.Errorf("Failed to generate Kubernetes certificates: %v", err)
} }
log.Infof(ctx, "[certificates] Temporarily saving certs to etcd host [%s]", kubeCluster.EtcdHosts[0].Address) log.Infof(ctx, "[certificates] Temporarily saving certs to etcd host [%s]", kubeCluster.EtcdHosts[0].Address)
if err := pki.DeployCertificatesOnHost(ctx, kubeCluster.EtcdHosts, kubeCluster.EtcdHosts[0], kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath); err != nil { if err := pki.DeployCertificatesOnHost(ctx, kubeCluster.EtcdHosts, kubeCluster.EtcdHosts[0], kubeCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.TempCertPath, kubeCluster.PrivateRegistriesMap); err != nil {
return err return err
} }
log.Infof(ctx, "[certificates] Saved certs to etcd host [%s]", kubeCluster.EtcdHosts[0].Address) log.Infof(ctx, "[certificates] Saved certs to etcd host [%s]", kubeCluster.EtcdHosts[0].Address)

View File

@@ -72,14 +72,14 @@ func (c *Cluster) InvertIndexHosts() error {
func (c *Cluster) SetUpHosts(ctx context.Context) error { func (c *Cluster) SetUpHosts(ctx context.Context) error {
if c.Authentication.Strategy == X509AuthenticationProvider { if c.Authentication.Strategy == X509AuthenticationProvider {
log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes") log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes")
if err := pki.DeployCertificatesOnMasters(ctx, c.ControlPlaneHosts, c.Certificates, c.SystemImages.CertDownloader); err != nil { if err := pki.DeployCertificatesOnMasters(ctx, c.ControlPlaneHosts, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap); err != nil {
return err return err
} }
if err := pki.DeployCertificatesOnWorkers(ctx, c.WorkerHosts, c.Certificates, c.SystemImages.CertDownloader); err != nil { if err := pki.DeployCertificatesOnWorkers(ctx, c.WorkerHosts, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap); err != nil {
return err return err
} }
// Deploying etcd certificates // Deploying etcd certificates
if err := pki.DeployCertificatesOnEtcd(ctx, c.EtcdHosts, c.Certificates, c.SystemImages.CertDownloader); err != nil { if err := pki.DeployCertificatesOnEtcd(ctx, c.EtcdHosts, c.Certificates, c.SystemImages.CertDownloader, c.PrivateRegistriesMap); err != nil {
return err return err
} }

View File

@@ -18,6 +18,7 @@ import (
"github.com/rancher/rke/pki" "github.com/rancher/rke/pki"
"github.com/rancher/rke/services" "github.com/rancher/rke/services"
"github.com/rancher/rke/templates" "github.com/rancher/rke/templates"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert"
@@ -363,7 +364,7 @@ func (c *Cluster) deployListener(ctx context.Context, host *hosts.Host, portList
} }
logrus.Debugf("[network] Starting deployListener [%s] on host [%s]", containerName, host.Address) logrus.Debugf("[network] Starting deployListener [%s] on host [%s]", containerName, host.Address)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, containerName, host.Address, "network"); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, containerName, host.Address, "network", c.PrivateRegistriesMap); err != nil {
if strings.Contains(err.Error(), "bind: address already in use") { if strings.Contains(err.Error(), "bind: address already in use") {
logrus.Debugf("[network] Service is already up on host [%s]", host.Address) logrus.Debugf("[network] Service is already up on host [%s]", host.Address)
return nil return nil
@@ -412,7 +413,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.EtcdHosts { for _, host := range c.EtcdHosts {
runHost := host runHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine) return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -424,7 +425,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.ControlPlaneHosts { for _, host := range c.ControlPlaneHosts {
runHost := host runHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine) return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -434,7 +435,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.WorkerHosts { for _, host := range c.WorkerHosts {
runHost := host runHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine) return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -448,7 +449,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.ControlPlaneHosts { for _, host := range c.ControlPlaneHosts {
runHost := host runHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages.Alpine) return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -462,13 +463,13 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error {
for _, host := range c.WorkerHosts { for _, host := range c.WorkerHosts {
runHost := host runHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine) return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap)
}) })
} }
return errgrp.Wait() return errgrp.Wait()
} }
func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string) error { func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string, prsMap map[string]v3.PrivateRegistry) error {
hosts := []string{} hosts := []string{}
for _, host := range planeHosts { for _, host := range planeHosts {
hosts = append(hosts, host.InternalAddress) hosts = append(hosts, host.InternalAddress)
@@ -492,7 +493,7 @@ func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList
if err := docker.DoRemoveContainer(ctx, host.DClient, PortCheckContainer, host.Address); err != nil { if err := docker.DoRemoveContainer(ctx, host.DClient, PortCheckContainer, host.Address); err != nil {
return err return err
} }
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, PortCheckContainer, host.Address, "network"); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, PortCheckContainer, host.Address, "network", prsMap); err != nil {
return err return err
} }
if err := docker.WaitForContainer(ctx, host.DClient, PortCheckContainer); err != nil { if err := docker.WaitForContainer(ctx, host.DClient, PortCheckContainer); err != nil {

View File

@@ -9,6 +9,7 @@ import (
"github.com/rancher/rke/log" "github.com/rancher/rke/log"
"github.com/rancher/rke/pki" "github.com/rancher/rke/pki"
"github.com/rancher/rke/services" "github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert"
@@ -56,7 +57,7 @@ func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster,
return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address) return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address)
} }
// attempting to clean services/files on the host // attempting to clean services/files on the host
if err := reconcileHost(ctx, toDeleteHost, true, false, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory); err != nil { if err := reconcileHost(ctx, toDeleteHost, true, false, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory, currentCluster.PrivateRegistriesMap); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
continue continue
} }
@@ -97,7 +98,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address) return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address)
} }
// attempting to clean services/files on the host // attempting to clean services/files on the host
if err := reconcileHost(ctx, toDeleteHost, false, false, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory); err != nil { if err := reconcileHost(ctx, toDeleteHost, false, false, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory, currentCluster.PrivateRegistriesMap); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
continue continue
} }
@@ -110,7 +111,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
cpChanged := hosts.IsHostListChanged(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts) cpChanged := hosts.IsHostListChanged(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
if cpChanged { if cpChanged {
log.Infof(ctx, "[reconcile] Rolling update nginx hosts with new list of control plane hosts") log.Infof(ctx, "[reconcile] Rolling update nginx hosts with new list of control plane hosts")
err := services.RollingUpdateNginxProxy(ctx, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages.NginxProxy) err := services.RollingUpdateNginxProxy(ctx, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages.NginxProxy, kubeCluster.PrivateRegistriesMap)
if err != nil { if err != nil {
return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts") return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts")
} }
@@ -127,7 +128,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
return nil return nil
} }
func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd bool, cleanerImage string, dialerFactory hosts.DialerFactory) error { func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd bool, cleanerImage string, dialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
if err := toDeleteHost.TunnelUp(ctx, dialerFactory); err != nil { if err := toDeleteHost.TunnelUp(ctx, dialerFactory); err != nil {
return fmt.Errorf("Not able to reach the host: %v", err) return fmt.Errorf("Not able to reach the host: %v", err)
} }
@@ -135,21 +136,21 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd b
if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil { if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove worker plane: %v", err) return fmt.Errorf("Couldn't remove worker plane: %v", err)
} }
if err := toDeleteHost.CleanUpWorkerHost(ctx, cleanerImage); err != nil { if err := toDeleteHost.CleanUpWorkerHost(ctx, cleanerImage, prsMap); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err) return fmt.Errorf("Not able to clean the host: %v", err)
} }
} else if etcd { } else if etcd {
if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil { if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove etcd plane: %v", err) return fmt.Errorf("Couldn't remove etcd plane: %v", err)
} }
if err := toDeleteHost.CleanUpEtcdHost(ctx, cleanerImage); err != nil { if err := toDeleteHost.CleanUpEtcdHost(ctx, cleanerImage, prsMap); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err) return fmt.Errorf("Not able to clean the host: %v", err)
} }
} else { } else {
if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil { if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove control plane: %v", err) return fmt.Errorf("Couldn't remove control plane: %v", err)
} }
if err := toDeleteHost.CleanUpControlHost(ctx, cleanerImage); err != nil { if err := toDeleteHost.CleanUpControlHost(ctx, cleanerImage, prsMap); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err) return fmt.Errorf("Not able to clean the host: %v", err)
} }
} }
@@ -173,7 +174,7 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
continue continue
} }
// attempting to clean services/files on the host // attempting to clean services/files on the host
if err := reconcileHost(ctx, etcdHost, false, true, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory); err != nil { if err := reconcileHost(ctx, etcdHost, false, true, currentCluster.SystemImages.Alpine, currentCluster.DockerDialerFactory, currentCluster.PrivateRegistriesMap); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err) log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err)
continue continue
} }
@@ -199,7 +200,7 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
currentCluster.Certificates = crtMap currentCluster.Certificates = crtMap
for _, etcdHost := range etcdToAdd { for _, etcdHost := range etcdToAdd {
// deploy certificates on new etcd host // deploy certificates on new etcd host
if err := pki.DeployCertificatesOnHost(ctx, kubeCluster.EtcdHosts, etcdHost, currentCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.CertPathPrefix); err != nil { if err := pki.DeployCertificatesOnHost(ctx, kubeCluster.EtcdHosts, etcdHost, currentCluster.Certificates, kubeCluster.SystemImages.CertDownloader, pki.CertPathPrefix, kubeCluster.PrivateRegistriesMap); err != nil {
return err return err
} }
@@ -207,7 +208,7 @@ func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster, ku
return err return err
} }
etcdHost.ToAddEtcdMember = false etcdHost.ToAddEtcdMember = false
if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory, clientCert, clientkey); err != nil { if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory, clientCert, clientkey, currentCluster.PrivateRegistriesMap); err != nil {
return err return err
} }
} }

View File

@@ -6,6 +6,7 @@ import (
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki" "github.com/rancher/rke/pki"
"github.com/rancher/rke/services" "github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
) )
func (c *Cluster) ClusterRemove(ctx context.Context) error { func (c *Cluster) ClusterRemove(ctx context.Context) error {
@@ -25,7 +26,7 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
} }
// Clean up all hosts // Clean up all hosts
if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine); err != nil { if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap); err != nil {
return err return err
} }
@@ -33,14 +34,14 @@ func (c *Cluster) ClusterRemove(ctx context.Context) error {
return nil return nil
} }
func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string) error { func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
allHosts := []*hosts.Host{} allHosts := []*hosts.Host{}
allHosts = append(allHosts, cpHosts...) allHosts = append(allHosts, cpHosts...)
allHosts = append(allHosts, workerHosts...) allHosts = append(allHosts, workerHosts...)
allHosts = append(allHosts, etcdHosts...) allHosts = append(allHosts, etcdHosts...)
for _, host := range allHosts { for _, host := range allHosts {
if err := host.CleanUpAll(ctx, cleanerImage); err != nil { if err := host.CleanUpAll(ctx, cleanerImage, prsMap); err != nil {
return err return err
} }
} }

View File

@@ -37,7 +37,7 @@ const (
CleanerContainerName = "kube-cleaner" CleanerContainerName = "kube-cleaner"
) )
func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string) error { func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address) log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
toCleanPaths := []string{ toCleanPaths := []string{
ToCleanEtcdDir, ToCleanEtcdDir,
@@ -47,10 +47,10 @@ func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string) error {
ToCleanCalicoRun, ToCleanCalicoRun,
ToCleanTempCertPath, ToCleanTempCertPath,
} }
return h.CleanUp(ctx, toCleanPaths, cleanerImage) return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
} }
func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string) error { func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
if h.IsControl { if h.IsControl {
log.Infof(ctx, "[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address) log.Infof(ctx, "[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address)
return nil return nil
@@ -61,10 +61,10 @@ func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string) error
ToCleanCNIBin, ToCleanCNIBin,
ToCleanCalicoRun, ToCleanCalicoRun,
} }
return h.CleanUp(ctx, toCleanPaths, cleanerImage) return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
} }
func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string) error { func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
if h.IsWorker { if h.IsWorker {
log.Infof(ctx, "[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address) log.Infof(ctx, "[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address)
return nil return nil
@@ -75,10 +75,10 @@ func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string) erro
ToCleanCNIBin, ToCleanCNIBin,
ToCleanCalicoRun, ToCleanCalicoRun,
} }
return h.CleanUp(ctx, toCleanPaths, cleanerImage) return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
} }
func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string) error { func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
toCleanPaths := []string{ toCleanPaths := []string{
ToCleanEtcdDir, ToCleanEtcdDir,
ToCleanSSLDir, ToCleanSSLDir,
@@ -89,14 +89,14 @@ func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string) error {
ToCleanEtcdDir, ToCleanEtcdDir,
} }
} }
return h.CleanUp(ctx, toCleanPaths, cleanerImage) return h.CleanUp(ctx, toCleanPaths, cleanerImage, prsMap)
} }
func (h *Host) CleanUp(ctx context.Context, toCleanPaths []string, cleanerImage string) error { func (h *Host) CleanUp(ctx context.Context, toCleanPaths []string, cleanerImage string, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address) log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
imageCfg, hostCfg := buildCleanerConfig(h, toCleanPaths, cleanerImage) imageCfg, hostCfg := buildCleanerConfig(h, toCleanPaths, cleanerImage)
log.Infof(ctx, "[hosts] Running cleaner container on host [%s]", h.Address) log.Infof(ctx, "[hosts] Running cleaner container on host [%s]", h.Address)
if err := docker.DoRunContainer(ctx, h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName); err != nil { if err := docker.DoRunContainer(ctx, h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName, prsMap); err != nil {
return err return err
} }

View File

@@ -15,11 +15,12 @@ import (
"github.com/rancher/rke/docker" "github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/log" "github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert"
) )
func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry) error {
// list of certificates that should be deployed on the masters // list of certificates that should be deployed on the masters
crtList := []string{ crtList := []string{
CACertName, CACertName,
@@ -36,7 +37,7 @@ func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crt
} }
for i := range cpHosts { for i := range cpHosts {
err := doRunDeployer(ctx, cpHosts[i], env, certDownloaderImage) err := doRunDeployer(ctx, cpHosts[i], env, certDownloaderImage, prsMap)
if err != nil { if err != nil {
return err return err
} }
@@ -44,7 +45,7 @@ func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crt
return nil return nil
} }
func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry) error {
// list of certificates that should be deployed on the workers // list of certificates that should be deployed on the workers
crtList := []string{ crtList := []string{
CACertName, CACertName,
@@ -58,7 +59,7 @@ func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host,
} }
for i := range workerHosts { for i := range workerHosts {
err := doRunDeployer(ctx, workerHosts[i], env, certDownloaderImage) err := doRunDeployer(ctx, workerHosts[i], env, certDownloaderImage, prsMap)
if err != nil { if err != nil {
return err return err
} }
@@ -66,7 +67,7 @@ func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host,
return nil return nil
} }
func DeployCertificatesOnEtcd(ctx context.Context, etcdHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { func DeployCertificatesOnEtcd(ctx context.Context, etcdHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry) error {
// list of certificates that should be deployed on the etcd // list of certificates that should be deployed on the etcd
crtList := []string{ crtList := []string{
CACertName, CACertName,
@@ -83,7 +84,7 @@ func DeployCertificatesOnEtcd(ctx context.Context, etcdHosts []*hosts.Host, crtM
} }
for i := range etcdHosts { for i := range etcdHosts {
err := doRunDeployer(ctx, etcdHosts[i], env, certDownloaderImage) err := doRunDeployer(ctx, etcdHosts[i], env, certDownloaderImage, prsMap)
if err != nil { if err != nil {
return err return err
} }
@@ -91,7 +92,7 @@ func DeployCertificatesOnEtcd(ctx context.Context, etcdHosts []*hosts.Host, crtM
return nil return nil
} }
func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string, certDownloaderImage string) error { func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string, certDownloaderImage string, prsMap map[string]v3.PrivateRegistry) error {
// remove existing container. Only way it's still here is if previous deployment failed // remove existing container. Only way it's still here is if previous deployment failed
isRunning := false isRunning := false
isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, CrtDownloaderContainer, true) isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, CrtDownloaderContainer, true)
@@ -103,7 +104,7 @@ func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string,
return err return err
} }
} }
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, certDownloaderImage, CertificatesServiceName); err != nil { if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, certDownloaderImage, CertificatesServiceName, prsMap); err != nil {
return err return err
} }
imageCfg := &container.Config{ imageCfg := &container.Config{
@@ -160,7 +161,7 @@ func RemoveAdminConfig(ctx context.Context, localConfigPath string) {
log.Infof(ctx, "Local admin Kubeconfig removed successfully") log.Infof(ctx, "Local admin Kubeconfig removed successfully")
} }
func DeployCertificatesOnHost(ctx context.Context, extraHosts []*hosts.Host, host *hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage, certPath string) error { func DeployCertificatesOnHost(ctx context.Context, extraHosts []*hosts.Host, host *hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage, certPath string, prsMap map[string]v3.PrivateRegistry) error {
crtList := []string{ crtList := []string{
CACertName, CACertName,
KubeAPICertName, KubeAPICertName,
@@ -182,10 +183,10 @@ func DeployCertificatesOnHost(ctx context.Context, extraHosts []*hosts.Host, hos
// We don't need to edit the cert paths, they are not used in deployment // We don't need to edit the cert paths, they are not used in deployment
env = append(env, c.ToEnv()...) env = append(env, c.ToEnv()...)
} }
return doRunDeployer(ctx, host, env, certDownloaderImage) return doRunDeployer(ctx, host, env, certDownloaderImage, prsMap)
} }
func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, host *hosts.Host, image, localConfigPath string) (map[string]CertificatePKI, error) { func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, host *hosts.Host, image, localConfigPath string, prsMap map[string]v3.PrivateRegistry) (map[string]CertificatePKI, error) {
// rebuilding the certificates. This should look better after refactoring pki // rebuilding the certificates. This should look better after refactoring pki
tmpCerts := make(map[string]CertificatePKI) tmpCerts := make(map[string]CertificatePKI)
@@ -205,7 +206,7 @@ func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, ho
for certName, config := range crtList { for certName, config := range crtList {
certificate := CertificatePKI{} certificate := CertificatePKI{}
crt, err := fetchFileFromHost(ctx, GetCertTempPath(certName), image, host) crt, err := fetchFileFromHost(ctx, GetCertTempPath(certName), image, host, prsMap)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "no such file or directory") || if strings.Contains(err.Error(), "no such file or directory") ||
strings.Contains(err.Error(), "Could not find the file") { strings.Contains(err.Error(), "Could not find the file") {
@@ -213,10 +214,10 @@ func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, ho
} }
return nil, err return nil, err
} }
key, err := fetchFileFromHost(ctx, GetKeyTempPath(certName), image, host) key, err := fetchFileFromHost(ctx, GetKeyTempPath(certName), image, host, prsMap)
if config { if config {
config, err := fetchFileFromHost(ctx, GetConfigTempPath(certName), image, host) config, err := fetchFileFromHost(ctx, GetConfigTempPath(certName), image, host, prsMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -243,7 +244,7 @@ func FetchCertificatesFromHost(ctx context.Context, extraHosts []*hosts.Host, ho
} }
func fetchFileFromHost(ctx context.Context, filePath, image string, host *hosts.Host) (string, error) { func fetchFileFromHost(ctx context.Context, filePath, image string, host *hosts.Host, prsMap map[string]v3.PrivateRegistry) (string, error) {
imageCfg := &container.Config{ imageCfg := &container.Config{
Image: image, Image: image,
@@ -259,7 +260,7 @@ func fetchFileFromHost(ctx context.Context, filePath, image string, host *hosts.
return "", err return "", err
} }
if !isRunning { if !isRunning {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, CertFetcherContainer, host.Address, "certificates"); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, CertFetcherContainer, host.Address, "certificates", prsMap); err != nil {
return "", err return "", err
} }
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory) error { func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole) log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
for _, host := range controlHosts { for _, host := range controlHosts {
@@ -18,21 +18,21 @@ func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host,
} }
} }
// run sidekick // run sidekick
if err := runSidekick(ctx, host, sidekickImage); err != nil { if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
return err return err
} }
// run kubeapi // run kubeapi
err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory) err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory, prsMap)
if err != nil { if err != nil {
return err return err
} }
// run kubecontroller // run kubecontroller
err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory) err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory, prsMap)
if err != nil { if err != nil {
return err return err
} }
// run scheduler // run scheduler
err = runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory) err = runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory, prsMap)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -16,13 +16,14 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory) error { func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole) log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
initCluster := getEtcdInitialCluster(etcdHosts) initCluster := getEtcdInitialCluster(etcdHosts)
for _, host := range etcdHosts { for _, host := range etcdHosts {
nodeName := pki.GetEtcdCrtName(host.InternalAddress) nodeName := pki.GetEtcdCrtName(host.InternalAddress)
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, nodeName) imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, nodeName)
err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole) err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap)
if err != nil { if err != nil {
return err return err
} }
@@ -163,7 +164,7 @@ func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*ho
return nil return nil
} }
func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error { func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry) error {
readyEtcdHosts := []*hosts.Host{} readyEtcdHosts := []*hosts.Host{}
for _, host := range etcdHosts { for _, host := range etcdHosts {
if !host.ToAddEtcdMember { if !host.ToAddEtcdMember {
@@ -174,7 +175,7 @@ func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService
initCluster := getEtcdInitialCluster(readyEtcdHosts) initCluster := getEtcdInitialCluster(readyEtcdHosts)
for _, host := range readyEtcdHosts { for _, host := range readyEtcdHosts {
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, pki.GetEtcdCrtName(host.InternalAddress)) imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster, pki.GetEtcdCrtName(host.InternalAddress))
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
return err return err
} }
} }

View File

@@ -11,10 +11,10 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error { func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
etcdConnString := GetEtcdConnString(etcdHosts) etcdConnString := GetEtcdConnString(etcdHosts)
imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode) imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole, prsMap); err != nil {
return err return err
} }
return runHealthcheck(ctx, host, KubeAPIPort, true, KubeAPIContainerName, df) return runHealthcheck(ctx, host, KubeAPIPort, true, KubeAPIContainerName, df)

View File

@@ -11,9 +11,9 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error { func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode) imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err return err
} }
return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df) return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df)

View File

@@ -11,9 +11,9 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, unschedulable bool) error { func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory, unschedulable bool, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, unschedulable) imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, unschedulable)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err return err
} }
return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df) return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df)

View File

@@ -11,9 +11,9 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error { func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService) imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err return err
} }
return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df) return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df)

View File

@@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker" "github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
) )
const ( const (
@@ -14,21 +15,21 @@ const (
NginxProxyEnvName = "CP_HOSTS" NginxProxyEnvName = "CP_HOSTS"
) )
func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string) error { func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts) nginxProxyEnv := buildProxyEnv(cpHosts)
for _, host := range workerHosts { for _, host := range workerHosts {
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage) imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole); err != nil { if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap); err != nil {
return err return err
} }
} }
return nil return nil
} }
func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string) error { func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string, prsMap map[string]v3.PrivateRegistry) error {
nginxProxyEnv := buildProxyEnv(cpHosts) nginxProxyEnv := buildProxyEnv(cpHosts)
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage) imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole) return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole, prsMap)
} }
func removeNginxProxy(ctx context.Context, host *hosts.Host) error { func removeNginxProxy(ctx context.Context, host *hosts.Host) error {

View File

@@ -11,9 +11,9 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3" "github.com/rancher/types/apis/management.cattle.io/v3"
) )
func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error { func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService) imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil { if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole, prsMap); err != nil {
return err return err
} }
return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df) return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df)

View File

@@ -9,6 +9,7 @@ import (
"github.com/rancher/rke/docker" "github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/log" "github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
) )
const ( const (
@@ -60,7 +61,7 @@ func buildSidekickConfig(sidekickImage string) (*container.Config, *container.Ho
return imageCfg, hostCfg return imageCfg, hostCfg
} }
func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string) error { func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string, prsMap map[string]v3.PrivateRegistry) error {
isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true) isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true)
if err != nil { if err != nil {
return err return err
@@ -70,7 +71,7 @@ func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string) er
return nil return nil
} }
imageCfg, hostCfg := buildSidekickConfig(sidekickImage) imageCfg, hostCfg := buildSidekickConfig(sidekickImage)
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName); err != nil { if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName, prsMap); err != nil {
return err return err
} }
if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil { if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil {

View File

@@ -9,7 +9,7 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory) error { func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry) error {
log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole) log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group var errgrp errgroup.Group
@@ -17,7 +17,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []
for _, host := range etcdHosts { for _, host := range etcdHosts {
etcdHost := host etcdHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, etcdHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, true) return doDeployWorkerPlane(ctx, etcdHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, true, prsMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -28,7 +28,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []
for _, host := range controlHosts { for _, host := range controlHosts {
controlHost := host controlHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false) return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false, prsMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -38,7 +38,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts, etcdHosts []
for _, host := range workerHosts { for _, host := range workerHosts {
workerHost := host workerHost := host
errgrp.Go(func() error { errgrp.Go(func() error {
return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false) return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts, false, prsMap)
}) })
} }
if err := errgrp.Wait(); err != nil { if err := errgrp.Wait(); err != nil {
@@ -80,7 +80,7 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
nginxProxyImage, sidekickImage string, nginxProxyImage, sidekickImage string,
localConnDialerFactory hosts.DialerFactory, localConnDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host, controlHosts []*hosts.Host,
unschedulable bool) error { unschedulable bool, prsMap map[string]v3.PrivateRegistry) error {
// skipping deploying unschedulable kubelet on etcd node // skipping deploying unschedulable kubelet on etcd node
if unschedulable && host.IsWorker { if unschedulable && host.IsWorker {
@@ -92,17 +92,17 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
} }
// run nginx proxy // run nginx proxy
if !host.IsControl { if !host.IsControl {
if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage); err != nil { if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage, prsMap); err != nil {
return err return err
} }
} }
// run sidekick // run sidekick
if err := runSidekick(ctx, host, sidekickImage); err != nil { if err := runSidekick(ctx, host, sidekickImage, prsMap); err != nil {
return err return err
} }
// run kubelet // run kubelet
if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, unschedulable); err != nil { if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory, unschedulable, prsMap); err != nil {
return err return err
} }
return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory) return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory, prsMap)
} }