mirror of https://github.com/rancher/rke.git synced 2025-09-03 16:04:26 +00:00

Add context.Context to everything and also make logging pluggable

Darren Shepherd
2018-01-09 15:10:56 -07:00
parent 6dbb6cba1d
commit d8dd29765f
30 changed files with 448 additions and 365 deletions
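
The diff below swaps every direct logrus call for log.Infof(ctx, ...) / log.Warnf(ctx, ...) from a new github.com/rancher/rke/log package and threads a context.Context parameter through the exported functions, so an embedding program can control both cancellation and log output. The log package itself is not shown in this excerpt; what follows is a minimal sketch of what "pluggable" logging could look like, assuming the logger rides in the context under a private key and falls back to logrus when none is set (the SetLogger helper and Logger interface are illustrative, not taken from this commit).

// Hypothetical sketch of a context-scoped, pluggable logger in the spirit of
// github.com/rancher/rke/log; the real package in this commit may differ.
package log

import (
	"context"

	"github.com/sirupsen/logrus"
)

// Logger is the minimal interface a caller would implement to plug in its own logging.
type Logger interface {
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
}

type loggerKey struct{}

// SetLogger returns a context carrying a custom Logger (illustrative helper).
func SetLogger(ctx context.Context, logger Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, logger)
}

// getLogger pulls the Logger out of ctx, defaulting to the logrus standard logger.
func getLogger(ctx context.Context) Logger {
	if l, ok := ctx.Value(loggerKey{}).(Logger); ok {
		return l
	}
	return logrus.StandardLogger()
}

// Infof and Warnf match the call sites used throughout the diff below.
func Infof(ctx context.Context, format string, args ...interface{}) {
	getLogger(ctx).Infof(format, args...)
}

func Warnf(ctx context.Context, format string, args ...interface{}) {
	getLogger(ctx).Warnf(format, args...)
}

With a shape like this, default behaviour stays identical to the old code (logrus at the standard level), while a caller that wants different output only has to attach a logger to the context it already passes in.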


@@ -1,13 +1,15 @@
package authz
import (
"context"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/templates"
"github.com/sirupsen/logrus"
)
func ApplyJobDeployerServiceAccount(kubeConfigPath string) error {
logrus.Infof("[authz] Creating rke-job-deployer ServiceAccount")
func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) error {
log.Infof(ctx, "[authz] Creating rke-job-deployer ServiceAccount")
k8sClient, err := k8s.NewClient(kubeConfigPath)
if err != nil {
return err
@@ -18,12 +20,12 @@ func ApplyJobDeployerServiceAccount(kubeConfigPath string) error {
if err := k8s.UpdateServiceAccountFromYaml(k8sClient, templates.JobDeployerServiceAccount); err != nil {
return err
}
logrus.Infof("[authz] rke-job-deployer ServiceAccount created successfully")
log.Infof(ctx, "[authz] rke-job-deployer ServiceAccount created successfully")
return nil
}
func ApplySystemNodeClusterRoleBinding(kubeConfigPath string) error {
logrus.Infof("[authz] Creating system:node ClusterRoleBinding")
func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string) error {
log.Infof(ctx, "[authz] Creating system:node ClusterRoleBinding")
k8sClient, err := k8s.NewClient(kubeConfigPath)
if err != nil {
return err
@@ -31,6 +33,6 @@ func ApplySystemNodeClusterRoleBinding(kubeConfigPath string) error {
if err := k8s.UpdateClusterRoleBindingFromYaml(k8sClient, templates.SystemNodeClusterRoleBinding); err != nil {
return err
}
logrus.Infof("[authz] system:node ClusterRoleBinding created successfully")
log.Infof(ctx, "[authz] system:node ClusterRoleBinding created successfully")
return nil
}
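
At every call site the pattern is the same: the caller now constructs the context and hands it down instead of relying on process-global logrus configuration. A hypothetical caller of the new authz signatures could look like this (SetLogger and the Logger interface come from the sketch above, not from this commit):

package example // hypothetical caller, not part of this commit

import (
	"context"

	"github.com/rancher/rke/authz"
	"github.com/rancher/rke/log"
)

// applyServiceAccount attaches a custom logger to the context and calls the
// refactored function with its new (ctx, kubeConfigPath) signature.
func applyServiceAccount(kubeConfigPath string, logger log.Logger) error {
	ctx := log.SetLogger(context.Background(), logger)
	return authz.ApplyJobDeployerServiceAccount(ctx, kubeConfigPath)
}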


@@ -1,13 +1,15 @@
package authz
import (
"context"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/templates"
"github.com/sirupsen/logrus"
)
func ApplyDefaultPodSecurityPolicy(kubeConfigPath string) error {
logrus.Infof("[authz] Applying default PodSecurityPolicy")
func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) error {
log.Infof(ctx, "[authz] Applying default PodSecurityPolicy")
k8sClient, err := k8s.NewClient(kubeConfigPath)
if err != nil {
return err
@@ -15,12 +17,12 @@ func ApplyDefaultPodSecurityPolicy(kubeConfigPath string) error {
if err := k8s.UpdatePodSecurityPolicyFromYaml(k8sClient, templates.DefaultPodSecurityPolicy); err != nil {
return err
}
logrus.Infof("[authz] Default PodSecurityPolicy applied successfully")
log.Infof(ctx, "[authz] Default PodSecurityPolicy applied successfully")
return nil
}
func ApplyDefaultPodSecurityPolicyRole(kubeConfigPath string) error {
logrus.Infof("[authz] Applying default PodSecurityPolicy Role and RoleBinding")
func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string) error {
log.Infof(ctx, "[authz] Applying default PodSecurityPolicy Role and RoleBinding")
k8sClient, err := k8s.NewClient(kubeConfigPath)
if err != nil {
return err
@@ -31,6 +33,6 @@ func ApplyDefaultPodSecurityPolicyRole(kubeConfigPath string) error {
if err := k8s.UpdateRoleBindingFromYaml(k8sClient, templates.DefaultPodSecurityRoleBinding); err != nil {
return err
}
logrus.Infof("[authz] Default PodSecurityPolicy Role and RoleBinding applied successfully")
log.Infof(ctx, "[authz] Default PodSecurityPolicy Role and RoleBinding applied successfully")
return nil
}


@@ -1,12 +1,13 @@
package cluster
import (
"context"
"fmt"
"time"
"github.com/rancher/rke/addons"
"github.com/rancher/rke/k8s"
"github.com/sirupsen/logrus"
"github.com/rancher/rke/log"
)
const (
@@ -14,28 +15,28 @@ const (
UserAddonResourceName = "rke-user-addon"
)
func (c *Cluster) DeployK8sAddOns() error {
err := c.deployKubeDNS()
func (c *Cluster) DeployK8sAddOns(ctx context.Context) error {
err := c.deployKubeDNS(ctx)
return err
}
func (c *Cluster) DeployUserAddOns() error {
logrus.Infof("[addons] Setting up user addons..")
func (c *Cluster) DeployUserAddOns(ctx context.Context) error {
log.Infof(ctx, "[addons] Setting up user addons..")
if c.Addons == "" {
logrus.Infof("[addons] No user addons configured..")
log.Infof(ctx, "[addons] No user addons configured..")
return nil
}
if err := c.doAddonDeploy(c.Addons, UserAddonResourceName); err != nil {
if err := c.doAddonDeploy(ctx, c.Addons, UserAddonResourceName); err != nil {
return err
}
logrus.Infof("[addons] User addon deployed successfully..")
log.Infof(ctx, "[addons] User addon deployed successfully..")
return nil
}
func (c *Cluster) deployKubeDNS() error {
logrus.Infof("[addons] Setting up KubeDNS")
func (c *Cluster) deployKubeDNS(ctx context.Context) error {
log.Infof(ctx, "[addons] Setting up KubeDNS")
kubeDNSConfig := map[string]string{
addons.KubeDNSServer: c.ClusterDNSServer,
addons.KubeDNSClusterDomain: c.ClusterDomain,
@@ -48,22 +49,22 @@ func (c *Cluster) deployKubeDNS() error {
if err != nil {
return err
}
if err := c.doAddonDeploy(kubeDNSYaml, KubeDNSAddonResourceName); err != nil {
if err := c.doAddonDeploy(ctx, kubeDNSYaml, KubeDNSAddonResourceName); err != nil {
return err
}
logrus.Infof("[addons] KubeDNS deployed successfully..")
log.Infof(ctx, "[addons] KubeDNS deployed successfully..")
return nil
}
func (c *Cluster) doAddonDeploy(addonYaml, resourceName string) error {
func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName string) error {
err := c.StoreAddonConfigMap(addonYaml, resourceName)
err := c.StoreAddonConfigMap(ctx, addonYaml, resourceName)
if err != nil {
return fmt.Errorf("Failed to save addon ConfigMap: %v", err)
}
logrus.Infof("[addons] Executing deploy job..")
log.Infof(ctx, "[addons] Executing deploy job..")
addonJob, err := addons.GetAddonsExcuteJob(resourceName, c.ControlPlaneHosts[0].HostnameOverride, c.Services.KubeAPI.Image)
if err != nil {
@@ -76,8 +77,8 @@ func (c *Cluster) doAddonDeploy(addonYaml, resourceName string) error {
return nil
}
func (c *Cluster) StoreAddonConfigMap(addonYaml string, addonName string) error {
logrus.Infof("[addons] Saving addon ConfigMap to Kubernetes")
func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, addonName string) error {
log.Infof(ctx, "[addons] Saving addon ConfigMap to Kubernetes")
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath)
if err != nil {
return err
@@ -91,7 +92,7 @@ func (c *Cluster) StoreAddonConfigMap(addonYaml string, addonName string) error
fmt.Println(err)
continue
}
logrus.Infof("[addons] Successfully Saved addon to Kubernetes ConfigMap: %s", addonName)
log.Infof(ctx, "[addons] Successfully Saved addon to Kubernetes ConfigMap: %s", addonName)
timeout <- true
break
}


@@ -1,24 +1,26 @@
package cluster
import (
"context"
"crypto/rsa"
"fmt"
"time"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/cert"
)
func SetUpAuthentication(kubeCluster, currentCluster *Cluster) error {
func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
if kubeCluster.Authentication.Strategy == X509AuthenticationProvider {
var err error
if currentCluster != nil {
kubeCluster.Certificates = currentCluster.Certificates
} else {
kubeCluster.Certificates, err = pki.StartCertificatesGeneration(
kubeCluster.Certificates, err = pki.StartCertificatesGeneration(ctx,
kubeCluster.ControlPlaneHosts,
kubeCluster.WorkerHosts,
kubeCluster.ClusterDomain,
@@ -53,8 +55,8 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.Certificat
return certificates, nil
}
func getClusterCerts(kubeClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) {
logrus.Infof("[certificates] Getting Cluster certificates from Kubernetes")
func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) {
log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes")
certificatesNames := []string{
pki.CACertName,
pki.KubeAPICertName,
@@ -82,19 +84,19 @@ func getClusterCerts(kubeClient *kubernetes.Clientset) (map[string]pki.Certifica
KeyEnvName: string(secret.Data["KeyEnvName"]),
}
}
logrus.Infof("[certificates] Successfully fetched Cluster certificates from Kubernetes")
log.Infof(ctx, "[certificates] Successfully fetched Cluster certificates from Kubernetes")
return certMap, nil
}
func saveClusterCerts(kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
logrus.Infof("[certificates] Save kubernetes certificates as secrets")
func saveClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
log.Infof(ctx, "[certificates] Save kubernetes certificates as secrets")
for crtName, crt := range crts {
err := saveCertToKubernetes(kubeClient, crtName, crt)
if err != nil {
return fmt.Errorf("Failed to save certificate [%s] to kubernetes: %v", crtName, err)
}
}
logrus.Infof("[certificates] Successfully saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName)
log.Infof(ctx, "[certificates] Successfully saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName)
return nil
}


@@ -1,6 +1,7 @@
package cluster
import (
"context"
"fmt"
"net"
"path/filepath"
@@ -8,6 +9,7 @@ import (
"github.com/rancher/rke/authz"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -52,13 +54,13 @@ const (
NoneAuthorizationMode = "none"
)
func (c *Cluster) DeployControlPlane() error {
func (c *Cluster) DeployControlPlane(ctx context.Context) error {
// Deploy Etcd Plane
if err := services.RunEtcdPlane(c.EtcdHosts, c.Services.Etcd); err != nil {
if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd); err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
// Deploy Control plane
if err := services.RunControlPlane(c.ControlPlaneHosts,
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
c.EtcdHosts,
c.Services,
c.SystemImages[ServiceSidekickImage],
@@ -67,15 +69,15 @@ func (c *Cluster) DeployControlPlane() error {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
// Apply Authz configuration after deploying controlplane
if err := c.ApplyAuthzResources(); err != nil {
if err := c.ApplyAuthzResources(ctx); err != nil {
return fmt.Errorf("[auths] Failed to apply RBAC resources: %v", err)
}
return nil
}
func (c *Cluster) DeployWorkerPlane() error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
// Deploy Worker Plane
if err := services.RunWorkerPlane(c.ControlPlaneHosts,
if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts,
c.WorkerHosts,
c.Services,
c.SystemImages[NginxProxyImage],
@@ -95,7 +97,7 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
return &rkeConfig, nil
}
func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) {
func ParseCluster(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) {
var err error
c := &Cluster{
RancherKubernetesEngineConfig: *rkeConfig,
@@ -104,7 +106,7 @@ func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath s
HealthcheckDialerFactory: healthcheckDialerFactory,
}
// Setting cluster Defaults
c.setClusterDefaults()
c.setClusterDefaults(ctx)
if err := c.InvertIndexHosts(); err != nil {
return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err)
@@ -128,7 +130,7 @@ func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath s
return c, nil
}
func (c *Cluster) setClusterDefaults() {
func (c *Cluster) setClusterDefaults(ctx context.Context) {
if len(c.SSHKeyPath) == 0 {
c.SSHKeyPath = DefaultClusterSSHKeyPath
}
@@ -148,7 +150,7 @@ func (c *Cluster) setClusterDefaults() {
c.Authorization.Mode = DefaultAuthorizationMode
}
if c.Services.KubeAPI.PodSecurityPolicy && c.Authorization.Mode != services.RBACAuthorizationMode {
logrus.Warnf("PodSecurityPolicy can't be enabled with RBAC support disabled")
log.Warnf(ctx, "PodSecurityPolicy can't be enabled with RBAC support disabled")
c.Services.KubeAPI.PodSecurityPolicy = false
}
c.setClusterServicesDefaults()
@@ -204,8 +206,8 @@ func GetLocalKubeConfig(configPath string) string {
return fmt.Sprintf("%s%s%s", baseDir, pki.KubeAdminConfigPrefix, fileName)
}
func rebuildLocalAdminConfig(kubeCluster *Cluster) error {
logrus.Infof("[reconcile] Rebuilding and updating local kube config")
func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
log.Infof(ctx, "[reconcile] Rebuilding and updating local kube config")
var workingConfig, newConfig string
currentKubeConfig := kubeCluster.Certificates[pki.KubeAdminCommonName]
caCrt := kubeCluster.Certificates[pki.CACertName].Certificate
@@ -220,12 +222,12 @@ func rebuildLocalAdminConfig(kubeCluster *Cluster) error {
keyData := string(cert.EncodePrivateKeyPEM(currentKubeConfig.Key))
newConfig = pki.GetKubeConfigX509WithData(kubeURL, pki.KubeAdminCommonName, caData, crtData, keyData)
}
if err := pki.DeployAdminConfig(newConfig, kubeCluster.LocalKubeConfigPath); err != nil {
if err := pki.DeployAdminConfig(ctx, newConfig, kubeCluster.LocalKubeConfigPath); err != nil {
return fmt.Errorf("Failed to redeploy local admin config with new host")
}
workingConfig = newConfig
if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath); err == nil {
logrus.Infof("[reconcile] host [%s] is active master on the cluster", cpHost.Address)
log.Infof(ctx, "[reconcile] host [%s] is active master on the cluster", cpHost.Address)
break
}
}
@@ -234,9 +236,9 @@ func rebuildLocalAdminConfig(kubeCluster *Cluster) error {
return nil
}
func isLocalConfigWorking(localKubeConfigPath string) bool {
func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string) bool {
if _, err := GetK8sVersion(localKubeConfigPath); err != nil {
logrus.Infof("[reconcile] Local config is not vaild, rebuilding admin config")
log.Infof(ctx, "[reconcile] Local config is not vaild, rebuilding admin config")
return false
}
return true
@@ -263,23 +265,23 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string) string
string(config.KeyData))
}
func (c *Cluster) ApplyAuthzResources() error {
if err := authz.ApplyJobDeployerServiceAccount(c.LocalKubeConfigPath); err != nil {
func (c *Cluster) ApplyAuthzResources(ctx context.Context) error {
if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath); err != nil {
return fmt.Errorf("Failed to apply the ServiceAccount needed for job execution: %v", err)
}
if c.Authorization.Mode == NoneAuthorizationMode {
return nil
}
if c.Authorization.Mode == services.RBACAuthorizationMode {
if err := authz.ApplySystemNodeClusterRoleBinding(c.LocalKubeConfigPath); err != nil {
if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath); err != nil {
return fmt.Errorf("Failed to apply the ClusterRoleBinding needed for node authorization: %v", err)
}
}
if c.Authorization.Mode == services.RBACAuthorizationMode && c.Services.KubeAPI.PodSecurityPolicy {
if err := authz.ApplyDefaultPodSecurityPolicy(c.LocalKubeConfigPath); err != nil {
if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath); err != nil {
return fmt.Errorf("Failed to apply default PodSecurityPolicy: %v", err)
}
if err := authz.ApplyDefaultPodSecurityPolicyRole(c.LocalKubeConfigPath); err != nil {
if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath); err != nil {
return fmt.Errorf("Failed to apply default PodSecurityPolicy ClusterRole and ClusterRoleBinding: %v", err)
}
}


@@ -3,26 +3,29 @@ package cluster
import (
"fmt"
"context"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/sirupsen/logrus"
)
func (c *Cluster) TunnelHosts() error {
func (c *Cluster) TunnelHosts(ctx context.Context) error {
for i := range c.EtcdHosts {
if err := c.EtcdHosts[i].TunnelUp(c.DockerDialerFactory); err != nil {
if err := c.EtcdHosts[i].TunnelUp(ctx, c.DockerDialerFactory); err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err)
}
}
for i := range c.ControlPlaneHosts {
err := c.ControlPlaneHosts[i].TunnelUp(c.DockerDialerFactory)
err := c.ControlPlaneHosts[i].TunnelUp(ctx, c.DockerDialerFactory)
if err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err)
}
}
for i := range c.WorkerHosts {
if err := c.WorkerHosts[i].TunnelUp(c.DockerDialerFactory); err != nil {
if err := c.WorkerHosts[i].TunnelUp(ctx, c.DockerDialerFactory); err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err)
}
}
@@ -59,22 +62,22 @@ func (c *Cluster) InvertIndexHosts() error {
return nil
}
func (c *Cluster) SetUpHosts() error {
func (c *Cluster) SetUpHosts(ctx context.Context) error {
if c.Authentication.Strategy == X509AuthenticationProvider {
logrus.Infof("[certificates] Deploying kubernetes certificates to Cluster nodes")
err := pki.DeployCertificatesOnMasters(c.ControlPlaneHosts, c.Certificates, c.SystemImages[CertDownloaderImage])
log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes")
err := pki.DeployCertificatesOnMasters(ctx, c.ControlPlaneHosts, c.Certificates, c.SystemImages[CertDownloaderImage])
if err != nil {
return err
}
err = pki.DeployCertificatesOnWorkers(c.WorkerHosts, c.Certificates, c.SystemImages[CertDownloaderImage])
err = pki.DeployCertificatesOnWorkers(ctx, c.WorkerHosts, c.Certificates, c.SystemImages[CertDownloaderImage])
if err != nil {
return err
}
err = pki.DeployAdminConfig(c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath)
err = pki.DeployAdminConfig(ctx, c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath)
if err != nil {
return err
}
logrus.Infof("[certificates] Successfully deployed kubernetes certificates to Cluster nodes")
log.Infof(ctx, "[certificates] Successfully deployed kubernetes certificates to Cluster nodes")
}
return nil
}


@@ -1,12 +1,13 @@
package cluster
import (
"context"
"fmt"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/rke/templates"
"github.com/sirupsen/logrus"
)
const (
@@ -63,23 +64,23 @@ const (
RBACConfig = "RBACConfig"
)
func (c *Cluster) DeployNetworkPlugin() error {
logrus.Infof("[network] Setting up network plugin: %s", c.Network.Plugin)
func (c *Cluster) DeployNetworkPlugin(ctx context.Context) error {
log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin)
switch c.Network.Plugin {
case FlannelNetworkPlugin:
return c.doFlannelDeploy()
return c.doFlannelDeploy(ctx)
case CalicoNetworkPlugin:
return c.doCalicoDeploy()
return c.doCalicoDeploy(ctx)
case CanalNetworkPlugin:
return c.doCanalDeploy()
return c.doCanalDeploy(ctx)
case WeaveNetworkPlugin:
return c.doWeaveDeploy()
return c.doWeaveDeploy(ctx)
default:
return fmt.Errorf("[network] Unsupported network plugin: %s", c.Network.Plugin)
}
}
func (c *Cluster) doFlannelDeploy() error {
func (c *Cluster) doFlannelDeploy(ctx context.Context) error {
flannelConfig := map[string]string{
ClusterCIDR: c.ClusterCIDR,
Image: c.Network.Options[FlannelImage],
@@ -91,10 +92,10 @@ func (c *Cluster) doFlannelDeploy() error {
if err != nil {
return err
}
return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName)
return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName)
}
func (c *Cluster) doCalicoDeploy() error {
func (c *Cluster) doCalicoDeploy(ctx context.Context) error {
calicoConfig := map[string]string{
EtcdEndpoints: services.GetEtcdConnString(c.EtcdHosts),
APIRoot: "https://127.0.0.1:6443",
@@ -114,10 +115,10 @@ func (c *Cluster) doCalicoDeploy() error {
if err != nil {
return err
}
return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName)
return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName)
}
func (c *Cluster) doCanalDeploy() error {
func (c *Cluster) doCanalDeploy(ctx context.Context) error {
canalConfig := map[string]string{
ClientCert: pki.KubeNodeCertPath,
APIRoot: "https://127.0.0.1:6443",
@@ -134,10 +135,10 @@ func (c *Cluster) doCanalDeploy() error {
if err != nil {
return err
}
return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName)
return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName)
}
func (c *Cluster) doWeaveDeploy() error {
func (c *Cluster) doWeaveDeploy(ctx context.Context) error {
weaveConfig := map[string]string{
ClusterCIDR: c.ClusterCIDR,
Image: c.Network.Options[WeaveImage],
@@ -148,7 +149,7 @@ func (c *Cluster) doWeaveDeploy() error {
if err != nil {
return err
}
return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName)
return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName)
}
func (c *Cluster) setClusterNetworkDefaults() {


@@ -1,19 +1,21 @@
package cluster
import (
"context"
"fmt"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/services"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)
func ReconcileCluster(kubeCluster, currentCluster *Cluster) error {
logrus.Infof("[reconcile] Reconciling cluster state")
func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) error {
log.Infof(ctx, "[reconcile] Reconciling cluster state")
if currentCluster == nil {
logrus.Infof("[reconcile] This is newly generated cluster")
log.Infof(ctx, "[reconcile] This is newly generated cluster")
return nil
}
@@ -22,36 +24,36 @@ func ReconcileCluster(kubeCluster, currentCluster *Cluster) error {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
if err := reconcileWorker(currentCluster, kubeCluster, kubeClient); err != nil {
if err := reconcileWorker(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
return err
}
if err := reconcileControl(currentCluster, kubeCluster, kubeClient); err != nil {
if err := reconcileControl(ctx, currentCluster, kubeCluster, kubeClient); err != nil {
return err
}
logrus.Infof("[reconcile] Reconciled cluster state successfully")
log.Infof(ctx, "[reconcile] Reconciled cluster state successfully")
return nil
}
func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
// worker deleted first to avoid issues when worker+controller on same host
logrus.Debugf("[reconcile] Check worker hosts to be deleted")
wpToDelete := hosts.GetToDeleteHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts)
for _, toDeleteHost := range wpToDelete {
toDeleteHost.IsWorker = false
if err := hosts.DeleteNode(toDeleteHost, kubeClient, toDeleteHost.IsControl); err != nil {
if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsControl); err != nil {
return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address)
}
// attempting to clean services/files on the host
if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
if err := reconcileHost(ctx, toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
continue
}
}
return nil
}
func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error {
logrus.Debugf("[reconcile] Check Control plane hosts to be deleted")
selfDeleteAddress, err := getLocalConfigAddress(kubeCluster.LocalKubeConfigPath)
if err != nil {
@@ -71,24 +73,24 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet
if err != nil {
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
}
if err := hosts.DeleteNode(toDeleteHost, kubeClient, toDeleteHost.IsWorker); err != nil {
if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsWorker); err != nil {
return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address)
}
// attempting to clean services/files on the host
if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
if err := reconcileHost(ctx, toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
continue
}
}
// rebuilding local admin config to enable saving cluster state
if err := rebuildLocalAdminConfig(kubeCluster); err != nil {
if err := rebuildLocalAdminConfig(ctx, kubeCluster); err != nil {
return err
}
// Rolling update on change for nginx Proxy
cpChanged := hosts.IsHostListChanged(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
if cpChanged {
logrus.Infof("[reconcile] Rolling update nginx hosts with new list of control plane hosts")
err := services.RollingUpdateNginxProxy(kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages[NginxProxyImage])
log.Infof(ctx, "[reconcile] Rolling update nginx hosts with new list of control plane hosts")
err := services.RollingUpdateNginxProxy(ctx, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages[NginxProxyImage])
if err != nil {
return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts")
}
@@ -96,22 +98,22 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet
return nil
}
func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error {
if err := toDeleteHost.TunnelUp(dialerFactory); err != nil {
func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error {
if err := toDeleteHost.TunnelUp(ctx, dialerFactory); err != nil {
return fmt.Errorf("Not able to reach the host: %v", err)
}
if worker {
if err := services.RemoveWorkerPlane([]*hosts.Host{toDeleteHost}, false); err != nil {
if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove worker plane: %v", err)
}
if err := toDeleteHost.CleanUpWorkerHost(services.ControlRole, cleanerImage); err != nil {
if err := toDeleteHost.CleanUpWorkerHost(ctx, services.ControlRole, cleanerImage); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err)
}
} else {
if err := services.RemoveControlPlane([]*hosts.Host{toDeleteHost}, false); err != nil {
if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
return fmt.Errorf("Couldn't remove control plane: %v", err)
}
if err := toDeleteHost.CleanUpControlHost(services.WorkerRole, cleanerImage); err != nil {
if err := toDeleteHost.CleanUpControlHost(ctx, services.WorkerRole, cleanerImage); err != nil {
return fmt.Errorf("Not able to clean the host: %v", err)
}
}


@@ -1,44 +1,46 @@
package cluster
import (
"context"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
)
func (c *Cluster) ClusterRemove() error {
func (c *Cluster) ClusterRemove(ctx context.Context) error {
// Remove Worker Plane
if err := services.RemoveWorkerPlane(c.WorkerHosts, true); err != nil {
if err := services.RemoveWorkerPlane(ctx, c.WorkerHosts, true); err != nil {
return err
}
// Remove Contol Plane
if err := services.RemoveControlPlane(c.ControlPlaneHosts, true); err != nil {
if err := services.RemoveControlPlane(ctx, c.ControlPlaneHosts, true); err != nil {
return err
}
// Remove Etcd Plane
if err := services.RemoveEtcdPlane(c.EtcdHosts); err != nil {
if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts); err != nil {
return err
}
// Clean up all hosts
if err := cleanUpHosts(c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages[AplineImage]); err != nil {
if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages[AplineImage]); err != nil {
return err
}
pki.RemoveAdminConfig(c.LocalKubeConfigPath)
pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath)
return nil
}
func cleanUpHosts(cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string) error {
func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string) error {
allHosts := []*hosts.Host{}
allHosts = append(allHosts, cpHosts...)
allHosts = append(allHosts, workerHosts...)
allHosts = append(allHosts, etcdHosts...)
for _, host := range allHosts {
if err := host.CleanUpAll(cleanerImage); err != nil {
if err := host.CleanUpAll(ctx, cleanerImage); err != nil {
return err
}
}


@@ -1,47 +1,49 @@
package cluster
import (
"context"
"fmt"
"os"
"time"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
func (c *Cluster) SaveClusterState(rkeConfig *v3.RancherKubernetesEngineConfig) error {
func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) error {
// Reinitialize kubernetes Client
var err error
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
if err != nil {
return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
}
err = saveClusterCerts(c.KubeClient, c.Certificates)
err = saveClusterCerts(ctx, c.KubeClient, c.Certificates)
if err != nil {
return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
}
err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, rkeConfig)
err = saveStateToKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath, rkeConfig)
if err != nil {
return fmt.Errorf("[state] Failed to save configuration state: %v", err)
}
return nil
}
func (c *Cluster) GetClusterState() (*Cluster, error) {
func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) {
var err error
var currentCluster *Cluster
// check if local kubeconfig file exists
if _, err = os.Stat(c.LocalKubeConfigPath); !os.IsNotExist(err) {
logrus.Infof("[state] Found local kube config file, trying to get state from cluster")
log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster")
// to handle if current local admin is down and we need to use new cp from the list
if !isLocalConfigWorking(c.LocalKubeConfigPath) {
if err := rebuildLocalAdminConfig(c); err != nil {
if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath) {
if err := rebuildLocalAdminConfig(ctx, c); err != nil {
return nil, err
}
}
@@ -49,20 +51,20 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
// initiate kubernetes client
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
if err != nil {
logrus.Warnf("Failed to initiate new Kubernetes Client: %v", err)
log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err)
return nil, nil
}
// Get previous kubernetes state
currentCluster = getStateFromKubernetes(c.KubeClient, c.LocalKubeConfigPath)
currentCluster = getStateFromKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath)
// Get previous kubernetes certificates
if currentCluster != nil {
currentCluster.Certificates, err = getClusterCerts(c.KubeClient)
currentCluster.Certificates, err = getClusterCerts(ctx, c.KubeClient)
currentCluster.DockerDialerFactory = c.DockerDialerFactory
if err != nil {
return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
}
// setting cluster defaults for the fetched cluster as well
currentCluster.setClusterDefaults()
currentCluster.setClusterDefaults(ctx)
if err := currentCluster.InvertIndexHosts(); err != nil {
return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
@@ -76,8 +78,8 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
return currentCluster, nil
}
func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
logrus.Infof("[state] Saving cluster state to Kubernetes")
func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
clusterFile, err := yaml.Marshal(*rkeConfig)
if err != nil {
return err
@@ -90,7 +92,7 @@ func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath stri
time.Sleep(time.Second * 5)
continue
}
logrus.Infof("[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
log.Infof(ctx, "[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
timeout <- true
break
}
@@ -103,8 +105,8 @@ func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath stri
}
}
func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
logrus.Infof("[state] Fetching cluster state from Kubernetes")
func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
var cfgMap *v1.ConfigMap
var currentCluster Cluster
var err error
@@ -116,7 +118,7 @@ func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath str
time.Sleep(time.Second * 5)
continue
}
logrus.Infof("[state] Successfully Fetched cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
log.Infof(ctx, "[state] Successfully Fetched cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
timeout <- true
break
}
@@ -130,7 +132,7 @@ func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath str
}
return &currentCluster
case <-time.After(time.Second * GetStateTimeout):
logrus.Infof("Timed out waiting for kubernetes cluster to get state")
log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state")
return nil
}
}


@@ -2,12 +2,14 @@ package cmd
import (
"bufio"
"context"
"fmt"
"os"
"strings"
"github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
@@ -34,25 +36,25 @@ func RemoveCommand() cli.Command {
}
}
func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error {
logrus.Infof("Tearing down Kubernetes cluster")
kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory, nil)
func ClusterRemove(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error {
log.Infof(ctx, "Tearing down Kubernetes cluster")
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dialerFactory, nil)
if err != nil {
return err
}
err = kubeCluster.TunnelHosts()
err = kubeCluster.TunnelHosts(ctx)
if err != nil {
return err
}
logrus.Debugf("Starting Cluster removal")
err = kubeCluster.ClusterRemove()
err = kubeCluster.ClusterRemove(ctx)
if err != nil {
return err
}
logrus.Infof("Cluster removed successfully")
log.Infof(ctx, "Cluster removed successfully")
return nil
}
@@ -80,5 +82,5 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
}
return ClusterRemove(rkeConfig, nil)
return ClusterRemove(context.Background(), rkeConfig, nil)
}


@@ -1,13 +1,14 @@
package cmd
import (
"context"
"fmt"
"github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"k8s.io/client-go/util/cert"
)
@@ -31,20 +32,20 @@ func UpCommand() cli.Command {
}
}
func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) {
logrus.Infof("Building Kubernetes cluster")
func ClusterUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) {
log.Infof(ctx, "Building Kubernetes cluster")
var APIURL, caCrt, clientCert, clientKey string
kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory)
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.TunnelHosts()
err = kubeCluster.TunnelHosts(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
currentCluster, err := kubeCluster.GetClusterState()
currentCluster, err := kubeCluster.GetClusterState(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
@@ -53,47 +54,47 @@ func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory,
return APIURL, caCrt, clientCert, clientKey, err
}
err = cluster.SetUpAuthentication(kubeCluster, currentCluster)
err = cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = cluster.ReconcileCluster(kubeCluster, currentCluster)
err = cluster.ReconcileCluster(ctx, kubeCluster, currentCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SetUpHosts()
err = kubeCluster.SetUpHosts(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployControlPlane()
err = kubeCluster.DeployControlPlane(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SaveClusterState(rkeConfig)
err = kubeCluster.SaveClusterState(ctx, rkeConfig)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployWorkerPlane()
err = kubeCluster.DeployWorkerPlane(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployNetworkPlugin()
err = kubeCluster.DeployNetworkPlugin(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployK8sAddOns()
err = kubeCluster.DeployK8sAddOns(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployUserAddOns()
err = kubeCluster.DeployUserAddOns(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, err
}
@@ -103,7 +104,7 @@ func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory,
clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Certificate))
clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Key))
logrus.Infof("Finished building Kubernetes cluster successfully")
log.Infof(ctx, "Finished building Kubernetes cluster successfully")
return APIURL, caCrt, clientCert, clientKey, nil
}
@@ -118,6 +119,6 @@ func clusterUpFromCli(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
}
_, _, _, _, err = ClusterUp(rkeConfig, nil, nil)
_, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil)
return err
}
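
The CLI entrypoints simply pass context.Background(), but the reworked ClusterUp signature lets an embedding program bound or cancel a whole provisioning run. A hedged sketch, assuming only the exported signatures shown in this diff:

package example // hypothetical embedding of the cmd package, not part of this commit

import (
	"context"
	"time"

	"github.com/rancher/rke/cmd"
	"github.com/rancher/types/apis/management.cattle.io/v3"
)

// buildWithTimeout cancels the whole cluster build if it runs longer than 30
// minutes; cancellation takes effect wherever the context is actually
// consulted further down the call chain (for example the Docker SDK calls).
func buildWithTimeout(rkeConfig *v3.RancherKubernetesEngineConfig) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()
	_, _, _, _, err := cmd.ClusterUp(ctx, rkeConfig, nil, nil)
	return err
}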


@@ -12,6 +12,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
)
@@ -19,101 +20,101 @@ var K8sDockerVersions = map[string][]string{
"1.8": {"1.12.6", "1.13.1", "17.03.2"},
}
func DoRunContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) error {
isRunning, err := IsContainerRunning(dClient, hostname, containerName, false)
func DoRunContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) error {
isRunning, err := IsContainerRunning(ctx, dClient, hostname, containerName, false)
if err != nil {
return err
}
if isRunning {
logrus.Infof("[%s] Container [%s] is already running on host [%s]", plane, containerName, hostname)
isUpgradable, err := IsContainerUpgradable(dClient, imageCfg, containerName, hostname, plane)
log.Infof(ctx, "[%s] Container [%s] is already running on host [%s]", plane, containerName, hostname)
isUpgradable, err := IsContainerUpgradable(ctx, dClient, imageCfg, containerName, hostname, plane)
if err != nil {
return err
}
if isUpgradable {
return DoRollingUpdateContainer(dClient, imageCfg, hostCfg, containerName, hostname, plane)
return DoRollingUpdateContainer(ctx, dClient, imageCfg, hostCfg, containerName, hostname, plane)
}
return nil
}
err = UseLocalOrPull(dClient, hostname, imageCfg.Image, plane)
err = UseLocalOrPull(ctx, dClient, hostname, imageCfg.Image, plane)
if err != nil {
return err
}
resp, err := dClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName)
resp, err := dClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, containerName)
if err != nil {
return fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err)
}
if err := dClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil {
if err := dClient.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err)
}
logrus.Debugf("[%s] Successfully started [%s] container: [%s]", plane, containerName, resp.ID)
logrus.Infof("[%s] Successfully started [%s] container on host [%s]", plane, containerName, hostname)
log.Infof(ctx, "[%s] Successfully started [%s] container on host [%s]", plane, containerName, hostname)
return nil
}
func DoRollingUpdateContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string) error {
func DoRollingUpdateContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string) error {
logrus.Debugf("[%s] Checking for deployed [%s]", plane, containerName)
isRunning, err := IsContainerRunning(dClient, hostname, containerName, false)
isRunning, err := IsContainerRunning(ctx, dClient, hostname, containerName, false)
if err != nil {
return err
}
if !isRunning {
logrus.Infof("[%s] Container %s is not running on host [%s]", plane, containerName, hostname)
log.Infof(ctx, "[%s] Container %s is not running on host [%s]", plane, containerName, hostname)
return nil
}
err = UseLocalOrPull(dClient, hostname, imageCfg.Image, plane)
err = UseLocalOrPull(ctx, dClient, hostname, imageCfg.Image, plane)
if err != nil {
return err
}
logrus.Debugf("[%s] Stopping old container", plane)
oldContainerName := "old-" + containerName
if err := StopRenameContainer(dClient, hostname, containerName, oldContainerName); err != nil {
if err := StopRenameContainer(ctx, dClient, hostname, containerName, oldContainerName); err != nil {
return err
}
logrus.Infof("[%s] Successfully stopped old container %s on host [%s]", plane, containerName, hostname)
_, err = CreateContiner(dClient, hostname, containerName, imageCfg, hostCfg)
log.Infof(ctx, "[%s] Successfully stopped old container %s on host [%s]", plane, containerName, hostname)
_, err = CreateContiner(ctx, dClient, hostname, containerName, imageCfg, hostCfg)
if err != nil {
return fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err)
}
if err := StartContainer(dClient, hostname, containerName); err != nil {
if err := StartContainer(ctx, dClient, hostname, containerName); err != nil {
return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err)
}
logrus.Infof("[%s] Successfully updated [%s] container on host [%s]", plane, containerName, hostname)
log.Infof(ctx, "[%s] Successfully updated [%s] container on host [%s]", plane, containerName, hostname)
logrus.Debugf("[%s] Removing old container", plane)
err = RemoveContainer(dClient, hostname, oldContainerName)
err = RemoveContainer(ctx, dClient, hostname, oldContainerName)
return err
}
func DoRemoveContainer(dClient *client.Client, containerName, hostname string) error {
logrus.Infof("[remove/%s] Checking if container is running on host [%s]", containerName, hostname)
func DoRemoveContainer(ctx context.Context, dClient *client.Client, containerName, hostname string) error {
log.Infof(ctx, "[remove/%s] Checking if container is running on host [%s]", containerName, hostname)
// not using the wrapper to check if the error is a NotFound error
_, err := dClient.ContainerInspect(context.Background(), containerName)
_, err := dClient.ContainerInspect(ctx, containerName)
if err != nil {
if client.IsErrNotFound(err) {
logrus.Infof("[remove/%s] Container doesn't exist on host [%s]", containerName, hostname)
log.Infof(ctx, "[remove/%s] Container doesn't exist on host [%s]", containerName, hostname)
return nil
}
return err
}
logrus.Infof("[remove/%s] Stopping container on host [%s]", containerName, hostname)
err = StopContainer(dClient, hostname, containerName)
log.Infof(ctx, "[remove/%s] Stopping container on host [%s]", containerName, hostname)
err = StopContainer(ctx, dClient, hostname, containerName)
if err != nil {
return err
}
logrus.Infof("[remove/%s] Removing container on host [%s]", containerName, hostname)
err = RemoveContainer(dClient, hostname, containerName)
log.Infof(ctx, "[remove/%s] Removing container on host [%s]", containerName, hostname)
err = RemoveContainer(ctx, dClient, hostname, containerName)
if err != nil {
return err
}
logrus.Infof("[remove/%s] Successfully removed container on host [%s]", containerName, hostname)
log.Infof(ctx, "[remove/%s] Successfully removed container on host [%s]", containerName, hostname)
return nil
}
func IsContainerRunning(dClient *client.Client, hostname string, containerName string, all bool) (bool, error) {
func IsContainerRunning(ctx context.Context, dClient *client.Client, hostname string, containerName string, all bool) (bool, error) {
logrus.Debugf("Checking if container [%s] is running on host [%s]", containerName, hostname)
containers, err := dClient.ContainerList(context.Background(), types.ContainerListOptions{All: all})
containers, err := dClient.ContainerList(ctx, types.ContainerListOptions{All: all})
if err != nil {
return false, fmt.Errorf("Can't get Docker containers for host [%s]: %v", hostname, err)
@@ -126,9 +127,9 @@ func IsContainerRunning(dClient *client.Client, hostname string, containerName s
return false, nil
}
func localImageExists(dClient *client.Client, hostname string, containerImage string) (bool, error) {
func localImageExists(ctx context.Context, dClient *client.Client, hostname string, containerImage string) (bool, error) {
logrus.Debugf("Checking if image [%s] exists on host [%s]", containerImage, hostname)
_, _, err := dClient.ImageInspectWithRaw(context.Background(), containerImage)
_, _, err := dClient.ImageInspectWithRaw(ctx, containerImage)
if err != nil {
if client.IsErrNotFound(err) {
logrus.Debugf("Image [%s] does not exist on host [%s]: %v", containerImage, hostname, err)
@@ -140,8 +141,8 @@ func localImageExists(dClient *client.Client, hostname string, containerImage st
return true, nil
}
func pullImage(dClient *client.Client, hostname string, containerImage string) error {
out, err := dClient.ImagePull(context.Background(), containerImage, types.ImagePullOptions{})
func pullImage(ctx context.Context, dClient *client.Client, hostname string, containerImage string) error {
out, err := dClient.ImagePull(ctx, containerImage, types.ImagePullOptions{})
if err != nil {
return fmt.Errorf("Can't pull Docker image [%s] for host [%s]: %v", containerImage, hostname, err)
}
@@ -155,84 +156,84 @@ func pullImage(dClient *client.Client, hostname string, containerImage string) e
return nil
}
func UseLocalOrPull(dClient *client.Client, hostname string, containerImage string, plane string) error {
logrus.Infof("[%s] Checking image [%s] on host [%s]", plane, containerImage, hostname)
imageExists, err := localImageExists(dClient, hostname, containerImage)
func UseLocalOrPull(ctx context.Context, dClient *client.Client, hostname string, containerImage string, plane string) error {
log.Infof(ctx, "[%s] Checking image [%s] on host [%s]", plane, containerImage, hostname)
imageExists, err := localImageExists(ctx, dClient, hostname, containerImage)
if err != nil {
return err
}
if imageExists {
logrus.Infof("[%s] No pull necessary, image [%s] exists on host [%s]", plane, containerImage, hostname)
log.Infof(ctx, "[%s] No pull necessary, image [%s] exists on host [%s]", plane, containerImage, hostname)
return nil
}
logrus.Infof("[%s] Pulling image [%s] on host [%s]", plane, containerImage, hostname)
if err := pullImage(dClient, hostname, containerImage); err != nil {
log.Infof(ctx, "[%s] Pulling image [%s] on host [%s]", plane, containerImage, hostname)
if err := pullImage(ctx, dClient, hostname, containerImage); err != nil {
return err
}
logrus.Infof("[%s] Successfully pulled image [%s] on host [%s]", plane, containerImage, hostname)
log.Infof(ctx, "[%s] Successfully pulled image [%s] on host [%s]", plane, containerImage, hostname)
return nil
}
func RemoveContainer(dClient *client.Client, hostname string, containerName string) error {
err := dClient.ContainerRemove(context.Background(), containerName, types.ContainerRemoveOptions{})
func RemoveContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error {
err := dClient.ContainerRemove(ctx, containerName, types.ContainerRemoveOptions{})
if err != nil {
return fmt.Errorf("Can't remove Docker container [%s] for host [%s]: %v", containerName, hostname, err)
}
return nil
}
func StopContainer(dClient *client.Client, hostname string, containerName string) error {
err := dClient.ContainerStop(context.Background(), containerName, nil)
func StopContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error {
err := dClient.ContainerStop(ctx, containerName, nil)
if err != nil {
return fmt.Errorf("Can't stop Docker container [%s] for host [%s]: %v", containerName, hostname, err)
}
return nil
}
func RenameContainer(dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error {
err := dClient.ContainerRename(context.Background(), oldContainerName, newContainerName)
func RenameContainer(ctx context.Context, dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error {
err := dClient.ContainerRename(ctx, oldContainerName, newContainerName)
if err != nil {
return fmt.Errorf("Can't rename Docker container [%s] for host [%s]: %v", oldContainerName, hostname, err)
}
return nil
}
func StartContainer(dClient *client.Client, hostname string, containerName string) error {
if err := dClient.ContainerStart(context.Background(), containerName, types.ContainerStartOptions{}); err != nil {
func StartContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error {
if err := dClient.ContainerStart(ctx, containerName, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err)
}
return nil
}
func CreateContiner(dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.ContainerCreateCreatedBody, error) {
created, err := dClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName)
func CreateContiner(ctx context.Context, dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.ContainerCreateCreatedBody, error) {
created, err := dClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, containerName)
if err != nil {
return container.ContainerCreateCreatedBody{}, fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err)
}
return created, nil
}
func InspectContainer(dClient *client.Client, hostname string, containerName string) (types.ContainerJSON, error) {
inspection, err := dClient.ContainerInspect(context.Background(), containerName)
func InspectContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) (types.ContainerJSON, error) {
inspection, err := dClient.ContainerInspect(ctx, containerName)
if err != nil {
return types.ContainerJSON{}, fmt.Errorf("Failed to inspect [%s] container on host [%s]: %v", containerName, hostname, err)
}
return inspection, nil
}
func StopRenameContainer(dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error {
if err := StopContainer(dClient, hostname, oldContainerName); err != nil {
func StopRenameContainer(ctx context.Context, dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error {
if err := StopContainer(ctx, dClient, hostname, oldContainerName); err != nil {
return err
}
if err := WaitForContainer(dClient, oldContainerName); err != nil {
if err := WaitForContainer(ctx, dClient, oldContainerName); err != nil {
return nil
}
err := RenameContainer(dClient, hostname, oldContainerName, newContainerName)
err := RenameContainer(ctx, dClient, hostname, oldContainerName, newContainerName)
return err
}
func WaitForContainer(dClient *client.Client, containerName string) error {
statusCh, errCh := dClient.ContainerWait(context.Background(), containerName, container.WaitConditionNotRunning)
func WaitForContainer(ctx context.Context, dClient *client.Client, containerName string) error {
statusCh, errCh := dClient.ContainerWait(ctx, containerName, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
@@ -243,11 +244,11 @@ func WaitForContainer(dClient *client.Client, containerName string) error {
return nil
}
func IsContainerUpgradable(dClient *client.Client, imageCfg *container.Config, containerName string, hostname string, plane string) (bool, error) {
func IsContainerUpgradable(ctx context.Context, dClient *client.Client, imageCfg *container.Config, containerName string, hostname string, plane string) (bool, error) {
logrus.Debugf("[%s] Checking if container [%s] is eligible for upgrade on host [%s]", plane, containerName, hostname)
// this should be moved to a higher layer.
containerInspect, err := InspectContainer(dClient, hostname, containerName)
containerInspect, err := InspectContainer(ctx, dClient, hostname, containerName)
if err != nil {
return false, err
}
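
Because the Docker helpers now accept a context and forward it to every SDK call (ContainerCreate, ContainerStart, ImagePull, ContainerWait, and so on), a cancelled or expired context aborts the operation instead of leaving it to hang. A hypothetical wrapper adding a per-operation deadline around one of these helpers:

package example // hypothetical caller, not part of this commit

import (
	"context"
	"time"

	"github.com/docker/docker/client"
	"github.com/rancher/rke/docker"
)

// pullWithTimeout gives a single image check/pull five minutes; the deadline
// propagates into the underlying Docker API requests via ctx.
func pullWithTimeout(dClient *client.Client, hostname, image, plane string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	return docker.UseLocalOrPull(ctx, dClient, hostname, image, plane)
}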


@@ -1,14 +1,15 @@
package hosts
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
)
@@ -31,8 +32,8 @@ const (
CleanerContainerName = "kube-cleaner"
)
func (h *Host) CleanUpAll(cleanerImage string) error {
logrus.Infof("[hosts] Cleaning up host [%s]", h.Address)
func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string) error {
log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
toCleanPaths := []string{
ToCleanEtcdDir,
ToCleanSSLDir,
@@ -40,12 +41,12 @@ func (h *Host) CleanUpAll(cleanerImage string) error {
ToCleanCNIBin,
ToCleanCalicoRun,
}
return h.CleanUp(toCleanPaths, cleanerImage)
return h.CleanUp(ctx, toCleanPaths, cleanerImage)
}
func (h *Host) CleanUpWorkerHost(controlRole, cleanerImage string) error {
func (h *Host) CleanUpWorkerHost(ctx context.Context, controlRole, cleanerImage string) error {
if h.IsControl {
logrus.Infof("[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address)
log.Infof(ctx, "[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address)
return nil
}
toCleanPaths := []string{
@@ -54,12 +55,12 @@ func (h *Host) CleanUpWorkerHost(controlRole, cleanerImage string) error {
ToCleanCNIBin,
ToCleanCalicoRun,
}
return h.CleanUp(toCleanPaths, cleanerImage)
return h.CleanUp(ctx, toCleanPaths, cleanerImage)
}
func (h *Host) CleanUpControlHost(workerRole, cleanerImage string) error {
func (h *Host) CleanUpControlHost(ctx context.Context, workerRole, cleanerImage string) error {
if h.IsWorker {
logrus.Infof("[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address)
log.Infof(ctx, "[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address)
return nil
}
toCleanPaths := []string{
@@ -68,38 +69,38 @@ func (h *Host) CleanUpControlHost(workerRole, cleanerImage string) error {
ToCleanCNIBin,
ToCleanCalicoRun,
}
return h.CleanUp(toCleanPaths, cleanerImage)
return h.CleanUp(ctx, toCleanPaths, cleanerImage)
}
func (h *Host) CleanUp(toCleanPaths []string, cleanerImage string) error {
logrus.Infof("[hosts] Cleaning up host [%s]", h.Address)
func (h *Host) CleanUp(ctx context.Context, toCleanPaths []string, cleanerImage string) error {
log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
imageCfg, hostCfg := buildCleanerConfig(h, toCleanPaths, cleanerImage)
logrus.Infof("[hosts] Running cleaner container on host [%s]", h.Address)
if err := docker.DoRunContainer(h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName); err != nil {
log.Infof(ctx, "[hosts] Running cleaner container on host [%s]", h.Address)
if err := docker.DoRunContainer(ctx, h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName); err != nil {
return err
}
if err := docker.WaitForContainer(h.DClient, CleanerContainerName); err != nil {
if err := docker.WaitForContainer(ctx, h.DClient, CleanerContainerName); err != nil {
return err
}
logrus.Infof("[hosts] Removing cleaner container on host [%s]", h.Address)
if err := docker.RemoveContainer(h.DClient, h.Address, CleanerContainerName); err != nil {
log.Infof(ctx, "[hosts] Removing cleaner container on host [%s]", h.Address)
if err := docker.RemoveContainer(ctx, h.DClient, h.Address, CleanerContainerName); err != nil {
return err
}
logrus.Infof("[hosts] Successfully cleaned up host [%s]", h.Address)
log.Infof(ctx, "[hosts] Successfully cleaned up host [%s]", h.Address)
return nil
}
func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error {
func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error {
if hasAnotherRole {
logrus.Infof("[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address)
log.Infof(ctx, "[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address)
return nil
}
logrus.Infof("[hosts] Cordoning host [%s]", toDeleteHost.Address)
log.Infof(ctx, "[hosts] Cordoning host [%s]", toDeleteHost.Address)
if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride); err != nil {
if apierrors.IsNotFound(err) {
logrus.Warnf("[hosts] Can't find node by name [%s]", toDeleteHost.Address)
log.Warnf(ctx, "[hosts] Can't find node by name [%s]", toDeleteHost.Address)
return nil
}
return err
@@ -108,11 +109,11 @@ func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnother
if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, true); err != nil {
return err
}
logrus.Infof("[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address)
log.Infof(ctx, "[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address)
if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride); err != nil {
return err
}
logrus.Infof("[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address)
log.Infof(ctx, "[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address)
return nil
}


@@ -11,6 +11,7 @@ import (
"github.com/docker/docker/client"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/terminal"
@@ -21,11 +22,11 @@ const (
K8sVersion = "1.8"
)
func (h *Host) TunnelUp(dialerFactory DialerFactory) error {
func (h *Host) TunnelUp(ctx context.Context, dialerFactory DialerFactory) error {
if h.DClient != nil {
return nil
}
logrus.Infof("[dialer] Setup tunnel for host [%s]", h.Address)
log.Infof(ctx, "[dialer] Setup tunnel for host [%s]", h.Address)
httpClient, err := h.newHTTPClient(dialerFactory)
if err != nil {
return fmt.Errorf("Can't establish dialer connection: %v", err)
@@ -37,7 +38,7 @@ func (h *Host) TunnelUp(dialerFactory DialerFactory) error {
if err != nil {
return fmt.Errorf("Can't initiate NewClient: %v", err)
}
info, err := h.DClient.Info(context.Background())
info, err := h.DClient.Info(ctx)
if err != nil {
return fmt.Errorf("Can't retrieve Docker Info: %v", err)
}
@@ -50,7 +51,7 @@ func (h *Host) TunnelUp(dialerFactory DialerFactory) error {
if !isvalid && !h.IgnoreDockerVersion {
return fmt.Errorf("Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion])
} else if !isvalid {
logrus.Warnf("Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion])
log.Warnf(ctx, "Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion])
}
return nil

log/log.go (new file)

@@ -0,0 +1,39 @@
package log
import (
"context"
"github.com/sirupsen/logrus"
)
type logKey string
const (
key logKey = "rke-logger"
)
type logger interface {
Infof(msg string, args ...interface{})
Warnf(msg string, args ...interface{})
}
func SetLogger(ctx context.Context, logger logger) context.Context {
return context.WithValue(ctx, key, logger)
}
func getLogger(ctx context.Context) logger {
logger, _ := ctx.Value(key).(logger)
if logger == nil {
return logrus.StandardLogger()
}
return logger
}
func Infof(ctx context.Context, msg string, args ...interface{}) {
getLogger(ctx).Infof(msg, args...)
}
func Warnf(ctx context.Context, msg string, args ...interface{}) {
getLogger(ctx).Warnf(msg, args...)
}
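
For context, a minimal sketch of how a caller could plug a custom logger into this package. The prefixLogger type below is made up purely to satisfy the unexported logger interface (Infof/Warnf) shown above; contexts that carry no logger fall back to logrus.StandardLogger():

package main

import (
	"context"

	"github.com/rancher/rke/log"
	"github.com/sirupsen/logrus"
)

// prefixLogger is a hypothetical example type implementing the logger
// interface expected by log.SetLogger; it just prefixes every message.
type prefixLogger struct{ prefix string }

func (p prefixLogger) Infof(msg string, args ...interface{}) {
	logrus.Infof(p.prefix+msg, args...)
}

func (p prefixLogger) Warnf(msg string, args ...interface{}) {
	logrus.Warnf(p.prefix+msg, args...)
}

func main() {
	// Any code that receives this ctx now logs through prefixLogger.
	ctx := log.SetLogger(context.Background(), prefixLogger{prefix: "[custom] "})
	log.Infof(ctx, "tunnel set up for host [%s]", "10.0.0.1")
}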


@@ -11,10 +11,11 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
)
func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error {
func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error {
// list of certificates that should be deployed on the masters
crtList := []string{
CACertName,
@@ -31,7 +32,7 @@ func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]Certif
}
for i := range cpHosts {
err := doRunDeployer(cpHosts[i], env, certDownloaderImage)
err := doRunDeployer(ctx, cpHosts[i], env, certDownloaderImage)
if err != nil {
return err
}
@@ -39,7 +40,7 @@ func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]Certif
return nil
}
func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error {
func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error {
// list of certificates that should be deployed on the workers
crtList := []string{
CACertName,
@@ -53,7 +54,7 @@ func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]Ce
}
for i := range workerHosts {
err := doRunDeployer(workerHosts[i], env, certDownloaderImage)
err := doRunDeployer(ctx, workerHosts[i], env, certDownloaderImage)
if err != nil {
return err
}
@@ -61,8 +62,8 @@ func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]Ce
return nil
}
func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage string) error {
if err := docker.UseLocalOrPull(host.DClient, host.Address, certDownloaderImage, CertificatesServiceName); err != nil {
func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string, certDownloaderImage string) error {
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, certDownloaderImage, CertificatesServiceName); err != nil {
return err
}
imageCfg := &container.Config{
@@ -75,17 +76,17 @@ func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage
},
Privileged: true,
}
resp, err := host.DClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, CrtDownloaderContainer)
resp, err := host.DClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, CrtDownloaderContainer)
if err != nil {
return fmt.Errorf("Failed to create Certificates deployer container on host [%s]: %v", host.Address, err)
}
if err := host.DClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil {
if err := host.DClient.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("Failed to start Certificates deployer container on host [%s]: %v", host.Address, err)
}
logrus.Debugf("[certificates] Successfully started Certificate deployer container: %s", resp.ID)
for {
isDeployerRunning, err := docker.IsContainerRunning(host.DClient, host.Address, CrtDownloaderContainer, false)
isDeployerRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, CrtDownloaderContainer, false)
if err != nil {
return err
}
@@ -93,28 +94,28 @@ func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage
time.Sleep(5 * time.Second)
continue
}
if err := host.DClient.ContainerRemove(context.Background(), resp.ID, types.ContainerRemoveOptions{}); err != nil {
if err := host.DClient.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{}); err != nil {
return fmt.Errorf("Failed to delete Certificates deployer container on host [%s]: %v", host.Address, err)
}
return nil
}
}
func DeployAdminConfig(kubeConfig, localConfigPath string) error {
func DeployAdminConfig(ctx context.Context, kubeConfig, localConfigPath string) error {
logrus.Debugf("Deploying admin Kubeconfig locally: %s", kubeConfig)
err := ioutil.WriteFile(localConfigPath, []byte(kubeConfig), 0640)
if err != nil {
return fmt.Errorf("Failed to create local admin kubeconfig file: %v", err)
}
logrus.Infof("Successfully Deployed local admin kubeconfig at [%s]", localConfigPath)
log.Infof(ctx, "Successfully Deployed local admin kubeconfig at [%s]", localConfigPath)
return nil
}
func RemoveAdminConfig(localConfigPath string) {
logrus.Infof("Removing local admin Kubeconfig: %s", localConfigPath)
func RemoveAdminConfig(ctx context.Context, localConfigPath string) {
log.Infof(ctx, "Removing local admin Kubeconfig: %s", localConfigPath)
if err := os.Remove(localConfigPath); err != nil {
logrus.Warningf("Failed to remove local admin Kubeconfig file: %v", err)
return
}
logrus.Infof("Local admin Kubeconfig removed successfully")
log.Infof(ctx, "Local admin Kubeconfig removed successfully")
}

View File

@@ -1,12 +1,14 @@
package pki
import (
"context"
"crypto/rsa"
"crypto/x509"
"fmt"
"net"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
"k8s.io/client-go/util/cert"
)
@@ -27,19 +29,19 @@ type CertificatePKI struct {
}
// StartCertificatesGeneration ...
func StartCertificatesGeneration(cpHosts []*hosts.Host, workerHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
logrus.Infof("[certificates] Generating kubernetes certificates")
certs, err := generateCerts(cpHosts, clusterDomain, localConfigPath, KubernetesServiceIP)
func StartCertificatesGeneration(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
log.Infof(ctx, "[certificates] Generating kubernetes certificates")
certs, err := generateCerts(ctx, cpHosts, clusterDomain, localConfigPath, KubernetesServiceIP)
if err != nil {
return nil, err
}
return certs, nil
}
func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
func generateCerts(ctx context.Context, cpHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
certs := make(map[string]CertificatePKI)
// generate CA certificate and key
logrus.Infof("[certificates] Generating CA kubernetes certificates")
log.Infof(ctx, "[certificates] Generating CA kubernetes certificates")
caCrt, caKey, err := generateCACertAndKey()
if err != nil {
return nil, err
@@ -56,7 +58,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
}
// generate API certificate and key
logrus.Infof("[certificates] Generating Kubernetes API server certificates")
log.Infof(ctx, "[certificates] Generating Kubernetes API server certificates")
kubeAPIAltNames := GetAltNames(cpHosts, clusterDomain, KubernetesServiceIP)
kubeAPICrt, kubeAPIKey, err := GenerateKubeAPICertAndKey(caCrt, caKey, kubeAPIAltNames)
if err != nil {
@@ -74,7 +76,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
}
// generate Kube controller-manager certificate and key
logrus.Infof("[certificates] Generating Kube Controller certificates")
log.Infof(ctx, "[certificates] Generating Kube Controller certificates")
kubeControllerCrt, kubeControllerKey, err := generateClientCertAndKey(caCrt, caKey, KubeControllerCommonName, []string{})
if err != nil {
return nil, err
@@ -95,7 +97,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
}
// generate Kube scheduler certificate and key
logrus.Infof("[certificates] Generating Kube Scheduler certificates")
log.Infof(ctx, "[certificates] Generating Kube Scheduler certificates")
kubeSchedulerCrt, kubeSchedulerKey, err := generateClientCertAndKey(caCrt, caKey, KubeSchedulerCommonName, []string{})
if err != nil {
return nil, err
@@ -116,7 +118,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
}
// generate Kube Proxy certificate and key
logrus.Infof("[certificates] Generating Kube Proxy certificates")
log.Infof(ctx, "[certificates] Generating Kube Proxy certificates")
kubeProxyCrt, kubeProxyKey, err := generateClientCertAndKey(caCrt, caKey, KubeProxyCommonName, []string{})
if err != nil {
return nil, err
@@ -137,7 +139,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
}
// generate Kubelet certificate and key
logrus.Infof("[certificates] Generating Node certificate")
log.Infof(ctx, "[certificates] Generating Node certificate")
nodeCrt, nodeKey, err := generateClientCertAndKey(caCrt, caKey, KubeNodeCommonName, []string{KubeNodeOrganizationName})
if err != nil {
return nil, err
@@ -157,7 +159,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string,
ConfigEnvName: KubeNodeConfigENVName,
ConfigPath: KubeNodeCommonName,
}
logrus.Infof("[certificates] Generating admin certificates and kubeconfig")
log.Infof(ctx, "[certificates] Generating admin certificates and kubeconfig")
kubeAdminCrt, kubeAdminKey, err := generateClientCertAndKey(caCrt, caKey, KubeAdminCommonName, []string{KubeAdminOrganizationName})
if err != nil {
return nil, err


@@ -1,6 +1,7 @@
package pki
import (
"context"
"crypto/x509"
"fmt"
"net"
@@ -27,7 +28,7 @@ func TestPKI(t *testing.T) {
DClient: nil,
},
}
certificateMap, err := StartCertificatesGeneration(cpHosts, cpHosts, FakeClusterDomain, "", net.ParseIP(FakeKubernetesServiceIP))
certificateMap, err := StartCertificatesGeneration(context.Background(), cpHosts, cpHosts, FakeClusterDomain, "", net.ParseIP(FakeKubernetesServiceIP))
if err != nil {
t.Fatalf("Failed To generate certificates: %v", err)
}


@@ -1,81 +1,83 @@
package services
import (
"context"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
)
func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error {
logrus.Infof("[%s] Building up Controller Plane..", ControlRole)
func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error {
log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
for _, host := range controlHosts {
if host.IsWorker {
if err := removeNginxProxy(host); err != nil {
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(host, sidekickImage); err != nil {
if err := runSidekick(ctx, host, sidekickImage); err != nil {
return err
}
// run kubeapi
err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory)
err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory)
if err != nil {
return err
}
// run kubecontroller
err = runKubeController(host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory)
err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory)
if err != nil {
return err
}
// run scheduler
err = runScheduler(host, controlServices.Scheduler, healthcheckDialerFactory)
err = runScheduler(ctx, host, controlServices.Scheduler, healthcheckDialerFactory)
if err != nil {
return err
}
}
logrus.Infof("[%s] Successfully started Controller Plane..", ControlRole)
log.Infof(ctx, "[%s] Successfully started Controller Plane..", ControlRole)
return nil
}
func RemoveControlPlane(controlHosts []*hosts.Host, force bool) error {
logrus.Infof("[%s] Tearing down the Controller Plane..", ControlRole)
func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole)
for _, host := range controlHosts {
// remove KubeAPI
if err := removeKubeAPI(host); err != nil {
if err := removeKubeAPI(ctx, host); err != nil {
return err
}
// remove KubeController
if err := removeKubeController(host); err != nil {
if err := removeKubeController(ctx, host); err != nil {
return nil
}
// remove scheduler
err := removeScheduler(host)
err := removeScheduler(ctx, host)
if err != nil {
return err
}
// check if the host already is a worker
if host.IsWorker {
logrus.Infof("[%s] Host [%s] is already a worker host, skipping delete kubelet and kubeproxy.", ControlRole, host.Address)
log.Infof(ctx, "[%s] Host [%s] is already a worker host, skipping delete kubelet and kubeproxy.", ControlRole, host.Address)
} else {
// remove Kubelet
if err := removeKubelet(host); err != nil {
if err := removeKubelet(ctx, host); err != nil {
return err
}
// remove Kubeproxy
if err := removeKubeproxy(host); err != nil {
if err := removeKubeproxy(ctx, host); err != nil {
return nil
}
// remove Sidekick
if err := removeSidekick(host); err != nil {
if err := removeSidekick(ctx, host); err != nil {
return err
}
}
}
logrus.Infof("[%s] Successfully teared down Controller Plane..", ControlRole)
log.Infof(ctx, "[%s] Successfully teared down Controller Plane..", ControlRole)
return nil
}


@@ -3,37 +3,39 @@ package services
import (
"fmt"
"context"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
)
func RunEtcdPlane(etcdHosts []*hosts.Host, etcdService v3.ETCDService) error {
logrus.Infof("[%s] Building up Etcd Plane..", ETCDRole)
func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService) error {
log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
initCluster := getEtcdInitialCluster(etcdHosts)
for _, host := range etcdHosts {
imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster)
err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole)
err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole)
if err != nil {
return err
}
}
logrus.Infof("[%s] Successfully started Etcd Plane..", ETCDRole)
log.Infof(ctx, "[%s] Successfully started Etcd Plane..", ETCDRole)
return nil
}
func RemoveEtcdPlane(etcdHosts []*hosts.Host) error {
logrus.Infof("[%s] Tearing down Etcd Plane..", ETCDRole)
func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error {
log.Infof(ctx, "[%s] Tearing down Etcd Plane..", ETCDRole)
for _, host := range etcdHosts {
err := docker.DoRemoveContainer(host.DClient, EtcdContainerName, host.Address)
err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address)
if err != nil {
return err
}
}
logrus.Infof("[%s] Successfully teared down Etcd Plane..", ETCDRole)
log.Infof(ctx, "[%s] Successfully teared down Etcd Plane..", ETCDRole)
return nil
}


@@ -1,6 +1,7 @@
package services
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
@@ -8,6 +9,7 @@ import (
"time"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/sirupsen/logrus"
)
@@ -18,8 +20,8 @@ const (
HTTPSProtoPrefix = "https://"
)
func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error {
logrus.Infof("[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error {
log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
client, err := getHealthCheckHTTPClient(host, port, healthcheckDialerFactory)
if err != nil {
return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
@@ -30,7 +32,7 @@ func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string,
time.Sleep(5 * time.Second)
continue
}
logrus.Infof("[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address)
log.Infof(ctx, "[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address)
return nil
}
return fmt.Errorf("Failed to verify healthcheck: %v", err)
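
The healthcheck above retries on a fixed five-second sleep; with a context in hand, the same loop can also stop early when the caller cancels. A rough, generic sketch of that variation, not the actual implementation (the URL and attempt count are placeholders, and the real code dials the service over the host tunnel):

package example

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// pollHealthz is a sketch only: it retries an HTTP healthcheck until it
// succeeds, the attempts run out, or the caller's context is cancelled.
func pollHealthz(ctx context.Context, client *http.Client, url string, attempts int) error {
	for i := 0; i < attempts; i++ {
		resp, err := client.Get(url)
		if err == nil && resp.StatusCode == http.StatusOK {
			resp.Body.Close()
			return nil
		}
		if resp != nil {
			resp.Body.Close()
		}
		select {
		case <-ctx.Done():
			// Caller gave up; surface the cancellation instead of sleeping on.
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
	return fmt.Errorf("healthcheck at %s did not become healthy", url)
}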


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -11,17 +12,17 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error {
func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error {
etcdConnString := GetEtcdConnString(etcdHosts)
imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil {
return err
}
return runHealthcheck(host, KubeAPIPort, false, KubeAPIContainerName, df)
return runHealthcheck(ctx, host, KubeAPIPort, false, KubeAPIContainerName, df)
}
func removeKubeAPI(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, KubeAPIContainerName, host.Address)
func removeKubeAPI(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address)
}
func buildKubeAPIConfig(host *hosts.Host, kubeAPIService v3.KubeAPIService, etcdConnString, authorizationMode string) (*container.Config, *container.HostConfig) {


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -10,16 +11,16 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error {
func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error {
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil {
return err
}
return runHealthcheck(host, KubeControllerPort, false, KubeControllerContainerName, df)
return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df)
}
func removeKubeController(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, KubeControllerContainerName, host.Address)
func removeKubeController(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address)
}
func buildKubeControllerConfig(kubeControllerService v3.KubeControllerService, authorizationMode string) (*container.Config, *container.HostConfig) {


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -10,16 +11,16 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubelet(host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error {
func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil {
return err
}
return runHealthcheck(host, KubeletPort, true, KubeletContainerName, df)
return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df)
}
func removeKubelet(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, KubeletContainerName, host.Address)
func removeKubelet(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address)
}
func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) {


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -10,16 +11,16 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error {
func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error {
imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil {
return err
}
return runHealthcheck(host, KubeproxyPort, false, KubeproxyContainerName, df)
return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df)
}
func removeKubeproxy(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, KubeproxyContainerName, host.Address)
func removeKubeproxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address)
}
func buildKubeproxyConfig(host *hosts.Host, kubeproxyService v3.KubeproxyService) (*container.Config, *container.HostConfig) {


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -13,25 +14,25 @@ const (
NginxProxyEnvName = "CP_HOSTS"
)
func RollingUpdateNginxProxy(cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string) error {
func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
for _, host := range workerHosts {
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
if err := docker.DoRollingUpdateContainer(host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole); err != nil {
if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole); err != nil {
return err
}
}
return nil
}
func runNginxProxy(host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string) error {
func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string) error {
nginxProxyEnv := buildProxyEnv(cpHosts)
imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage)
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole)
return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole)
}
func removeNginxProxy(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, NginxProxyContainerName, host.Address)
func removeNginxProxy(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address)
}
func buildNginxProxyConfig(host *hosts.Host, nginxProxyEnv, nginxProxyImage string) (*container.Config, *container.HostConfig) {


@@ -1,6 +1,7 @@
package services
import (
"context"
"fmt"
"github.com/docker/docker/api/types/container"
@@ -10,16 +11,16 @@ import (
"github.com/rancher/types/apis/management.cattle.io/v3"
)
func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error {
func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error {
imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil {
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil {
return err
}
return runHealthcheck(host, SchedulerPort, false, SchedulerContainerName, df)
return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df)
}
func removeScheduler(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, SchedulerContainerName, host.Address)
func removeScheduler(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address)
}
func buildSchedulerConfig(host *hosts.Host, schedulerService v3.SchedulerService) (*container.Config, *container.HostConfig) {


@@ -1,13 +1,14 @@
package services
import (
"context"
"fmt"
"net"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/sirupsen/logrus"
"github.com/rancher/rke/log"
)
const (
@@ -59,25 +60,25 @@ func buildSidekickConfig(sidekickImage string) (*container.Config, *container.Ho
return imageCfg, hostCfg
}
func runSidekick(host *hosts.Host, sidekickImage string) error {
isRunning, err := docker.IsContainerRunning(host.DClient, host.Address, SidekickContainerName, true)
func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string) error {
isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true)
if err != nil {
return err
}
if isRunning {
logrus.Infof("[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address)
log.Infof(ctx, "[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address)
return nil
}
imageCfg, hostCfg := buildSidekickConfig(sidekickImage)
if err := docker.UseLocalOrPull(host.DClient, host.Address, sidekickImage, SidekickServiceName); err != nil {
if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName); err != nil {
return err
}
if _, err := docker.CreateContiner(host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil {
if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil {
return err
}
return nil
}
func removeSidekick(host *hosts.Host) error {
return docker.DoRemoveContainer(host.DClient, SidekickContainerName, host.Address)
func removeSidekick(ctx context.Context, host *hosts.Host) error {
return docker.DoRemoveContainer(ctx, host.DClient, SidekickContainerName, host.Address)
}


@@ -1,21 +1,23 @@
package services
import (
"context"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error {
logrus.Infof("[%s] Building up Worker Plane..", WorkerRole)
func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error {
log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group
// Deploy worker components on control hosts
for _, host := range controlHosts {
controlHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
})
}
if err := errgrp.Wait(); err != nil {
@@ -25,61 +27,61 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R
for _, host := range workerHosts {
workerHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
logrus.Infof("[%s] Successfully started Worker Plane..", WorkerRole)
log.Infof(ctx, "[%s] Successfully started Worker Plane..", WorkerRole)
return nil
}
func RemoveWorkerPlane(workerHosts []*hosts.Host, force bool) error {
logrus.Infof("[%s] Tearing down Worker Plane..", WorkerRole)
func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down Worker Plane..", WorkerRole)
for _, host := range workerHosts {
// check if the host already is a controlplane
if host.IsControl && !force {
logrus.Infof("[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, host.Address)
log.Infof(ctx, "[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, host.Address)
return nil
}
if err := removeKubelet(host); err != nil {
if err := removeKubelet(ctx, host); err != nil {
return err
}
if err := removeKubeproxy(host); err != nil {
if err := removeKubeproxy(ctx, host); err != nil {
return err
}
if err := removeNginxProxy(host); err != nil {
if err := removeNginxProxy(ctx, host); err != nil {
return err
}
if err := removeSidekick(host); err != nil {
if err := removeSidekick(ctx, host); err != nil {
return err
}
logrus.Infof("[%s] Successfully teared down Worker Plane..", WorkerRole)
log.Infof(ctx, "[%s] Successfully teared down Worker Plane..", WorkerRole)
}
return nil
}
func doDeployWorkerPlane(host *hosts.Host,
func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
workerServices v3.RKEConfigServices,
nginxProxyImage, sidekickImage string,
healthcheckDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host) error {
// run nginx proxy
if !host.IsControl {
if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil {
if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(host, sidekickImage); err != nil {
if err := runSidekick(ctx, host, sidekickImage); err != nil {
return err
}
// run kubelet
if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
if err := runKubelet(ctx, host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
return err
}
return runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory)
return runKubeproxy(ctx, host, workerServices.Kubeproxy, healthcheckDialerFactory)
}
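
The worker plane rollout above fans out across hosts with errgroup and propagates the first failure. The same pattern in isolation looks roughly like the sketch below; deployOne and the string host names are placeholders standing in for doDeployWorkerPlane and the real host objects:

package example

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// deployAll mirrors the fan-out used by RunWorkerPlane: one goroutine per
// host, and the first error returned wins.
func deployAll(ctx context.Context, hosts []string) error {
	var errgrp errgroup.Group
	for _, host := range hosts {
		h := host // capture the loop variable for the closure
		errgrp.Go(func() error {
			return deployOne(ctx, h)
		})
	}
	// Wait blocks until every goroutine finishes and reports the first error.
	return errgrp.Wait()
}

// deployOne is a placeholder for the per-host deployment work.
func deployOne(ctx context.Context, host string) error {
	fmt.Printf("deploying worker components on %s\n", host)
	return nil
}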