From fd47cdc80d149d6d2fb4a21b7d635c7f1faa87c4 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Sat, 16 Dec 2017 05:38:15 +0200 Subject: [PATCH] Add Dialer Factory --- cluster/cluster.go | 46 ++++++++++++++++++++------------------------ cluster/hosts.go | 9 +++------ cluster/reconcile.go | 8 ++++---- cluster/state.go | 13 +++++++++---- cmd/common.go | 18 +++++------------ cmd/remove.go | 15 +++++++++++---- cmd/up.go | 19 +++++++++++++----- hosts/dialer.go | 45 +++++++++++++++++++++++++++++-------------- hosts/hosts.go | 19 ------------------ hosts/tunnel.go | 7 ++++--- 10 files changed, 102 insertions(+), 97 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index bcc9802d..c159883f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -19,7 +19,7 @@ import ( type Cluster struct { v3.RancherKubernetesEngineConfig `yaml:",inline"` - ConfigPath string `yaml:"config_path"` + ConfigPath string LocalKubeConfigPath string EtcdHosts []*hosts.Host WorkerHosts []*hosts.Host @@ -30,7 +30,7 @@ type Cluster struct { ClusterDomain string ClusterCIDR string ClusterDNSServer string - Dialer hosts.Dialer + DialerFactory hosts.DialerFactory } const ( @@ -73,21 +73,30 @@ func (c *Cluster) DeployClusterPlanes() error { return nil } -func ParseConfig(clusterFile string, customDialer hosts.Dialer) (*Cluster, error) { +func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) { logrus.Debugf("Parsing cluster file [%v]", clusterFile) - var err error - c, err := parseClusterFile(clusterFile) - if err != nil { - return nil, fmt.Errorf("Failed to parse the cluster file: %v", err) + var rkeConfig v3.RancherKubernetesEngineConfig + if err := yaml.Unmarshal([]byte(clusterFile), &rkeConfig); err != nil { + return nil, err } - c.Dialer = customDialer - err = c.InvertIndexHosts() - if err != nil { + return &rkeConfig, nil +} + +func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dialerFactory hosts.DialerFactory) (*Cluster, error) { + var err error + c := &Cluster{ + RancherKubernetesEngineConfig: *rkeConfig, + ConfigPath: clusterFilePath, + DialerFactory: dialerFactory, + } + // Setting cluster Defaults + c.setClusterDefaults() + + if err := c.InvertIndexHosts(); err != nil { return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err) } - err = c.ValidateCluster() - if err != nil { + if err := c.ValidateCluster(); err != nil { return nil, fmt.Errorf("Failed to validate cluster: %v", err) } @@ -105,19 +114,6 @@ func ParseConfig(clusterFile string, customDialer hosts.Dialer) (*Cluster, error return c, nil } -func parseClusterFile(clusterFile string) (*Cluster, error) { - // parse hosts - var kubeCluster Cluster - err := yaml.Unmarshal([]byte(clusterFile), &kubeCluster) - if err != nil { - return nil, err - } - // Setting cluster Defaults - kubeCluster.setClusterDefaults() - - return &kubeCluster, nil -} - func (c *Cluster) setClusterDefaults() { if len(c.SSHKeyPath) == 0 { c.SSHKeyPath = DefaultClusterSSHKeyPath diff --git a/cluster/hosts.go b/cluster/hosts.go index 0b072d0b..86b82aed 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -11,18 +11,18 @@ import ( func (c *Cluster) TunnelHosts() error { for i := range c.EtcdHosts { - if err := c.EtcdHosts[i].TunnelUp(); err != nil { + if err := c.EtcdHosts[i].TunnelUp(c.DialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err) } } for i := range c.ControlPlaneHosts { - err := c.ControlPlaneHosts[i].TunnelUp() + err := c.ControlPlaneHosts[i].TunnelUp(c.DialerFactory) if err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err) } } for i := range c.WorkerHosts { - if err := c.WorkerHosts[i].TunnelUp(); err != nil { + if err := c.WorkerHosts[i].TunnelUp(c.DialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err) } } @@ -37,9 +37,6 @@ func (c *Cluster) InvertIndexHosts() error { newHost := hosts.Host{ RKEConfigNode: host, } - if err := newHost.RegisterDialer(c.Dialer); err != nil { - return fmt.Errorf("Failed to register new Dialer for host [%s]: %v", host.Address, err) - } newHost.IgnoreDockerVersion = c.IgnoreDockerVersion diff --git a/cluster/reconcile.go b/cluster/reconcile.go index f16adc2e..4b106d44 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -43,7 +43,7 @@ func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernete return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address) } // attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage]); err != nil { + if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil { logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) continue } @@ -75,7 +75,7 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address) } // attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage]); err != nil { + if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil { logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) continue } @@ -96,8 +96,8 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet return nil } -func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string) error { - if err := toDeleteHost.TunnelUp(); err != nil { +func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error { + if err := toDeleteHost.TunnelUp(dialerFactory); err != nil { return fmt.Errorf("Not able to reach the host: %v", err) } if worker { diff --git a/cluster/state.go b/cluster/state.go index 7c148c55..1aadab47 100644 --- a/cluster/state.go +++ b/cluster/state.go @@ -6,13 +6,14 @@ import ( "time" "github.com/rancher/rke/k8s" + "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" yaml "gopkg.in/yaml.v2" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" ) -func (c *Cluster) SaveClusterState(clusterFile string) error { +func (c *Cluster) SaveClusterState(rkeConfig *v3.RancherKubernetesEngineConfig) error { // Reinitialize kubernetes Client var err error c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) @@ -23,7 +24,7 @@ func (c *Cluster) SaveClusterState(clusterFile string) error { if err != nil { return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err) } - err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, []byte(clusterFile)) + err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, rkeConfig) if err != nil { return fmt.Errorf("[state] Failed to save configuration state: %v", err) } @@ -56,7 +57,7 @@ func (c *Cluster) GetClusterState() (*Cluster, error) { // Get previous kubernetes certificates if currentCluster != nil { currentCluster.Certificates, err = getClusterCerts(c.KubeClient) - currentCluster.Dialer = c.Dialer + currentCluster.DialerFactory = c.DialerFactory if err != nil { return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err) } @@ -75,8 +76,12 @@ func (c *Cluster) GetClusterState() (*Cluster, error) { return currentCluster, nil } -func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, clusterFile []byte) error { +func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error { logrus.Infof("[state] Saving cluster state to Kubernetes") + clusterFile, err := yaml.Marshal(*rkeConfig) + if err != nil { + return err + } timeout := make(chan bool, 1) go func() { for { diff --git a/cmd/common.go b/cmd/common.go index 7b7311e7..6bab4ac0 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -9,29 +9,21 @@ import ( "github.com/urfave/cli" ) -func resolveClusterFile(ctx *cli.Context) (string, error) { +func resolveClusterFile(ctx *cli.Context) (string, string, error) { clusterFile := ctx.String("config") fp, err := filepath.Abs(clusterFile) if err != nil { - return "", fmt.Errorf("failed to lookup current directory name: %v", err) + return "", "", fmt.Errorf("failed to lookup current directory name: %v", err) } file, err := os.Open(fp) if err != nil { - return "", fmt.Errorf("Can not find cluster configuration file: %v", err) + return "", "", fmt.Errorf("Can not find cluster configuration file: %v", err) } defer file.Close() buf, err := ioutil.ReadAll(file) if err != nil { - return "", fmt.Errorf("failed to read file: %v", err) + return "", "", fmt.Errorf("failed to read file: %v", err) } clusterFileBuff := string(buf) - - /* - This is a hacky way to add config path to cluster object without messing with - ClusterUp function and to avoid conflict with calls from kontainer-engine, basically - i add config path (cluster.yml by default) to a field into the config buffer - to be parsed later and added as ConfigPath field into cluster object. - */ - clusterFileBuff = fmt.Sprintf("%s\nconfig_path: %s\n", clusterFileBuff, clusterFile) - return clusterFileBuff, nil + return clusterFileBuff, clusterFile, nil } diff --git a/cmd/remove.go b/cmd/remove.go index 2a8da465..d0c7e334 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -8,6 +8,7 @@ import ( "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" + "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "github.com/urfave/cli" ) @@ -33,9 +34,9 @@ func RemoveCommand() cli.Command { } } -func ClusterRemove(clusterFile string, customDialer hosts.Dialer) error { +func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error { logrus.Infof("Tearing down Kubernetes cluster") - kubeCluster, err := cluster.ParseConfig(clusterFile, customDialer) + kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory) if err != nil { return err } @@ -69,9 +70,15 @@ func clusterRemoveFromCli(ctx *cli.Context) error { return nil } } - clusterFile, err := resolveClusterFile(ctx) + clusterFile, filePath, err := resolveClusterFile(ctx) if err != nil { return fmt.Errorf("Failed to resolve cluster file: %v", err) } - return ClusterRemove(clusterFile, nil) + clusterFilePath = filePath + + rkeConfig, err := cluster.ParseConfig(clusterFile) + if err != nil { + return fmt.Errorf("Failed to parse cluster file: %v", err) + } + return ClusterRemove(rkeConfig, nil) } diff --git a/cmd/up.go b/cmd/up.go index 5f6ed6f1..5d743569 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -6,11 +6,14 @@ import ( "github.com/rancher/rke/cluster" "github.com/rancher/rke/hosts" "github.com/rancher/rke/pki" + "github.com/rancher/types/apis/management.cattle.io/v3" "github.com/sirupsen/logrus" "github.com/urfave/cli" "k8s.io/client-go/util/cert" ) +var clusterFilePath string + func UpCommand() cli.Command { upFlags := []cli.Flag{ cli.StringFlag{ @@ -28,10 +31,10 @@ func UpCommand() cli.Command { } } -func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, string, string, error) { +func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) (string, string, string, string, error) { logrus.Infof("Building Kubernetes cluster") var APIURL, caCrt, clientCert, clientKey string - kubeCluster, err := cluster.ParseConfig(clusterFile, customDialer) + kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -70,7 +73,7 @@ func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, s return APIURL, caCrt, clientCert, clientKey, err } - err = kubeCluster.SaveClusterState(clusterFile) + err = kubeCluster.SaveClusterState(rkeConfig) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -100,10 +103,16 @@ func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, s } func clusterUpFromCli(ctx *cli.Context) error { - clusterFile, err := resolveClusterFile(ctx) + clusterFile, filePath, err := resolveClusterFile(ctx) if err != nil { return fmt.Errorf("Failed to resolve cluster file: %v", err) } - _, _, _, _, err = ClusterUp(clusterFile, nil) + clusterFilePath = filePath + + rkeConfig, err := cluster.ParseConfig(clusterFile) + if err != nil { + return fmt.Errorf("Failed to parse cluster file: %v", err) + } + _, _, _, _, err = ClusterUp(rkeConfig, nil) return err } diff --git a/hosts/dialer.go b/hosts/dialer.go index ef981ad0..d006df56 100644 --- a/hosts/dialer.go +++ b/hosts/dialer.go @@ -8,29 +8,26 @@ import ( "golang.org/x/crypto/ssh" ) -type Dialer interface { - NewHTTPClient() (*http.Client, error) -} +type DialerFactory func(h *Host) (func(network, address string) (net.Conn, error), error) -type sshDialer struct { +type dialer struct { host *Host signer ssh.Signer } -func (d *sshDialer) NewHTTPClient() (*http.Client, error) { - dialer := &sshDialer{ - host: d.host, - signer: d.signer, +func SSHFactory(h *Host) (func(network, address string) (net.Conn, error), error) { + key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath) + if err != nil { + return nil, fmt.Errorf("Failed to parse the private key: %v", err) } - httpClient := &http.Client{ - Transport: &http.Transport{ - Dial: dialer.Dial, - }, + dialer := &dialer{ + host: h, + signer: key, } - return httpClient, nil + return dialer.Dial, nil } -func (d *sshDialer) Dial(network, addr string) (net.Conn, error) { +func (d *dialer) Dial(network, addr string) (net.Conn, error) { sshAddr := d.host.Address + ":22" // Build SSH client configuration cfg, err := makeSSHConfig(d.host.User, d.signer) @@ -51,3 +48,23 @@ func (d *sshDialer) Dial(network, addr string) (net.Conn, error) { } return remote, err } + +func (h *Host) newHTTPClient(dialerFactory DialerFactory) (*http.Client, error) { + var factory DialerFactory + + if dialerFactory == nil { + factory = SSHFactory + } else { + factory = dialerFactory + } + + dialer, err := factory(h) + if err != nil { + return nil, err + } + return &http.Client{ + Transport: &http.Transport{ + Dial: dialer, + }, + }, nil +} diff --git a/hosts/hosts.go b/hosts/hosts.go index 21584e49..2df9ee70 100644 --- a/hosts/hosts.go +++ b/hosts/hosts.go @@ -16,7 +16,6 @@ import ( type Host struct { v3.RKEConfigNode DClient *client.Client - Dialer Dialer IsControl bool IsWorker bool IgnoreDockerVersion bool @@ -91,24 +90,6 @@ func (h *Host) CleanUp(toCleanPaths []string, cleanerImage string) error { return nil } -func (h *Host) RegisterDialer(customDialer Dialer) error { - if customDialer == nil { - logrus.Infof("[ssh] Setup tunnel for host [%s]", h.Address) - key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath) - if err != nil { - return fmt.Errorf("Failed to parse the private key: %v", err) - } - dialer := &sshDialer{ - host: h, - signer: key, - } - h.Dialer = dialer - } else { - h.Dialer = customDialer - } - return nil -} - func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error { if hasAnotherRole { logrus.Infof("[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address) diff --git a/hosts/tunnel.go b/hosts/tunnel.go index c854e4aa..b6f5bef6 100644 --- a/hosts/tunnel.go +++ b/hosts/tunnel.go @@ -21,13 +21,14 @@ const ( K8sVersion = "1.8" ) -func (h *Host) TunnelUp() error { +func (h *Host) TunnelUp(dialerFactory DialerFactory) error { if h.DClient != nil { return nil } - httpClient, err := h.Dialer.NewHTTPClient() + logrus.Infof("[dialer] Setup tunnel for host [%s]", h.Address) + httpClient, err := h.newHTTPClient(dialerFactory) if err != nil { - return fmt.Errorf("Failed to initiate a new Dialer to host [%s]", h.Address) + return fmt.Errorf("Can't establish dialer connection: %v", err) } // set Docker client