diff --git a/authz/authz.go b/authz/authz.go index fca11296..0ea5b79b 100644 --- a/authz/authz.go +++ b/authz/authz.go @@ -1,13 +1,15 @@ package authz import ( + "context" + "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/rke/templates" - "github.com/sirupsen/logrus" ) -func ApplyJobDeployerServiceAccount(kubeConfigPath string) error { - logrus.Infof("[authz] Creating rke-job-deployer ServiceAccount") +func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) error { + log.Infof(ctx, "[authz] Creating rke-job-deployer ServiceAccount") k8sClient, err := k8s.NewClient(kubeConfigPath) if err != nil { return err @@ -18,12 +20,12 @@ func ApplyJobDeployerServiceAccount(kubeConfigPath string) error { if err := k8s.UpdateServiceAccountFromYaml(k8sClient, templates.JobDeployerServiceAccount); err != nil { return err } - logrus.Infof("[authz] rke-job-deployer ServiceAccount created successfully") + log.Infof(ctx, "[authz] rke-job-deployer ServiceAccount created successfully") return nil } -func ApplySystemNodeClusterRoleBinding(kubeConfigPath string) error { - logrus.Infof("[authz] Creating system:node ClusterRoleBinding") +func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string) error { + log.Infof(ctx, "[authz] Creating system:node ClusterRoleBinding") k8sClient, err := k8s.NewClient(kubeConfigPath) if err != nil { return err @@ -31,6 +33,6 @@ func ApplySystemNodeClusterRoleBinding(kubeConfigPath string) error { if err := k8s.UpdateClusterRoleBindingFromYaml(k8sClient, templates.SystemNodeClusterRoleBinding); err != nil { return err } - logrus.Infof("[authz] system:node ClusterRoleBinding created successfully") + log.Infof(ctx, "[authz] system:node ClusterRoleBinding created successfully") return nil } diff --git a/authz/psp.go b/authz/psp.go index cef39f39..ddf849c2 100644 --- a/authz/psp.go +++ b/authz/psp.go @@ -1,13 +1,15 @@ package authz import ( + "context" + "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/rke/templates" - "github.com/sirupsen/logrus" ) -func ApplyDefaultPodSecurityPolicy(kubeConfigPath string) error { - logrus.Infof("[authz] Applying default PodSecurityPolicy") +func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) error { + log.Infof(ctx, "[authz] Applying default PodSecurityPolicy") k8sClient, err := k8s.NewClient(kubeConfigPath) if err != nil { return err @@ -15,12 +17,12 @@ func ApplyDefaultPodSecurityPolicy(kubeConfigPath string) error { if err := k8s.UpdatePodSecurityPolicyFromYaml(k8sClient, templates.DefaultPodSecurityPolicy); err != nil { return err } - logrus.Infof("[authz] Default PodSecurityPolicy applied successfully") + log.Infof(ctx, "[authz] Default PodSecurityPolicy applied successfully") return nil } -func ApplyDefaultPodSecurityPolicyRole(kubeConfigPath string) error { - logrus.Infof("[authz] Applying default PodSecurityPolicy Role and RoleBinding") +func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string) error { + log.Infof(ctx, "[authz] Applying default PodSecurityPolicy Role and RoleBinding") k8sClient, err := k8s.NewClient(kubeConfigPath) if err != nil { return err @@ -31,6 +33,6 @@ func ApplyDefaultPodSecurityPolicyRole(kubeConfigPath string) error { if err := k8s.UpdateRoleBindingFromYaml(k8sClient, templates.DefaultPodSecurityRoleBinding); err != nil { return err } - logrus.Infof("[authz] Default PodSecurityPolicy Role and RoleBinding applied successfully") + log.Infof(ctx, "[authz] Default PodSecurityPolicy Role and RoleBinding applied successfully") return nil } diff --git a/cluster/addons.go b/cluster/addons.go index faccb13f..c7d16c3d 100644 --- a/cluster/addons.go +++ b/cluster/addons.go @@ -1,12 +1,13 @@ package cluster import ( + "context" "fmt" "time" "github.com/rancher/rke/addons" "github.com/rancher/rke/k8s" - "github.com/sirupsen/logrus" + "github.com/rancher/rke/log" ) const ( @@ -14,28 +15,28 @@ const ( UserAddonResourceName = "rke-user-addon" ) -func (c *Cluster) DeployK8sAddOns() error { - err := c.deployKubeDNS() +func (c *Cluster) DeployK8sAddOns(ctx context.Context) error { + err := c.deployKubeDNS(ctx) return err } -func (c *Cluster) DeployUserAddOns() error { - logrus.Infof("[addons] Setting up user addons..") +func (c *Cluster) DeployUserAddOns(ctx context.Context) error { + log.Infof(ctx, "[addons] Setting up user addons..") if c.Addons == "" { - logrus.Infof("[addons] No user addons configured..") + log.Infof(ctx, "[addons] No user addons configured..") return nil } - if err := c.doAddonDeploy(c.Addons, UserAddonResourceName); err != nil { + if err := c.doAddonDeploy(ctx, c.Addons, UserAddonResourceName); err != nil { return err } - logrus.Infof("[addons] User addon deployed successfully..") + log.Infof(ctx, "[addons] User addon deployed successfully..") return nil } -func (c *Cluster) deployKubeDNS() error { - logrus.Infof("[addons] Setting up KubeDNS") +func (c *Cluster) deployKubeDNS(ctx context.Context) error { + log.Infof(ctx, "[addons] Setting up KubeDNS") kubeDNSConfig := map[string]string{ addons.KubeDNSServer: c.ClusterDNSServer, addons.KubeDNSClusterDomain: c.ClusterDomain, @@ -48,22 +49,22 @@ func (c *Cluster) deployKubeDNS() error { if err != nil { return err } - if err := c.doAddonDeploy(kubeDNSYaml, KubeDNSAddonResourceName); err != nil { + if err := c.doAddonDeploy(ctx, kubeDNSYaml, KubeDNSAddonResourceName); err != nil { return err } - logrus.Infof("[addons] KubeDNS deployed successfully..") + log.Infof(ctx, "[addons] KubeDNS deployed successfully..") return nil } -func (c *Cluster) doAddonDeploy(addonYaml, resourceName string) error { +func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName string) error { - err := c.StoreAddonConfigMap(addonYaml, resourceName) + err := c.StoreAddonConfigMap(ctx, addonYaml, resourceName) if err != nil { return fmt.Errorf("Failed to save addon ConfigMap: %v", err) } - logrus.Infof("[addons] Executing deploy job..") + log.Infof(ctx, "[addons] Executing deploy job..") addonJob, err := addons.GetAddonsExcuteJob(resourceName, c.ControlPlaneHosts[0].HostnameOverride, c.Services.KubeAPI.Image) if err != nil { @@ -76,8 +77,8 @@ func (c *Cluster) doAddonDeploy(addonYaml, resourceName string) error { return nil } -func (c *Cluster) StoreAddonConfigMap(addonYaml string, addonName string) error { - logrus.Infof("[addons] Saving addon ConfigMap to Kubernetes") +func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, addonName string) error { + log.Infof(ctx, "[addons] Saving addon ConfigMap to Kubernetes") kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath) if err != nil { return err @@ -91,7 +92,7 @@ func (c *Cluster) StoreAddonConfigMap(addonYaml string, addonName string) error fmt.Println(err) continue } - logrus.Infof("[addons] Successfully Saved addon to Kubernetes ConfigMap: %s", addonName) + log.Infof(ctx, "[addons] Successfully Saved addon to Kubernetes ConfigMap: %s", addonName) timeout <- true break } diff --git a/cluster/certificates.go b/cluster/certificates.go index c5fe407c..768d560a 100644 --- a/cluster/certificates.go +++ b/cluster/certificates.go @@ -1,24 +1,26 @@ package cluster import ( + "context" "crypto/rsa" "fmt" "time" "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/cert" ) -func SetUpAuthentication(kubeCluster, currentCluster *Cluster) error { +func SetUpAuthentication(ctx context.Context, kubeCluster, currentCluster *Cluster) error { if kubeCluster.Authentication.Strategy == X509AuthenticationProvider { var err error if currentCluster != nil { kubeCluster.Certificates = currentCluster.Certificates } else { - kubeCluster.Certificates, err = pki.StartCertificatesGeneration( + kubeCluster.Certificates, err = pki.StartCertificatesGeneration(ctx, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, kubeCluster.ClusterDomain, @@ -53,8 +55,8 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.Certificat return certificates, nil } -func getClusterCerts(kubeClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) { - logrus.Infof("[certificates] Getting Cluster certificates from Kubernetes") +func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) { + log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes") certificatesNames := []string{ pki.CACertName, pki.KubeAPICertName, @@ -82,19 +84,19 @@ func getClusterCerts(kubeClient *kubernetes.Clientset) (map[string]pki.Certifica KeyEnvName: string(secret.Data["KeyEnvName"]), } } - logrus.Infof("[certificates] Successfully fetched Cluster certificates from Kubernetes") + log.Infof(ctx, "[certificates] Successfully fetched Cluster certificates from Kubernetes") return certMap, nil } -func saveClusterCerts(kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error { - logrus.Infof("[certificates] Save kubernetes certificates as secrets") +func saveClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error { + log.Infof(ctx, "[certificates] Save kubernetes certificates as secrets") for crtName, crt := range crts { err := saveCertToKubernetes(kubeClient, crtName, crt) if err != nil { return fmt.Errorf("Failed to save certificate [%s] to kubernetes: %v", crtName, err) } } - logrus.Infof("[certificates] Successfully saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName) + log.Infof(ctx, "[certificates] Successfully saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName) return nil } diff --git a/cluster/cluster.go b/cluster/cluster.go index c8482dbc..83fd595f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "fmt" "net" "path/filepath" @@ -8,6 +9,7 @@ import ( "github.com/rancher/rke/authz" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" "github.com/rancher/types/apis/management.cattle.io/v3" @@ -52,13 +54,13 @@ const ( NoneAuthorizationMode = "none" ) -func (c *Cluster) DeployControlPlane() error { +func (c *Cluster) DeployControlPlane(ctx context.Context) error { // Deploy Etcd Plane - if err := services.RunEtcdPlane(c.EtcdHosts, c.Services.Etcd); err != nil { + if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd); err != nil { return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err) } // Deploy Control plane - if err := services.RunControlPlane(c.ControlPlaneHosts, + if err := services.RunControlPlane(ctx, c.ControlPlaneHosts, c.EtcdHosts, c.Services, c.SystemImages[ServiceSidekickImage], @@ -67,15 +69,15 @@ func (c *Cluster) DeployControlPlane() error { return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err) } // Apply Authz configuration after deploying controlplane - if err := c.ApplyAuthzResources(); err != nil { + if err := c.ApplyAuthzResources(ctx); err != nil { return fmt.Errorf("[auths] Failed to apply RBAC resources: %v", err) } return nil } -func (c *Cluster) DeployWorkerPlane() error { +func (c *Cluster) DeployWorkerPlane(ctx context.Context) error { // Deploy Worker Plane - if err := services.RunWorkerPlane(c.ControlPlaneHosts, + if err := services.RunWorkerPlane(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.Services, c.SystemImages[NginxProxyImage], @@ -95,7 +97,7 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) return &rkeConfig, nil } -func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) { +func ParseCluster(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) { var err error c := &Cluster{ RancherKubernetesEngineConfig: *rkeConfig, @@ -104,7 +106,7 @@ func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath s HealthcheckDialerFactory: healthcheckDialerFactory, } // Setting cluster Defaults - c.setClusterDefaults() + c.setClusterDefaults(ctx) if err := c.InvertIndexHosts(); err != nil { return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err) @@ -128,7 +130,7 @@ func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath s return c, nil } -func (c *Cluster) setClusterDefaults() { +func (c *Cluster) setClusterDefaults(ctx context.Context) { if len(c.SSHKeyPath) == 0 { c.SSHKeyPath = DefaultClusterSSHKeyPath } @@ -148,7 +150,7 @@ func (c *Cluster) setClusterDefaults() { c.Authorization.Mode = DefaultAuthorizationMode } if c.Services.KubeAPI.PodSecurityPolicy && c.Authorization.Mode != services.RBACAuthorizationMode { - logrus.Warnf("PodSecurityPolicy can't be enabled with RBAC support disabled") + log.Warnf(ctx, "PodSecurityPolicy can't be enabled with RBAC support disabled") c.Services.KubeAPI.PodSecurityPolicy = false } c.setClusterServicesDefaults() @@ -204,8 +206,8 @@ func GetLocalKubeConfig(configPath string) string { return fmt.Sprintf("%s%s%s", baseDir, pki.KubeAdminConfigPrefix, fileName) } -func rebuildLocalAdminConfig(kubeCluster *Cluster) error { - logrus.Infof("[reconcile] Rebuilding and updating local kube config") +func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error { + log.Infof(ctx, "[reconcile] Rebuilding and updating local kube config") var workingConfig, newConfig string currentKubeConfig := kubeCluster.Certificates[pki.KubeAdminCommonName] caCrt := kubeCluster.Certificates[pki.CACertName].Certificate @@ -220,12 +222,12 @@ func rebuildLocalAdminConfig(kubeCluster *Cluster) error { keyData := string(cert.EncodePrivateKeyPEM(currentKubeConfig.Key)) newConfig = pki.GetKubeConfigX509WithData(kubeURL, pki.KubeAdminCommonName, caData, crtData, keyData) } - if err := pki.DeployAdminConfig(newConfig, kubeCluster.LocalKubeConfigPath); err != nil { + if err := pki.DeployAdminConfig(ctx, newConfig, kubeCluster.LocalKubeConfigPath); err != nil { return fmt.Errorf("Failed to redeploy local admin config with new host") } workingConfig = newConfig if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath); err == nil { - logrus.Infof("[reconcile] host [%s] is active master on the cluster", cpHost.Address) + log.Infof(ctx, "[reconcile] host [%s] is active master on the cluster", cpHost.Address) break } } @@ -234,9 +236,9 @@ func rebuildLocalAdminConfig(kubeCluster *Cluster) error { return nil } -func isLocalConfigWorking(localKubeConfigPath string) bool { +func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string) bool { if _, err := GetK8sVersion(localKubeConfigPath); err != nil { - logrus.Infof("[reconcile] Local config is not vaild, rebuilding admin config") + log.Infof(ctx, "[reconcile] Local config is not vaild, rebuilding admin config") return false } return true @@ -263,23 +265,23 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string) string string(config.KeyData)) } -func (c *Cluster) ApplyAuthzResources() error { - if err := authz.ApplyJobDeployerServiceAccount(c.LocalKubeConfigPath); err != nil { +func (c *Cluster) ApplyAuthzResources(ctx context.Context) error { + if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath); err != nil { return fmt.Errorf("Failed to apply the ServiceAccount needed for job execution: %v", err) } if c.Authorization.Mode == NoneAuthorizationMode { return nil } if c.Authorization.Mode == services.RBACAuthorizationMode { - if err := authz.ApplySystemNodeClusterRoleBinding(c.LocalKubeConfigPath); err != nil { + if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath); err != nil { return fmt.Errorf("Failed to apply the ClusterRoleBinding needed for node authorization: %v", err) } } if c.Authorization.Mode == services.RBACAuthorizationMode && c.Services.KubeAPI.PodSecurityPolicy { - if err := authz.ApplyDefaultPodSecurityPolicy(c.LocalKubeConfigPath); err != nil { + if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath); err != nil { return fmt.Errorf("Failed to apply default PodSecurityPolicy: %v", err) } - if err := authz.ApplyDefaultPodSecurityPolicyRole(c.LocalKubeConfigPath); err != nil { + if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath); err != nil { return fmt.Errorf("Failed to apply default PodSecurityPolicy ClusterRole and ClusterRoleBinding: %v", err) } } diff --git a/cluster/hosts.go b/cluster/hosts.go index ec3c6aa5..81a780bd 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -3,26 +3,29 @@ package cluster import ( "fmt" + "context" + "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" "github.com/sirupsen/logrus" ) -func (c *Cluster) TunnelHosts() error { +func (c *Cluster) TunnelHosts(ctx context.Context) error { for i := range c.EtcdHosts { - if err := c.EtcdHosts[i].TunnelUp(c.DockerDialerFactory); err != nil { + if err := c.EtcdHosts[i].TunnelUp(ctx, c.DockerDialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err) } } for i := range c.ControlPlaneHosts { - err := c.ControlPlaneHosts[i].TunnelUp(c.DockerDialerFactory) + err := c.ControlPlaneHosts[i].TunnelUp(ctx, c.DockerDialerFactory) if err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err) } } for i := range c.WorkerHosts { - if err := c.WorkerHosts[i].TunnelUp(c.DockerDialerFactory); err != nil { + if err := c.WorkerHosts[i].TunnelUp(ctx, c.DockerDialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err) } } @@ -59,22 +62,22 @@ func (c *Cluster) InvertIndexHosts() error { return nil } -func (c *Cluster) SetUpHosts() error { +func (c *Cluster) SetUpHosts(ctx context.Context) error { if c.Authentication.Strategy == X509AuthenticationProvider { - logrus.Infof("[certificates] Deploying kubernetes certificates to Cluster nodes") - err := pki.DeployCertificatesOnMasters(c.ControlPlaneHosts, c.Certificates, c.SystemImages[CertDownloaderImage]) + log.Infof(ctx, "[certificates] Deploying kubernetes certificates to Cluster nodes") + err := pki.DeployCertificatesOnMasters(ctx, c.ControlPlaneHosts, c.Certificates, c.SystemImages[CertDownloaderImage]) if err != nil { return err } - err = pki.DeployCertificatesOnWorkers(c.WorkerHosts, c.Certificates, c.SystemImages[CertDownloaderImage]) + err = pki.DeployCertificatesOnWorkers(ctx, c.WorkerHosts, c.Certificates, c.SystemImages[CertDownloaderImage]) if err != nil { return err } - err = pki.DeployAdminConfig(c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath) + err = pki.DeployAdminConfig(ctx, c.Certificates[pki.KubeAdminCommonName].Config, c.LocalKubeConfigPath) if err != nil { return err } - logrus.Infof("[certificates] Successfully deployed kubernetes certificates to Cluster nodes") + log.Infof(ctx, "[certificates] Successfully deployed kubernetes certificates to Cluster nodes") } return nil } diff --git a/cluster/network.go b/cluster/network.go index 15c1495f..a759cf52 100644 --- a/cluster/network.go +++ b/cluster/network.go @@ -1,12 +1,13 @@ package cluster import ( + "context" "fmt" + "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" "github.com/rancher/rke/templates" - "github.com/sirupsen/logrus" ) const ( @@ -63,23 +64,23 @@ const ( RBACConfig = "RBACConfig" ) -func (c *Cluster) DeployNetworkPlugin() error { - logrus.Infof("[network] Setting up network plugin: %s", c.Network.Plugin) +func (c *Cluster) DeployNetworkPlugin(ctx context.Context) error { + log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin) switch c.Network.Plugin { case FlannelNetworkPlugin: - return c.doFlannelDeploy() + return c.doFlannelDeploy(ctx) case CalicoNetworkPlugin: - return c.doCalicoDeploy() + return c.doCalicoDeploy(ctx) case CanalNetworkPlugin: - return c.doCanalDeploy() + return c.doCanalDeploy(ctx) case WeaveNetworkPlugin: - return c.doWeaveDeploy() + return c.doWeaveDeploy(ctx) default: return fmt.Errorf("[network] Unsupported network plugin: %s", c.Network.Plugin) } } -func (c *Cluster) doFlannelDeploy() error { +func (c *Cluster) doFlannelDeploy(ctx context.Context) error { flannelConfig := map[string]string{ ClusterCIDR: c.ClusterCIDR, Image: c.Network.Options[FlannelImage], @@ -91,10 +92,10 @@ func (c *Cluster) doFlannelDeploy() error { if err != nil { return err } - return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName) + return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName) } -func (c *Cluster) doCalicoDeploy() error { +func (c *Cluster) doCalicoDeploy(ctx context.Context) error { calicoConfig := map[string]string{ EtcdEndpoints: services.GetEtcdConnString(c.EtcdHosts), APIRoot: "https://127.0.0.1:6443", @@ -114,10 +115,10 @@ func (c *Cluster) doCalicoDeploy() error { if err != nil { return err } - return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName) + return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName) } -func (c *Cluster) doCanalDeploy() error { +func (c *Cluster) doCanalDeploy(ctx context.Context) error { canalConfig := map[string]string{ ClientCert: pki.KubeNodeCertPath, APIRoot: "https://127.0.0.1:6443", @@ -134,10 +135,10 @@ func (c *Cluster) doCanalDeploy() error { if err != nil { return err } - return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName) + return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName) } -func (c *Cluster) doWeaveDeploy() error { +func (c *Cluster) doWeaveDeploy(ctx context.Context) error { weaveConfig := map[string]string{ ClusterCIDR: c.ClusterCIDR, Image: c.Network.Options[WeaveImage], @@ -148,7 +149,7 @@ func (c *Cluster) doWeaveDeploy() error { if err != nil { return err } - return c.doAddonDeploy(pluginYaml, NetworkPluginResourceName) + return c.doAddonDeploy(ctx, pluginYaml, NetworkPluginResourceName) } func (c *Cluster) setClusterNetworkDefaults() { diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 19cff21e..1b5d6d9d 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -1,19 +1,21 @@ package cluster import ( + "context" "fmt" "github.com/rancher/rke/hosts" "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/rke/services" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" ) -func ReconcileCluster(kubeCluster, currentCluster *Cluster) error { - logrus.Infof("[reconcile] Reconciling cluster state") +func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster) error { + log.Infof(ctx, "[reconcile] Reconciling cluster state") if currentCluster == nil { - logrus.Infof("[reconcile] This is newly generated cluster") + log.Infof(ctx, "[reconcile] This is newly generated cluster") return nil } @@ -22,36 +24,36 @@ func ReconcileCluster(kubeCluster, currentCluster *Cluster) error { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } - if err := reconcileWorker(currentCluster, kubeCluster, kubeClient); err != nil { + if err := reconcileWorker(ctx, currentCluster, kubeCluster, kubeClient); err != nil { return err } - if err := reconcileControl(currentCluster, kubeCluster, kubeClient); err != nil { + if err := reconcileControl(ctx, currentCluster, kubeCluster, kubeClient); err != nil { return err } - logrus.Infof("[reconcile] Reconciled cluster state successfully") + log.Infof(ctx, "[reconcile] Reconciled cluster state successfully") return nil } -func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error { +func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error { // worker deleted first to avoid issues when worker+controller on same host logrus.Debugf("[reconcile] Check worker hosts to be deleted") wpToDelete := hosts.GetToDeleteHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts) for _, toDeleteHost := range wpToDelete { toDeleteHost.IsWorker = false - if err := hosts.DeleteNode(toDeleteHost, kubeClient, toDeleteHost.IsControl); err != nil { + if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsControl); err != nil { return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address) } // attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { - logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) + if err := reconcileHost(ctx, toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { + log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) continue } } return nil } -func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error { +func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster, kubeClient *kubernetes.Clientset) error { logrus.Debugf("[reconcile] Check Control plane hosts to be deleted") selfDeleteAddress, err := getLocalConfigAddress(kubeCluster.LocalKubeConfigPath) if err != nil { @@ -71,24 +73,24 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet if err != nil { return fmt.Errorf("Failed to initialize new kubernetes client: %v", err) } - if err := hosts.DeleteNode(toDeleteHost, kubeClient, toDeleteHost.IsWorker); err != nil { + if err := hosts.DeleteNode(ctx, toDeleteHost, kubeClient, toDeleteHost.IsWorker); err != nil { return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address) } // attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { - logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) + if err := reconcileHost(ctx, toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { + log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) continue } } // rebuilding local admin config to enable saving cluster state - if err := rebuildLocalAdminConfig(kubeCluster); err != nil { + if err := rebuildLocalAdminConfig(ctx, kubeCluster); err != nil { return err } // Rolling update on change for nginx Proxy cpChanged := hosts.IsHostListChanged(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts) if cpChanged { - logrus.Infof("[reconcile] Rolling update nginx hosts with new list of control plane hosts") - err := services.RollingUpdateNginxProxy(kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages[NginxProxyImage]) + log.Infof(ctx, "[reconcile] Rolling update nginx hosts with new list of control plane hosts") + err := services.RollingUpdateNginxProxy(ctx, kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts, currentCluster.SystemImages[NginxProxyImage]) if err != nil { return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts") } @@ -96,22 +98,22 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet return nil } -func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error { - if err := toDeleteHost.TunnelUp(dialerFactory); err != nil { +func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error { + if err := toDeleteHost.TunnelUp(ctx, dialerFactory); err != nil { return fmt.Errorf("Not able to reach the host: %v", err) } if worker { - if err := services.RemoveWorkerPlane([]*hosts.Host{toDeleteHost}, false); err != nil { + if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil { return fmt.Errorf("Couldn't remove worker plane: %v", err) } - if err := toDeleteHost.CleanUpWorkerHost(services.ControlRole, cleanerImage); err != nil { + if err := toDeleteHost.CleanUpWorkerHost(ctx, services.ControlRole, cleanerImage); err != nil { return fmt.Errorf("Not able to clean the host: %v", err) } } else { - if err := services.RemoveControlPlane([]*hosts.Host{toDeleteHost}, false); err != nil { + if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil { return fmt.Errorf("Couldn't remove control plane: %v", err) } - if err := toDeleteHost.CleanUpControlHost(services.WorkerRole, cleanerImage); err != nil { + if err := toDeleteHost.CleanUpControlHost(ctx, services.WorkerRole, cleanerImage); err != nil { return fmt.Errorf("Not able to clean the host: %v", err) } } diff --git a/cluster/remove.go b/cluster/remove.go index 5aa58246..089b33ca 100644 --- a/cluster/remove.go +++ b/cluster/remove.go @@ -1,44 +1,46 @@ package cluster import ( + "context" + "github.com/rancher/rke/hosts" "github.com/rancher/rke/pki" "github.com/rancher/rke/services" ) -func (c *Cluster) ClusterRemove() error { +func (c *Cluster) ClusterRemove(ctx context.Context) error { // Remove Worker Plane - if err := services.RemoveWorkerPlane(c.WorkerHosts, true); err != nil { + if err := services.RemoveWorkerPlane(ctx, c.WorkerHosts, true); err != nil { return err } // Remove Contol Plane - if err := services.RemoveControlPlane(c.ControlPlaneHosts, true); err != nil { + if err := services.RemoveControlPlane(ctx, c.ControlPlaneHosts, true); err != nil { return err } // Remove Etcd Plane - if err := services.RemoveEtcdPlane(c.EtcdHosts); err != nil { + if err := services.RemoveEtcdPlane(ctx, c.EtcdHosts); err != nil { return err } // Clean up all hosts - if err := cleanUpHosts(c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages[AplineImage]); err != nil { + if err := cleanUpHosts(ctx, c.ControlPlaneHosts, c.WorkerHosts, c.EtcdHosts, c.SystemImages[AplineImage]); err != nil { return err } - pki.RemoveAdminConfig(c.LocalKubeConfigPath) + pki.RemoveAdminConfig(ctx, c.LocalKubeConfigPath) return nil } -func cleanUpHosts(cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string) error { +func cleanUpHosts(ctx context.Context, cpHosts, workerHosts, etcdHosts []*hosts.Host, cleanerImage string) error { allHosts := []*hosts.Host{} allHosts = append(allHosts, cpHosts...) allHosts = append(allHosts, workerHosts...) allHosts = append(allHosts, etcdHosts...) for _, host := range allHosts { - if err := host.CleanUpAll(cleanerImage); err != nil { + if err := host.CleanUpAll(ctx, cleanerImage); err != nil { return err } } diff --git a/cluster/state.go b/cluster/state.go index 2dfe8118..bb3cf1b7 100644 --- a/cluster/state.go +++ b/cluster/state.go @@ -1,47 +1,49 @@ package cluster import ( + "context" "fmt" "os" "time" "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" ) -func (c *Cluster) SaveClusterState(rkeConfig *v3.RancherKubernetesEngineConfig) error { +func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) error { // Reinitialize kubernetes Client var err error c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) if err != nil { return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err) } - err = saveClusterCerts(c.KubeClient, c.Certificates) + err = saveClusterCerts(ctx, c.KubeClient, c.Certificates) if err != nil { return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err) } - err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, rkeConfig) + err = saveStateToKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath, rkeConfig) if err != nil { return fmt.Errorf("[state] Failed to save configuration state: %v", err) } return nil } -func (c *Cluster) GetClusterState() (*Cluster, error) { +func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) { var err error var currentCluster *Cluster // check if local kubeconfig file exists if _, err = os.Stat(c.LocalKubeConfigPath); !os.IsNotExist(err) { - logrus.Infof("[state] Found local kube config file, trying to get state from cluster") + log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster") // to handle if current local admin is down and we need to use new cp from the list - if !isLocalConfigWorking(c.LocalKubeConfigPath) { - if err := rebuildLocalAdminConfig(c); err != nil { + if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath) { + if err := rebuildLocalAdminConfig(ctx, c); err != nil { return nil, err } } @@ -49,20 +51,20 @@ func (c *Cluster) GetClusterState() (*Cluster, error) { // initiate kubernetes client c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) if err != nil { - logrus.Warnf("Failed to initiate new Kubernetes Client: %v", err) + log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err) return nil, nil } // Get previous kubernetes state - currentCluster = getStateFromKubernetes(c.KubeClient, c.LocalKubeConfigPath) + currentCluster = getStateFromKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath) // Get previous kubernetes certificates if currentCluster != nil { - currentCluster.Certificates, err = getClusterCerts(c.KubeClient) + currentCluster.Certificates, err = getClusterCerts(ctx, c.KubeClient) currentCluster.DockerDialerFactory = c.DockerDialerFactory if err != nil { return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err) } // setting cluster defaults for the fetched cluster as well - currentCluster.setClusterDefaults() + currentCluster.setClusterDefaults(ctx) if err := currentCluster.InvertIndexHosts(); err != nil { return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err) @@ -76,8 +78,8 @@ func (c *Cluster) GetClusterState() (*Cluster, error) { return currentCluster, nil } -func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error { - logrus.Infof("[state] Saving cluster state to Kubernetes") +func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error { + log.Infof(ctx, "[state] Saving cluster state to Kubernetes") clusterFile, err := yaml.Marshal(*rkeConfig) if err != nil { return err @@ -90,7 +92,7 @@ func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath stri time.Sleep(time.Second * 5) continue } - logrus.Infof("[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName) + log.Infof(ctx, "[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName) timeout <- true break } @@ -103,8 +105,8 @@ func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath stri } } -func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster { - logrus.Infof("[state] Fetching cluster state from Kubernetes") +func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster { + log.Infof(ctx, "[state] Fetching cluster state from Kubernetes") var cfgMap *v1.ConfigMap var currentCluster Cluster var err error @@ -116,7 +118,7 @@ func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath str time.Sleep(time.Second * 5) continue } - logrus.Infof("[state] Successfully Fetched cluster state to Kubernetes ConfigMap: %s", StateConfigMapName) + log.Infof(ctx, "[state] Successfully Fetched cluster state to Kubernetes ConfigMap: %s", StateConfigMapName) timeout <- true break } @@ -130,7 +132,7 @@ func getStateFromKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath str } return ¤tCluster case <-time.After(time.Second * GetStateTimeout): - logrus.Infof("Timed out waiting for kubernetes cluster to get state") + log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state") return nil } } diff --git a/cmd/remove.go b/cmd/remove.go index dc649b21..3a8dead3 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -2,12 +2,14 @@ package cmd import ( "bufio" + "context" "fmt" "os" "strings" "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -34,25 +36,25 @@ func RemoveCommand() cli.Command { } } -func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error { - logrus.Infof("Tearing down Kubernetes cluster") - kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory, nil) +func ClusterRemove(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error { + log.Infof(ctx, "Tearing down Kubernetes cluster") + kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dialerFactory, nil) if err != nil { return err } - err = kubeCluster.TunnelHosts() + err = kubeCluster.TunnelHosts(ctx) if err != nil { return err } logrus.Debugf("Starting Cluster removal") - err = kubeCluster.ClusterRemove() + err = kubeCluster.ClusterRemove(ctx) if err != nil { return err } - logrus.Infof("Cluster removed successfully") + log.Infof(ctx, "Cluster removed successfully") return nil } @@ -80,5 +82,5 @@ func clusterRemoveFromCli(ctx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to parse cluster file: %v", err) } - return ClusterRemove(rkeConfig, nil) + return ClusterRemove(context.Background(), rkeConfig, nil) } diff --git a/cmd/up.go b/cmd/up.go index 81a044be..dc24a00f 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -1,13 +1,14 @@ package cmd import ( + "context" "fmt" "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/rke/pki" "github.com/rancher/types/apis/management.cattle.io/v3" - "github.com/sirupsen/logrus" "github.com/urfave/cli" "k8s.io/client-go/util/cert" ) @@ -31,20 +32,20 @@ func UpCommand() cli.Command { } } -func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) { - logrus.Infof("Building Kubernetes cluster") +func ClusterUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) { + log.Infof(ctx, "Building Kubernetes cluster") var APIURL, caCrt, clientCert, clientKey string - kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory) + kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.TunnelHosts() + err = kubeCluster.TunnelHosts(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - currentCluster, err := kubeCluster.GetClusterState() + currentCluster, err := kubeCluster.GetClusterState(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -53,47 +54,47 @@ func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, return APIURL, caCrt, clientCert, clientKey, err } - err = cluster.SetUpAuthentication(kubeCluster, currentCluster) + err = cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = cluster.ReconcileCluster(kubeCluster, currentCluster) + err = cluster.ReconcileCluster(ctx, kubeCluster, currentCluster) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.SetUpHosts() + err = kubeCluster.SetUpHosts(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.DeployControlPlane() + err = kubeCluster.DeployControlPlane(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.SaveClusterState(rkeConfig) + err = kubeCluster.SaveClusterState(ctx, rkeConfig) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.DeployWorkerPlane() + err = kubeCluster.DeployWorkerPlane(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.DeployNetworkPlugin() + err = kubeCluster.DeployNetworkPlugin(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.DeployK8sAddOns() + err = kubeCluster.DeployK8sAddOns(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.DeployUserAddOns() + err = kubeCluster.DeployUserAddOns(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -103,7 +104,7 @@ func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Certificate)) clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Key)) - logrus.Infof("Finished building Kubernetes cluster successfully") + log.Infof(ctx, "Finished building Kubernetes cluster successfully") return APIURL, caCrt, clientCert, clientKey, nil } @@ -118,6 +119,6 @@ func clusterUpFromCli(ctx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to parse cluster file: %v", err) } - _, _, _, _, err = ClusterUp(rkeConfig, nil, nil) + _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil) return err } diff --git a/docker/docker.go b/docker/docker.go index a2ae7ee8..086966ca 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -12,6 +12,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" + "github.com/rancher/rke/log" "github.com/sirupsen/logrus" ) @@ -19,101 +20,101 @@ var K8sDockerVersions = map[string][]string{ "1.8": {"1.12.6", "1.13.1", "17.03.2"}, } -func DoRunContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) error { - isRunning, err := IsContainerRunning(dClient, hostname, containerName, false) +func DoRunContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) error { + isRunning, err := IsContainerRunning(ctx, dClient, hostname, containerName, false) if err != nil { return err } if isRunning { - logrus.Infof("[%s] Container [%s] is already running on host [%s]", plane, containerName, hostname) - isUpgradable, err := IsContainerUpgradable(dClient, imageCfg, containerName, hostname, plane) + log.Infof(ctx, "[%s] Container [%s] is already running on host [%s]", plane, containerName, hostname) + isUpgradable, err := IsContainerUpgradable(ctx, dClient, imageCfg, containerName, hostname, plane) if err != nil { return err } if isUpgradable { - return DoRollingUpdateContainer(dClient, imageCfg, hostCfg, containerName, hostname, plane) + return DoRollingUpdateContainer(ctx, dClient, imageCfg, hostCfg, containerName, hostname, plane) } return nil } - err = UseLocalOrPull(dClient, hostname, imageCfg.Image, plane) + err = UseLocalOrPull(ctx, dClient, hostname, imageCfg.Image, plane) if err != nil { return err } - resp, err := dClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName) + resp, err := dClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, containerName) if err != nil { return fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err) } - if err := dClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil { + if err := dClient.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err) } logrus.Debugf("[%s] Successfully started [%s] container: [%s]", plane, containerName, resp.ID) - logrus.Infof("[%s] Successfully started [%s] container on host [%s]", plane, containerName, hostname) + log.Infof(ctx, "[%s] Successfully started [%s] container on host [%s]", plane, containerName, hostname) return nil } -func DoRollingUpdateContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string) error { +func DoRollingUpdateContainer(ctx context.Context, dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string) error { logrus.Debugf("[%s] Checking for deployed [%s]", plane, containerName) - isRunning, err := IsContainerRunning(dClient, hostname, containerName, false) + isRunning, err := IsContainerRunning(ctx, dClient, hostname, containerName, false) if err != nil { return err } if !isRunning { - logrus.Infof("[%s] Container %s is not running on host [%s]", plane, containerName, hostname) + log.Infof(ctx, "[%s] Container %s is not running on host [%s]", plane, containerName, hostname) return nil } - err = UseLocalOrPull(dClient, hostname, imageCfg.Image, plane) + err = UseLocalOrPull(ctx, dClient, hostname, imageCfg.Image, plane) if err != nil { return err } logrus.Debugf("[%s] Stopping old container", plane) oldContainerName := "old-" + containerName - if err := StopRenameContainer(dClient, hostname, containerName, oldContainerName); err != nil { + if err := StopRenameContainer(ctx, dClient, hostname, containerName, oldContainerName); err != nil { return err } - logrus.Infof("[%s] Successfully stopped old container %s on host [%s]", plane, containerName, hostname) - _, err = CreateContiner(dClient, hostname, containerName, imageCfg, hostCfg) + log.Infof(ctx, "[%s] Successfully stopped old container %s on host [%s]", plane, containerName, hostname) + _, err = CreateContiner(ctx, dClient, hostname, containerName, imageCfg, hostCfg) if err != nil { return fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err) } - if err := StartContainer(dClient, hostname, containerName); err != nil { + if err := StartContainer(ctx, dClient, hostname, containerName); err != nil { return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err) } - logrus.Infof("[%s] Successfully updated [%s] container on host [%s]", plane, containerName, hostname) + log.Infof(ctx, "[%s] Successfully updated [%s] container on host [%s]", plane, containerName, hostname) logrus.Debugf("[%s] Removing old container", plane) - err = RemoveContainer(dClient, hostname, oldContainerName) + err = RemoveContainer(ctx, dClient, hostname, oldContainerName) return err } -func DoRemoveContainer(dClient *client.Client, containerName, hostname string) error { - logrus.Infof("[remove/%s] Checking if container is running on host [%s]", containerName, hostname) +func DoRemoveContainer(ctx context.Context, dClient *client.Client, containerName, hostname string) error { + log.Infof(ctx, "[remove/%s] Checking if container is running on host [%s]", containerName, hostname) // not using the wrapper to check if the error is a NotFound error - _, err := dClient.ContainerInspect(context.Background(), containerName) + _, err := dClient.ContainerInspect(ctx, containerName) if err != nil { if client.IsErrNotFound(err) { - logrus.Infof("[remove/%s] Container doesn't exist on host [%s]", containerName, hostname) + log.Infof(ctx, "[remove/%s] Container doesn't exist on host [%s]", containerName, hostname) return nil } return err } - logrus.Infof("[remove/%s] Stopping container on host [%s]", containerName, hostname) - err = StopContainer(dClient, hostname, containerName) + log.Infof(ctx, "[remove/%s] Stopping container on host [%s]", containerName, hostname) + err = StopContainer(ctx, dClient, hostname, containerName) if err != nil { return err } - logrus.Infof("[remove/%s] Removing container on host [%s]", containerName, hostname) - err = RemoveContainer(dClient, hostname, containerName) + log.Infof(ctx, "[remove/%s] Removing container on host [%s]", containerName, hostname) + err = RemoveContainer(ctx, dClient, hostname, containerName) if err != nil { return err } - logrus.Infof("[remove/%s] Successfully removed container on host [%s]", containerName, hostname) + log.Infof(ctx, "[remove/%s] Successfully removed container on host [%s]", containerName, hostname) return nil } -func IsContainerRunning(dClient *client.Client, hostname string, containerName string, all bool) (bool, error) { +func IsContainerRunning(ctx context.Context, dClient *client.Client, hostname string, containerName string, all bool) (bool, error) { logrus.Debugf("Checking if container [%s] is running on host [%s]", containerName, hostname) - containers, err := dClient.ContainerList(context.Background(), types.ContainerListOptions{All: all}) + containers, err := dClient.ContainerList(ctx, types.ContainerListOptions{All: all}) if err != nil { return false, fmt.Errorf("Can't get Docker containers for host [%s]: %v", hostname, err) @@ -126,9 +127,9 @@ func IsContainerRunning(dClient *client.Client, hostname string, containerName s return false, nil } -func localImageExists(dClient *client.Client, hostname string, containerImage string) (bool, error) { +func localImageExists(ctx context.Context, dClient *client.Client, hostname string, containerImage string) (bool, error) { logrus.Debugf("Checking if image [%s] exists on host [%s]", containerImage, hostname) - _, _, err := dClient.ImageInspectWithRaw(context.Background(), containerImage) + _, _, err := dClient.ImageInspectWithRaw(ctx, containerImage) if err != nil { if client.IsErrNotFound(err) { logrus.Debugf("Image [%s] does not exist on host [%s]: %v", containerImage, hostname, err) @@ -140,8 +141,8 @@ func localImageExists(dClient *client.Client, hostname string, containerImage st return true, nil } -func pullImage(dClient *client.Client, hostname string, containerImage string) error { - out, err := dClient.ImagePull(context.Background(), containerImage, types.ImagePullOptions{}) +func pullImage(ctx context.Context, dClient *client.Client, hostname string, containerImage string) error { + out, err := dClient.ImagePull(ctx, containerImage, types.ImagePullOptions{}) if err != nil { return fmt.Errorf("Can't pull Docker image [%s] for host [%s]: %v", containerImage, hostname, err) } @@ -155,84 +156,84 @@ func pullImage(dClient *client.Client, hostname string, containerImage string) e return nil } -func UseLocalOrPull(dClient *client.Client, hostname string, containerImage string, plane string) error { - logrus.Infof("[%s] Checking image [%s] on host [%s]", plane, containerImage, hostname) - imageExists, err := localImageExists(dClient, hostname, containerImage) +func UseLocalOrPull(ctx context.Context, dClient *client.Client, hostname string, containerImage string, plane string) error { + log.Infof(ctx, "[%s] Checking image [%s] on host [%s]", plane, containerImage, hostname) + imageExists, err := localImageExists(ctx, dClient, hostname, containerImage) if err != nil { return err } if imageExists { - logrus.Infof("[%s] No pull necessary, image [%s] exists on host [%s]", plane, containerImage, hostname) + log.Infof(ctx, "[%s] No pull necessary, image [%s] exists on host [%s]", plane, containerImage, hostname) return nil } - logrus.Infof("[%s] Pulling image [%s] on host [%s]", plane, containerImage, hostname) - if err := pullImage(dClient, hostname, containerImage); err != nil { + log.Infof(ctx, "[%s] Pulling image [%s] on host [%s]", plane, containerImage, hostname) + if err := pullImage(ctx, dClient, hostname, containerImage); err != nil { return err } - logrus.Infof("[%s] Successfully pulled image [%s] on host [%s]", plane, containerImage, hostname) + log.Infof(ctx, "[%s] Successfully pulled image [%s] on host [%s]", plane, containerImage, hostname) return nil } -func RemoveContainer(dClient *client.Client, hostname string, containerName string) error { - err := dClient.ContainerRemove(context.Background(), containerName, types.ContainerRemoveOptions{}) +func RemoveContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error { + err := dClient.ContainerRemove(ctx, containerName, types.ContainerRemoveOptions{}) if err != nil { return fmt.Errorf("Can't remove Docker container [%s] for host [%s]: %v", containerName, hostname, err) } return nil } -func StopContainer(dClient *client.Client, hostname string, containerName string) error { - err := dClient.ContainerStop(context.Background(), containerName, nil) +func StopContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error { + err := dClient.ContainerStop(ctx, containerName, nil) if err != nil { return fmt.Errorf("Can't stop Docker container [%s] for host [%s]: %v", containerName, hostname, err) } return nil } -func RenameContainer(dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error { - err := dClient.ContainerRename(context.Background(), oldContainerName, newContainerName) +func RenameContainer(ctx context.Context, dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error { + err := dClient.ContainerRename(ctx, oldContainerName, newContainerName) if err != nil { return fmt.Errorf("Can't rename Docker container [%s] for host [%s]: %v", oldContainerName, hostname, err) } return nil } -func StartContainer(dClient *client.Client, hostname string, containerName string) error { - if err := dClient.ContainerStart(context.Background(), containerName, types.ContainerStartOptions{}); err != nil { +func StartContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) error { + if err := dClient.ContainerStart(ctx, containerName, types.ContainerStartOptions{}); err != nil { return fmt.Errorf("Failed to start [%s] container on host [%s]: %v", containerName, hostname, err) } return nil } -func CreateContiner(dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.ContainerCreateCreatedBody, error) { - created, err := dClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName) +func CreateContiner(ctx context.Context, dClient *client.Client, hostname string, containerName string, imageCfg *container.Config, hostCfg *container.HostConfig) (container.ContainerCreateCreatedBody, error) { + created, err := dClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, containerName) if err != nil { return container.ContainerCreateCreatedBody{}, fmt.Errorf("Failed to create [%s] container on host [%s]: %v", containerName, hostname, err) } return created, nil } -func InspectContainer(dClient *client.Client, hostname string, containerName string) (types.ContainerJSON, error) { - inspection, err := dClient.ContainerInspect(context.Background(), containerName) +func InspectContainer(ctx context.Context, dClient *client.Client, hostname string, containerName string) (types.ContainerJSON, error) { + inspection, err := dClient.ContainerInspect(ctx, containerName) if err != nil { return types.ContainerJSON{}, fmt.Errorf("Failed to inspect [%s] container on host [%s]: %v", containerName, hostname, err) } return inspection, nil } -func StopRenameContainer(dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error { - if err := StopContainer(dClient, hostname, oldContainerName); err != nil { +func StopRenameContainer(ctx context.Context, dClient *client.Client, hostname string, oldContainerName string, newContainerName string) error { + if err := StopContainer(ctx, dClient, hostname, oldContainerName); err != nil { return err } - if err := WaitForContainer(dClient, oldContainerName); err != nil { + if err := WaitForContainer(ctx, dClient, oldContainerName); err != nil { return nil } - err := RenameContainer(dClient, hostname, oldContainerName, newContainerName) + err := RenameContainer(ctx, dClient, hostname, oldContainerName, newContainerName) return err } -func WaitForContainer(dClient *client.Client, containerName string) error { - statusCh, errCh := dClient.ContainerWait(context.Background(), containerName, container.WaitConditionNotRunning) +func WaitForContainer(ctx context.Context, dClient *client.Client, containerName string) error { + statusCh, errCh := dClient.ContainerWait(ctx, containerName, container.WaitConditionNotRunning) select { case err := <-errCh: if err != nil { @@ -243,11 +244,11 @@ func WaitForContainer(dClient *client.Client, containerName string) error { return nil } -func IsContainerUpgradable(dClient *client.Client, imageCfg *container.Config, containerName string, hostname string, plane string) (bool, error) { +func IsContainerUpgradable(ctx context.Context, dClient *client.Client, imageCfg *container.Config, containerName string, hostname string, plane string) (bool, error) { logrus.Debugf("[%s] Checking if container [%s] is eligible for upgrade on host [%s]", plane, containerName, hostname) // this should be moved to a higher layer. - containerInspect, err := InspectContainer(dClient, hostname, containerName) + containerInspect, err := InspectContainer(ctx, dClient, hostname, containerName) if err != nil { return false, err } diff --git a/hosts/hosts.go b/hosts/hosts.go index 1790d0e8..2465bcf9 100644 --- a/hosts/hosts.go +++ b/hosts/hosts.go @@ -1,14 +1,15 @@ package hosts import ( + "context" "fmt" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/rancher/rke/docker" "github.com/rancher/rke/k8s" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" - "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" ) @@ -31,8 +32,8 @@ const ( CleanerContainerName = "kube-cleaner" ) -func (h *Host) CleanUpAll(cleanerImage string) error { - logrus.Infof("[hosts] Cleaning up host [%s]", h.Address) +func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string) error { + log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address) toCleanPaths := []string{ ToCleanEtcdDir, ToCleanSSLDir, @@ -40,12 +41,12 @@ func (h *Host) CleanUpAll(cleanerImage string) error { ToCleanCNIBin, ToCleanCalicoRun, } - return h.CleanUp(toCleanPaths, cleanerImage) + return h.CleanUp(ctx, toCleanPaths, cleanerImage) } -func (h *Host) CleanUpWorkerHost(controlRole, cleanerImage string) error { +func (h *Host) CleanUpWorkerHost(ctx context.Context, controlRole, cleanerImage string) error { if h.IsControl { - logrus.Infof("[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address) + log.Infof(ctx, "[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address) return nil } toCleanPaths := []string{ @@ -54,12 +55,12 @@ func (h *Host) CleanUpWorkerHost(controlRole, cleanerImage string) error { ToCleanCNIBin, ToCleanCalicoRun, } - return h.CleanUp(toCleanPaths, cleanerImage) + return h.CleanUp(ctx, toCleanPaths, cleanerImage) } -func (h *Host) CleanUpControlHost(workerRole, cleanerImage string) error { +func (h *Host) CleanUpControlHost(ctx context.Context, workerRole, cleanerImage string) error { if h.IsWorker { - logrus.Infof("[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address) + log.Infof(ctx, "[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address) return nil } toCleanPaths := []string{ @@ -68,38 +69,38 @@ func (h *Host) CleanUpControlHost(workerRole, cleanerImage string) error { ToCleanCNIBin, ToCleanCalicoRun, } - return h.CleanUp(toCleanPaths, cleanerImage) + return h.CleanUp(ctx, toCleanPaths, cleanerImage) } -func (h *Host) CleanUp(toCleanPaths []string, cleanerImage string) error { - logrus.Infof("[hosts] Cleaning up host [%s]", h.Address) +func (h *Host) CleanUp(ctx context.Context, toCleanPaths []string, cleanerImage string) error { + log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address) imageCfg, hostCfg := buildCleanerConfig(h, toCleanPaths, cleanerImage) - logrus.Infof("[hosts] Running cleaner container on host [%s]", h.Address) - if err := docker.DoRunContainer(h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName); err != nil { + log.Infof(ctx, "[hosts] Running cleaner container on host [%s]", h.Address) + if err := docker.DoRunContainer(ctx, h.DClient, imageCfg, hostCfg, CleanerContainerName, h.Address, CleanerContainerName); err != nil { return err } - if err := docker.WaitForContainer(h.DClient, CleanerContainerName); err != nil { + if err := docker.WaitForContainer(ctx, h.DClient, CleanerContainerName); err != nil { return err } - logrus.Infof("[hosts] Removing cleaner container on host [%s]", h.Address) - if err := docker.RemoveContainer(h.DClient, h.Address, CleanerContainerName); err != nil { + log.Infof(ctx, "[hosts] Removing cleaner container on host [%s]", h.Address) + if err := docker.RemoveContainer(ctx, h.DClient, h.Address, CleanerContainerName); err != nil { return err } - logrus.Infof("[hosts] Successfully cleaned up host [%s]", h.Address) + log.Infof(ctx, "[hosts] Successfully cleaned up host [%s]", h.Address) return nil } -func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error { +func DeleteNode(ctx context.Context, toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error { if hasAnotherRole { - logrus.Infof("[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address) + log.Infof(ctx, "[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address) return nil } - logrus.Infof("[hosts] Cordoning host [%s]", toDeleteHost.Address) + log.Infof(ctx, "[hosts] Cordoning host [%s]", toDeleteHost.Address) if _, err := k8s.GetNode(kubeClient, toDeleteHost.HostnameOverride); err != nil { if apierrors.IsNotFound(err) { - logrus.Warnf("[hosts] Can't find node by name [%s]", toDeleteHost.Address) + log.Warnf(ctx, "[hosts] Can't find node by name [%s]", toDeleteHost.Address) return nil } return err @@ -108,11 +109,11 @@ func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnother if err := k8s.CordonUncordon(kubeClient, toDeleteHost.HostnameOverride, true); err != nil { return err } - logrus.Infof("[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address) + log.Infof(ctx, "[hosts] Deleting host [%s] from the cluster", toDeleteHost.Address) if err := k8s.DeleteNode(kubeClient, toDeleteHost.HostnameOverride); err != nil { return err } - logrus.Infof("[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address) + log.Infof(ctx, "[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.Address) return nil } diff --git a/hosts/tunnel.go b/hosts/tunnel.go index b6f5bef6..29666283 100644 --- a/hosts/tunnel.go +++ b/hosts/tunnel.go @@ -11,6 +11,7 @@ import ( "github.com/docker/docker/client" "github.com/rancher/rke/docker" + "github.com/rancher/rke/log" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh/terminal" @@ -21,11 +22,11 @@ const ( K8sVersion = "1.8" ) -func (h *Host) TunnelUp(dialerFactory DialerFactory) error { +func (h *Host) TunnelUp(ctx context.Context, dialerFactory DialerFactory) error { if h.DClient != nil { return nil } - logrus.Infof("[dialer] Setup tunnel for host [%s]", h.Address) + log.Infof(ctx, "[dialer] Setup tunnel for host [%s]", h.Address) httpClient, err := h.newHTTPClient(dialerFactory) if err != nil { return fmt.Errorf("Can't establish dialer connection: %v", err) @@ -37,7 +38,7 @@ func (h *Host) TunnelUp(dialerFactory DialerFactory) error { if err != nil { return fmt.Errorf("Can't initiate NewClient: %v", err) } - info, err := h.DClient.Info(context.Background()) + info, err := h.DClient.Info(ctx) if err != nil { return fmt.Errorf("Can't retrieve Docker Info: %v", err) } @@ -50,7 +51,7 @@ func (h *Host) TunnelUp(dialerFactory DialerFactory) error { if !isvalid && !h.IgnoreDockerVersion { return fmt.Errorf("Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion]) } else if !isvalid { - logrus.Warnf("Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion]) + log.Warnf(ctx, "Unsupported Docker version found [%s], supported versions are %v", info.ServerVersion, docker.K8sDockerVersions[K8sVersion]) } return nil diff --git a/log/log.go b/log/log.go new file mode 100644 index 00000000..2871812c --- /dev/null +++ b/log/log.go @@ -0,0 +1,39 @@ +package log + +import ( + "context" + + "github.com/sirupsen/logrus" +) + +type logKey string + +const ( + key logKey = "rke-logger" +) + +type logger interface { + Infof(msg string, args ...interface{}) + Warnf(msg string, args ...interface{}) +} + +func SetLogger(ctx context.Context, logger logger) context.Context { + return context.WithValue(ctx, key, logger) +} + +func getLogger(ctx context.Context) logger { + logger, _ := ctx.Value(key).(logger) + if logger == nil { + return logrus.StandardLogger() + } + return logger +} + +func Infof(ctx context.Context, msg string, args ...interface{}) { + getLogger(ctx).Infof(msg, args...) + +} + +func Warnf(ctx context.Context, msg string, args ...interface{}) { + getLogger(ctx).Warnf(msg, args...) +} diff --git a/pki/deploy.go b/pki/deploy.go index 25c92da1..089d395d 100644 --- a/pki/deploy.go +++ b/pki/deploy.go @@ -11,10 +11,11 @@ import ( "github.com/docker/docker/api/types/container" "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/sirupsen/logrus" ) -func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { +func DeployCertificatesOnMasters(ctx context.Context, cpHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { // list of certificates that should be deployed on the masters crtList := []string{ CACertName, @@ -31,7 +32,7 @@ func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]Certif } for i := range cpHosts { - err := doRunDeployer(cpHosts[i], env, certDownloaderImage) + err := doRunDeployer(ctx, cpHosts[i], env, certDownloaderImage) if err != nil { return err } @@ -39,7 +40,7 @@ func DeployCertificatesOnMasters(cpHosts []*hosts.Host, crtMap map[string]Certif return nil } -func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { +func DeployCertificatesOnWorkers(ctx context.Context, workerHosts []*hosts.Host, crtMap map[string]CertificatePKI, certDownloaderImage string) error { // list of certificates that should be deployed on the workers crtList := []string{ CACertName, @@ -53,7 +54,7 @@ func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]Ce } for i := range workerHosts { - err := doRunDeployer(workerHosts[i], env, certDownloaderImage) + err := doRunDeployer(ctx, workerHosts[i], env, certDownloaderImage) if err != nil { return err } @@ -61,8 +62,8 @@ func DeployCertificatesOnWorkers(workerHosts []*hosts.Host, crtMap map[string]Ce return nil } -func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage string) error { - if err := docker.UseLocalOrPull(host.DClient, host.Address, certDownloaderImage, CertificatesServiceName); err != nil { +func doRunDeployer(ctx context.Context, host *hosts.Host, containerEnv []string, certDownloaderImage string) error { + if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, certDownloaderImage, CertificatesServiceName); err != nil { return err } imageCfg := &container.Config{ @@ -75,17 +76,17 @@ func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage }, Privileged: true, } - resp, err := host.DClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, CrtDownloaderContainer) + resp, err := host.DClient.ContainerCreate(ctx, imageCfg, hostCfg, nil, CrtDownloaderContainer) if err != nil { return fmt.Errorf("Failed to create Certificates deployer container on host [%s]: %v", host.Address, err) } - if err := host.DClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil { + if err := host.DClient.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { return fmt.Errorf("Failed to start Certificates deployer container on host [%s]: %v", host.Address, err) } logrus.Debugf("[certificates] Successfully started Certificate deployer container: %s", resp.ID) for { - isDeployerRunning, err := docker.IsContainerRunning(host.DClient, host.Address, CrtDownloaderContainer, false) + isDeployerRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, CrtDownloaderContainer, false) if err != nil { return err } @@ -93,28 +94,28 @@ func doRunDeployer(host *hosts.Host, containerEnv []string, certDownloaderImage time.Sleep(5 * time.Second) continue } - if err := host.DClient.ContainerRemove(context.Background(), resp.ID, types.ContainerRemoveOptions{}); err != nil { + if err := host.DClient.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{}); err != nil { return fmt.Errorf("Failed to delete Certificates deployer container on host [%s]: %v", host.Address, err) } return nil } } -func DeployAdminConfig(kubeConfig, localConfigPath string) error { +func DeployAdminConfig(ctx context.Context, kubeConfig, localConfigPath string) error { logrus.Debugf("Deploying admin Kubeconfig locally: %s", kubeConfig) err := ioutil.WriteFile(localConfigPath, []byte(kubeConfig), 0640) if err != nil { return fmt.Errorf("Failed to create local admin kubeconfig file: %v", err) } - logrus.Infof("Successfully Deployed local admin kubeconfig at [%s]", localConfigPath) + log.Infof(ctx, "Successfully Deployed local admin kubeconfig at [%s]", localConfigPath) return nil } -func RemoveAdminConfig(localConfigPath string) { - logrus.Infof("Removing local admin Kubeconfig: %s", localConfigPath) +func RemoveAdminConfig(ctx context.Context, localConfigPath string) { + log.Infof(ctx, "Removing local admin Kubeconfig: %s", localConfigPath) if err := os.Remove(localConfigPath); err != nil { logrus.Warningf("Failed to remove local admin Kubeconfig file: %v", err) return } - logrus.Infof("Local admin Kubeconfig removed successfully") + log.Infof(ctx, "Local admin Kubeconfig removed successfully") } diff --git a/pki/pki.go b/pki/pki.go index bff8af14..f24cffd7 100644 --- a/pki/pki.go +++ b/pki/pki.go @@ -1,12 +1,14 @@ package pki import ( + "context" "crypto/rsa" "crypto/x509" "fmt" "net" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/sirupsen/logrus" "k8s.io/client-go/util/cert" ) @@ -27,19 +29,19 @@ type CertificatePKI struct { } // StartCertificatesGeneration ... -func StartCertificatesGeneration(cpHosts []*hosts.Host, workerHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) { - logrus.Infof("[certificates] Generating kubernetes certificates") - certs, err := generateCerts(cpHosts, clusterDomain, localConfigPath, KubernetesServiceIP) +func StartCertificatesGeneration(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) { + log.Infof(ctx, "[certificates] Generating kubernetes certificates") + certs, err := generateCerts(ctx, cpHosts, clusterDomain, localConfigPath, KubernetesServiceIP) if err != nil { return nil, err } return certs, nil } -func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) { +func generateCerts(ctx context.Context, cpHosts []*hosts.Host, clusterDomain, localConfigPath string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) { certs := make(map[string]CertificatePKI) // generate CA certificate and key - logrus.Infof("[certificates] Generating CA kubernetes certificates") + log.Infof(ctx, "[certificates] Generating CA kubernetes certificates") caCrt, caKey, err := generateCACertAndKey() if err != nil { return nil, err @@ -56,7 +58,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, } // generate API certificate and key - logrus.Infof("[certificates] Generating Kubernetes API server certificates") + log.Infof(ctx, "[certificates] Generating Kubernetes API server certificates") kubeAPIAltNames := GetAltNames(cpHosts, clusterDomain, KubernetesServiceIP) kubeAPICrt, kubeAPIKey, err := GenerateKubeAPICertAndKey(caCrt, caKey, kubeAPIAltNames) if err != nil { @@ -74,7 +76,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, } // generate Kube controller-manager certificate and key - logrus.Infof("[certificates] Generating Kube Controller certificates") + log.Infof(ctx, "[certificates] Generating Kube Controller certificates") kubeControllerCrt, kubeControllerKey, err := generateClientCertAndKey(caCrt, caKey, KubeControllerCommonName, []string{}) if err != nil { return nil, err @@ -95,7 +97,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, } // generate Kube scheduler certificate and key - logrus.Infof("[certificates] Generating Kube Scheduler certificates") + log.Infof(ctx, "[certificates] Generating Kube Scheduler certificates") kubeSchedulerCrt, kubeSchedulerKey, err := generateClientCertAndKey(caCrt, caKey, KubeSchedulerCommonName, []string{}) if err != nil { return nil, err @@ -116,7 +118,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, } // generate Kube Proxy certificate and key - logrus.Infof("[certificates] Generating Kube Proxy certificates") + log.Infof(ctx, "[certificates] Generating Kube Proxy certificates") kubeProxyCrt, kubeProxyKey, err := generateClientCertAndKey(caCrt, caKey, KubeProxyCommonName, []string{}) if err != nil { return nil, err @@ -137,7 +139,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, } // generate Kubelet certificate and key - logrus.Infof("[certificates] Generating Node certificate") + log.Infof(ctx, "[certificates] Generating Node certificate") nodeCrt, nodeKey, err := generateClientCertAndKey(caCrt, caKey, KubeNodeCommonName, []string{KubeNodeOrganizationName}) if err != nil { return nil, err @@ -157,7 +159,7 @@ func generateCerts(cpHosts []*hosts.Host, clusterDomain, localConfigPath string, ConfigEnvName: KubeNodeConfigENVName, ConfigPath: KubeNodeCommonName, } - logrus.Infof("[certificates] Generating admin certificates and kubeconfig") + log.Infof(ctx, "[certificates] Generating admin certificates and kubeconfig") kubeAdminCrt, kubeAdminKey, err := generateClientCertAndKey(caCrt, caKey, KubeAdminCommonName, []string{KubeAdminOrganizationName}) if err != nil { return nil, err diff --git a/pki/pki_test.go b/pki/pki_test.go index f819cf4a..e8c9522c 100644 --- a/pki/pki_test.go +++ b/pki/pki_test.go @@ -1,6 +1,7 @@ package pki import ( + "context" "crypto/x509" "fmt" "net" @@ -27,7 +28,7 @@ func TestPKI(t *testing.T) { DClient: nil, }, } - certificateMap, err := StartCertificatesGeneration(cpHosts, cpHosts, FakeClusterDomain, "", net.ParseIP(FakeKubernetesServiceIP)) + certificateMap, err := StartCertificatesGeneration(context.Background(), cpHosts, cpHosts, FakeClusterDomain, "", net.ParseIP(FakeKubernetesServiceIP)) if err != nil { t.Fatalf("Failed To generate certificates: %v", err) } diff --git a/services/controlplane.go b/services/controlplane.go index 8df47845..9cb5fd0b 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -1,81 +1,83 @@ package services import ( + "context" + "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" - "github.com/sirupsen/logrus" ) -func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error { - logrus.Infof("[%s] Building up Controller Plane..", ControlRole) +func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error { + log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole) for _, host := range controlHosts { if host.IsWorker { - if err := removeNginxProxy(host); err != nil { + if err := removeNginxProxy(ctx, host); err != nil { return err } } // run sidekick - if err := runSidekick(host, sidekickImage); err != nil { + if err := runSidekick(ctx, host, sidekickImage); err != nil { return err } // run kubeapi - err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory) + err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory) if err != nil { return err } // run kubecontroller - err = runKubeController(host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory) + err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory) if err != nil { return err } // run scheduler - err = runScheduler(host, controlServices.Scheduler, healthcheckDialerFactory) + err = runScheduler(ctx, host, controlServices.Scheduler, healthcheckDialerFactory) if err != nil { return err } } - logrus.Infof("[%s] Successfully started Controller Plane..", ControlRole) + log.Infof(ctx, "[%s] Successfully started Controller Plane..", ControlRole) return nil } -func RemoveControlPlane(controlHosts []*hosts.Host, force bool) error { - logrus.Infof("[%s] Tearing down the Controller Plane..", ControlRole) +func RemoveControlPlane(ctx context.Context, controlHosts []*hosts.Host, force bool) error { + log.Infof(ctx, "[%s] Tearing down the Controller Plane..", ControlRole) for _, host := range controlHosts { // remove KubeAPI - if err := removeKubeAPI(host); err != nil { + if err := removeKubeAPI(ctx, host); err != nil { return err } // remove KubeController - if err := removeKubeController(host); err != nil { + if err := removeKubeController(ctx, host); err != nil { return nil } // remove scheduler - err := removeScheduler(host) + err := removeScheduler(ctx, host) if err != nil { return err } // check if the host already is a worker if host.IsWorker { - logrus.Infof("[%s] Host [%s] is already a worker host, skipping delete kubelet and kubeproxy.", ControlRole, host.Address) + log.Infof(ctx, "[%s] Host [%s] is already a worker host, skipping delete kubelet and kubeproxy.", ControlRole, host.Address) } else { // remove KubeAPI - if err := removeKubelet(host); err != nil { + if err := removeKubelet(ctx, host); err != nil { return err } // remove KubeController - if err := removeKubeproxy(host); err != nil { + if err := removeKubeproxy(ctx, host); err != nil { return nil } // remove Sidekick - if err := removeSidekick(host); err != nil { + if err := removeSidekick(ctx, host); err != nil { return err } } } - logrus.Infof("[%s] Successfully teared down Controller Plane..", ControlRole) + log.Infof(ctx, "[%s] Successfully teared down Controller Plane..", ControlRole) return nil } diff --git a/services/etcd.go b/services/etcd.go index b70a532d..270a4419 100644 --- a/services/etcd.go +++ b/services/etcd.go @@ -3,37 +3,39 @@ package services import ( "fmt" + "context" + "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" - "github.com/sirupsen/logrus" ) -func RunEtcdPlane(etcdHosts []*hosts.Host, etcdService v3.ETCDService) error { - logrus.Infof("[%s] Building up Etcd Plane..", ETCDRole) +func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService) error { + log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole) initCluster := getEtcdInitialCluster(etcdHosts) for _, host := range etcdHosts { imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster) - err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole) + err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole) if err != nil { return err } } - logrus.Infof("[%s] Successfully started Etcd Plane..", ETCDRole) + log.Infof(ctx, "[%s] Successfully started Etcd Plane..", ETCDRole) return nil } -func RemoveEtcdPlane(etcdHosts []*hosts.Host) error { - logrus.Infof("[%s] Tearing down Etcd Plane..", ETCDRole) +func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error { + log.Infof(ctx, "[%s] Tearing down Etcd Plane..", ETCDRole) for _, host := range etcdHosts { - err := docker.DoRemoveContainer(host.DClient, EtcdContainerName, host.Address) + err := docker.DoRemoveContainer(ctx, host.DClient, EtcdContainerName, host.Address) if err != nil { return err } } - logrus.Infof("[%s] Successfully teared down Etcd Plane..", ETCDRole) + log.Infof(ctx, "[%s] Successfully teared down Etcd Plane..", ETCDRole) return nil } diff --git a/services/healthcheck.go b/services/healthcheck.go index 0311830b..6ea65636 100644 --- a/services/healthcheck.go +++ b/services/healthcheck.go @@ -1,6 +1,7 @@ package services import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -8,6 +9,7 @@ import ( "time" "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/sirupsen/logrus" ) @@ -18,8 +20,8 @@ const ( HTTPSProtoPrefix = "https://" ) -func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error { - logrus.Infof("[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address) +func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error { + log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address) client, err := getHealthCheckHTTPClient(host, port, healthcheckDialerFactory) if err != nil { return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address) @@ -30,7 +32,7 @@ func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string, time.Sleep(5 * time.Second) continue } - logrus.Infof("[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address) + log.Infof(ctx, "[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address) return nil } return fmt.Errorf("Failed to verify healthcheck: %v", err) diff --git a/services/kubeapi.go b/services/kubeapi.go index 373540c5..0c3bce75 100644 --- a/services/kubeapi.go +++ b/services/kubeapi.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -11,17 +12,17 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error { +func runKubeAPI(ctx context.Context, host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error { etcdConnString := GetEtcdConnString(etcdHosts) imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode) - if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil { + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil { return err } - return runHealthcheck(host, KubeAPIPort, false, KubeAPIContainerName, df) + return runHealthcheck(ctx, host, KubeAPIPort, false, KubeAPIContainerName, df) } -func removeKubeAPI(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, KubeAPIContainerName, host.Address) +func removeKubeAPI(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, KubeAPIContainerName, host.Address) } func buildKubeAPIConfig(host *hosts.Host, kubeAPIService v3.KubeAPIService, etcdConnString, authorizationMode string) (*container.Config, *container.HostConfig) { diff --git a/services/kubecontroller.go b/services/kubecontroller.go index 5011a16c..237100d5 100644 --- a/services/kubecontroller.go +++ b/services/kubecontroller.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -10,16 +11,16 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error { +func runKubeController(ctx context.Context, host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode) - if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil { + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil { return err } - return runHealthcheck(host, KubeControllerPort, false, KubeControllerContainerName, df) + return runHealthcheck(ctx, host, KubeControllerPort, false, KubeControllerContainerName, df) } -func removeKubeController(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, KubeControllerContainerName, host.Address) +func removeKubeController(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, KubeControllerContainerName, host.Address) } func buildKubeControllerConfig(kubeControllerService v3.KubeControllerService, authorizationMode string) (*container.Config, *container.HostConfig) { diff --git a/services/kubelet.go b/services/kubelet.go index 88f931f4..44109bf6 100644 --- a/services/kubelet.go +++ b/services/kubelet.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -10,16 +11,16 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubelet(host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error { +func runKubelet(ctx context.Context, host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeletConfig(host, kubeletService) - if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil { + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil { return err } - return runHealthcheck(host, KubeletPort, true, KubeletContainerName, df) + return runHealthcheck(ctx, host, KubeletPort, true, KubeletContainerName, df) } -func removeKubelet(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, KubeletContainerName, host.Address) +func removeKubelet(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, KubeletContainerName, host.Address) } func buildKubeletConfig(host *hosts.Host, kubeletService v3.KubeletService) (*container.Config, *container.HostConfig) { diff --git a/services/kubeproxy.go b/services/kubeproxy.go index 9f04d124..113a50bf 100644 --- a/services/kubeproxy.go +++ b/services/kubeproxy.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -10,16 +11,16 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error { +func runKubeproxy(ctx context.Context, host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService) - if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil { + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil { return err } - return runHealthcheck(host, KubeproxyPort, false, KubeproxyContainerName, df) + return runHealthcheck(ctx, host, KubeproxyPort, false, KubeproxyContainerName, df) } -func removeKubeproxy(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, KubeproxyContainerName, host.Address) +func removeKubeproxy(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, KubeproxyContainerName, host.Address) } func buildKubeproxyConfig(host *hosts.Host, kubeproxyService v3.KubeproxyService) (*container.Config, *container.HostConfig) { diff --git a/services/proxy.go b/services/proxy.go index 25049786..efda42fe 100644 --- a/services/proxy.go +++ b/services/proxy.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -13,25 +14,25 @@ const ( NginxProxyEnvName = "CP_HOSTS" ) -func RollingUpdateNginxProxy(cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string) error { +func RollingUpdateNginxProxy(ctx context.Context, cpHosts []*hosts.Host, workerHosts []*hosts.Host, nginxProxyImage string) error { nginxProxyEnv := buildProxyEnv(cpHosts) for _, host := range workerHosts { imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage) - if err := docker.DoRollingUpdateContainer(host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole); err != nil { + if err := docker.DoRollingUpdateContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole); err != nil { return err } } return nil } -func runNginxProxy(host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string) error { +func runNginxProxy(ctx context.Context, host *hosts.Host, cpHosts []*hosts.Host, nginxProxyImage string) error { nginxProxyEnv := buildProxyEnv(cpHosts) imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv, nginxProxyImage) - return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole) + return docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.Address, WorkerRole) } -func removeNginxProxy(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, NginxProxyContainerName, host.Address) +func removeNginxProxy(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, NginxProxyContainerName, host.Address) } func buildNginxProxyConfig(host *hosts.Host, nginxProxyEnv, nginxProxyImage string) (*container.Config, *container.HostConfig) { diff --git a/services/scheduler.go b/services/scheduler.go index 94bf9bf4..0b2a2931 100644 --- a/services/scheduler.go +++ b/services/scheduler.go @@ -1,6 +1,7 @@ package services import ( + "context" "fmt" "github.com/docker/docker/api/types/container" @@ -10,16 +11,16 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error { +func runScheduler(ctx context.Context, host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService) - if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil { + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil { return err } - return runHealthcheck(host, SchedulerPort, false, SchedulerContainerName, df) + return runHealthcheck(ctx, host, SchedulerPort, false, SchedulerContainerName, df) } -func removeScheduler(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, SchedulerContainerName, host.Address) +func removeScheduler(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, SchedulerContainerName, host.Address) } func buildSchedulerConfig(host *hosts.Host, schedulerService v3.SchedulerService) (*container.Config, *container.HostConfig) { diff --git a/services/services.go b/services/services.go index d93bea36..e1774b5b 100644 --- a/services/services.go +++ b/services/services.go @@ -1,13 +1,14 @@ package services import ( + "context" "fmt" "net" "github.com/docker/docker/api/types/container" "github.com/rancher/rke/docker" "github.com/rancher/rke/hosts" - "github.com/sirupsen/logrus" + "github.com/rancher/rke/log" ) const ( @@ -59,25 +60,25 @@ func buildSidekickConfig(sidekickImage string) (*container.Config, *container.Ho return imageCfg, hostCfg } -func runSidekick(host *hosts.Host, sidekickImage string) error { - isRunning, err := docker.IsContainerRunning(host.DClient, host.Address, SidekickContainerName, true) +func runSidekick(ctx context.Context, host *hosts.Host, sidekickImage string) error { + isRunning, err := docker.IsContainerRunning(ctx, host.DClient, host.Address, SidekickContainerName, true) if err != nil { return err } if isRunning { - logrus.Infof("[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address) + log.Infof(ctx, "[%s] Sidekick container already created on host [%s]", SidekickServiceName, host.Address) return nil } imageCfg, hostCfg := buildSidekickConfig(sidekickImage) - if err := docker.UseLocalOrPull(host.DClient, host.Address, sidekickImage, SidekickServiceName); err != nil { + if err := docker.UseLocalOrPull(ctx, host.DClient, host.Address, sidekickImage, SidekickServiceName); err != nil { return err } - if _, err := docker.CreateContiner(host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil { + if _, err := docker.CreateContiner(ctx, host.DClient, host.Address, SidekickContainerName, imageCfg, hostCfg); err != nil { return err } return nil } -func removeSidekick(host *hosts.Host) error { - return docker.DoRemoveContainer(host.DClient, SidekickContainerName, host.Address) +func removeSidekick(ctx context.Context, host *hosts.Host) error { + return docker.DoRemoveContainer(ctx, host.DClient, SidekickContainerName, host.Address) } diff --git a/services/workerplane.go b/services/workerplane.go index ded9d49c..7f380819 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -1,21 +1,23 @@ package services import ( + "context" + "github.com/rancher/rke/hosts" + "github.com/rancher/rke/log" "github.com/rancher/types/apis/management.cattle.io/v3" - "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) -func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error { - logrus.Infof("[%s] Building up Worker Plane..", WorkerRole) +func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error { + log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole) var errgrp errgroup.Group // Deploy worker components on control hosts for _, host := range controlHosts { controlHost := host errgrp.Go(func() error { - return doDeployWorkerPlane(controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) + return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) }) } if err := errgrp.Wait(); err != nil { @@ -25,61 +27,61 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R for _, host := range workerHosts { workerHost := host errgrp.Go(func() error { - return doDeployWorkerPlane(workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) + return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts) }) } if err := errgrp.Wait(); err != nil { return err } - logrus.Infof("[%s] Successfully started Worker Plane..", WorkerRole) + log.Infof(ctx, "[%s] Successfully started Worker Plane..", WorkerRole) return nil } -func RemoveWorkerPlane(workerHosts []*hosts.Host, force bool) error { - logrus.Infof("[%s] Tearing down Worker Plane..", WorkerRole) +func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force bool) error { + log.Infof(ctx, "[%s] Tearing down Worker Plane..", WorkerRole) for _, host := range workerHosts { // check if the host already is a controlplane if host.IsControl && !force { - logrus.Infof("[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, host.Address) + log.Infof(ctx, "[%s] Host [%s] is already a controlplane host, nothing to do.", WorkerRole, host.Address) return nil } - if err := removeKubelet(host); err != nil { + if err := removeKubelet(ctx, host); err != nil { return err } - if err := removeKubeproxy(host); err != nil { + if err := removeKubeproxy(ctx, host); err != nil { return err } - if err := removeNginxProxy(host); err != nil { + if err := removeNginxProxy(ctx, host); err != nil { return err } - if err := removeSidekick(host); err != nil { + if err := removeSidekick(ctx, host); err != nil { return err } - logrus.Infof("[%s] Successfully teared down Worker Plane..", WorkerRole) + log.Infof(ctx, "[%s] Successfully teared down Worker Plane..", WorkerRole) } return nil } -func doDeployWorkerPlane(host *hosts.Host, +func doDeployWorkerPlane(ctx context.Context, host *hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory, controlHosts []*hosts.Host) error { // run nginx proxy if !host.IsControl { - if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil { + if err := runNginxProxy(ctx, host, controlHosts, nginxProxyImage); err != nil { return err } } // run sidekick - if err := runSidekick(host, sidekickImage); err != nil { + if err := runSidekick(ctx, host, sidekickImage); err != nil { return err } // run kubelet - if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { + if err := runKubelet(ctx, host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { return err } - return runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory) + return runKubeproxy(ctx, host, workerServices.Kubeproxy, healthcheckDialerFactory) }