1
0
mirror of https://github.com/rancher/rke.git synced 2025-07-16 16:31:07 +00:00

Merge pull request #154 from galal-hussein/dialer_factory

Add Dialer Factory and receive rkeConfig instead of cluster yaml
This commit is contained in:
Alena Prokharchyk 2017-12-15 21:28:52 -08:00 committed by GitHub
commit 68a3ef2a43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 102 additions and 97 deletions

View File

@ -20,7 +20,7 @@ import (
type Cluster struct { type Cluster struct {
v3.RancherKubernetesEngineConfig `yaml:",inline"` v3.RancherKubernetesEngineConfig `yaml:",inline"`
ConfigPath string `yaml:"config_path"` ConfigPath string
LocalKubeConfigPath string LocalKubeConfigPath string
EtcdHosts []*hosts.Host EtcdHosts []*hosts.Host
WorkerHosts []*hosts.Host WorkerHosts []*hosts.Host
@ -31,7 +31,7 @@ type Cluster struct {
ClusterDomain string ClusterDomain string
ClusterCIDR string ClusterCIDR string
ClusterDNSServer string ClusterDNSServer string
Dialer hosts.Dialer DialerFactory hosts.DialerFactory
} }
const ( const (
@ -80,21 +80,30 @@ func (c *Cluster) DeployClusterPlanes() error {
return nil return nil
} }
func ParseConfig(clusterFile string, customDialer hosts.Dialer) (*Cluster, error) { func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) {
logrus.Debugf("Parsing cluster file [%v]", clusterFile) logrus.Debugf("Parsing cluster file [%v]", clusterFile)
var err error var rkeConfig v3.RancherKubernetesEngineConfig
c, err := parseClusterFile(clusterFile) if err := yaml.Unmarshal([]byte(clusterFile), &rkeConfig); err != nil {
if err != nil { return nil, err
return nil, fmt.Errorf("Failed to parse the cluster file: %v", err)
} }
c.Dialer = customDialer return &rkeConfig, nil
err = c.InvertIndexHosts() }
if err != nil {
func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dialerFactory hosts.DialerFactory) (*Cluster, error) {
var err error
c := &Cluster{
RancherKubernetesEngineConfig: *rkeConfig,
ConfigPath: clusterFilePath,
DialerFactory: dialerFactory,
}
// Setting cluster Defaults
c.setClusterDefaults()
if err := c.InvertIndexHosts(); err != nil {
return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err) return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err)
} }
err = c.ValidateCluster() if err := c.ValidateCluster(); err != nil {
if err != nil {
return nil, fmt.Errorf("Failed to validate cluster: %v", err) return nil, fmt.Errorf("Failed to validate cluster: %v", err)
} }
@ -112,19 +121,6 @@ func ParseConfig(clusterFile string, customDialer hosts.Dialer) (*Cluster, error
return c, nil return c, nil
} }
func parseClusterFile(clusterFile string) (*Cluster, error) {
// parse hosts
var kubeCluster Cluster
err := yaml.Unmarshal([]byte(clusterFile), &kubeCluster)
if err != nil {
return nil, err
}
// Setting cluster Defaults
kubeCluster.setClusterDefaults()
return &kubeCluster, nil
}
func (c *Cluster) setClusterDefaults() { func (c *Cluster) setClusterDefaults() {
if len(c.SSHKeyPath) == 0 { if len(c.SSHKeyPath) == 0 {
c.SSHKeyPath = DefaultClusterSSHKeyPath c.SSHKeyPath = DefaultClusterSSHKeyPath

View File

@ -11,18 +11,18 @@ import (
func (c *Cluster) TunnelHosts() error { func (c *Cluster) TunnelHosts() error {
for i := range c.EtcdHosts { for i := range c.EtcdHosts {
if err := c.EtcdHosts[i].TunnelUp(); err != nil { if err := c.EtcdHosts[i].TunnelUp(c.DialerFactory); err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err) return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err)
} }
} }
for i := range c.ControlPlaneHosts { for i := range c.ControlPlaneHosts {
err := c.ControlPlaneHosts[i].TunnelUp() err := c.ControlPlaneHosts[i].TunnelUp(c.DialerFactory)
if err != nil { if err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err) return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err)
} }
} }
for i := range c.WorkerHosts { for i := range c.WorkerHosts {
if err := c.WorkerHosts[i].TunnelUp(); err != nil { if err := c.WorkerHosts[i].TunnelUp(c.DialerFactory); err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err) return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err)
} }
} }
@ -37,9 +37,6 @@ func (c *Cluster) InvertIndexHosts() error {
newHost := hosts.Host{ newHost := hosts.Host{
RKEConfigNode: host, RKEConfigNode: host,
} }
if err := newHost.RegisterDialer(c.Dialer); err != nil {
return fmt.Errorf("Failed to register new Dialer for host [%s]: %v", host.Address, err)
}
newHost.IgnoreDockerVersion = c.IgnoreDockerVersion newHost.IgnoreDockerVersion = c.IgnoreDockerVersion

View File

@ -43,7 +43,7 @@ func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernete
return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address) return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address)
} }
// attempting to clean services/files on the host // attempting to clean services/files on the host
if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage]); err != nil { if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil {
logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
continue continue
} }
@ -75,7 +75,7 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet
return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address) return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address)
} }
// attempting to clean services/files on the host // attempting to clean services/files on the host
if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage]); err != nil { if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil {
logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
continue continue
} }
@ -96,8 +96,8 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet
return nil return nil
} }
func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string) error { func reconcileHost(toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error {
if err := toDeleteHost.TunnelUp(); err != nil { if err := toDeleteHost.TunnelUp(dialerFactory); err != nil {
return fmt.Errorf("Not able to reach the host: %v", err) return fmt.Errorf("Not able to reach the host: %v", err)
} }
if worker { if worker {

View File

@ -6,13 +6,14 @@ import (
"time" "time"
"github.com/rancher/rke/k8s" "github.com/rancher/rke/k8s"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func (c *Cluster) SaveClusterState(clusterFile string) error { func (c *Cluster) SaveClusterState(rkeConfig *v3.RancherKubernetesEngineConfig) error {
// Reinitialize kubernetes Client // Reinitialize kubernetes Client
var err error var err error
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath) c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
@ -23,7 +24,7 @@ func (c *Cluster) SaveClusterState(clusterFile string) error {
if err != nil { if err != nil {
return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err) return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
} }
err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, []byte(clusterFile)) err = saveStateToKubernetes(c.KubeClient, c.LocalKubeConfigPath, rkeConfig)
if err != nil { if err != nil {
return fmt.Errorf("[state] Failed to save configuration state: %v", err) return fmt.Errorf("[state] Failed to save configuration state: %v", err)
} }
@ -56,7 +57,7 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
// Get previous kubernetes certificates // Get previous kubernetes certificates
if currentCluster != nil { if currentCluster != nil {
currentCluster.Certificates, err = getClusterCerts(c.KubeClient) currentCluster.Certificates, err = getClusterCerts(c.KubeClient)
currentCluster.Dialer = c.Dialer currentCluster.DialerFactory = c.DialerFactory
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err) return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
} }
@ -75,8 +76,12 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
return currentCluster, nil return currentCluster, nil
} }
func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, clusterFile []byte) error { func saveStateToKubernetes(kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
logrus.Infof("[state] Saving cluster state to Kubernetes") logrus.Infof("[state] Saving cluster state to Kubernetes")
clusterFile, err := yaml.Marshal(*rkeConfig)
if err != nil {
return err
}
timeout := make(chan bool, 1) timeout := make(chan bool, 1)
go func() { go func() {
for { for {

View File

@ -9,29 +9,21 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
) )
func resolveClusterFile(ctx *cli.Context) (string, error) { func resolveClusterFile(ctx *cli.Context) (string, string, error) {
clusterFile := ctx.String("config") clusterFile := ctx.String("config")
fp, err := filepath.Abs(clusterFile) fp, err := filepath.Abs(clusterFile)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to lookup current directory name: %v", err) return "", "", fmt.Errorf("failed to lookup current directory name: %v", err)
} }
file, err := os.Open(fp) file, err := os.Open(fp)
if err != nil { if err != nil {
return "", fmt.Errorf("Can not find cluster configuration file: %v", err) return "", "", fmt.Errorf("Can not find cluster configuration file: %v", err)
} }
defer file.Close() defer file.Close()
buf, err := ioutil.ReadAll(file) buf, err := ioutil.ReadAll(file)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to read file: %v", err) return "", "", fmt.Errorf("failed to read file: %v", err)
} }
clusterFileBuff := string(buf) clusterFileBuff := string(buf)
return clusterFileBuff, clusterFile, nil
/*
This is a hacky way to add config path to cluster object without messing with
ClusterUp function and to avoid conflict with calls from kontainer-engine, basically
i add config path (cluster.yml by default) to a field into the config buffer
to be parsed later and added as ConfigPath field into cluster object.
*/
clusterFileBuff = fmt.Sprintf("%s\nconfig_path: %s\n", clusterFileBuff, clusterFile)
return clusterFileBuff, nil
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/rancher/rke/cluster" "github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
@ -33,9 +34,9 @@ func RemoveCommand() cli.Command {
} }
} }
func ClusterRemove(clusterFile string, customDialer hosts.Dialer) error { func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error {
logrus.Infof("Tearing down Kubernetes cluster") logrus.Infof("Tearing down Kubernetes cluster")
kubeCluster, err := cluster.ParseConfig(clusterFile, customDialer) kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory)
if err != nil { if err != nil {
return err return err
} }
@ -69,9 +70,15 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
return nil return nil
} }
} }
clusterFile, err := resolveClusterFile(ctx) clusterFile, filePath, err := resolveClusterFile(ctx)
if err != nil { if err != nil {
return fmt.Errorf("Failed to resolve cluster file: %v", err) return fmt.Errorf("Failed to resolve cluster file: %v", err)
} }
return ClusterRemove(clusterFile, nil) clusterFilePath = filePath
rkeConfig, err := cluster.ParseConfig(clusterFile)
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
}
return ClusterRemove(rkeConfig, nil)
} }

View File

@ -6,11 +6,14 @@ import (
"github.com/rancher/rke/cluster" "github.com/rancher/rke/cluster"
"github.com/rancher/rke/hosts" "github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki" "github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert"
) )
var clusterFilePath string
func UpCommand() cli.Command { func UpCommand() cli.Command {
upFlags := []cli.Flag{ upFlags := []cli.Flag{
cli.StringFlag{ cli.StringFlag{
@ -28,10 +31,10 @@ func UpCommand() cli.Command {
} }
} }
func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, string, string, error) { func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) (string, string, string, string, error) {
logrus.Infof("Building Kubernetes cluster") logrus.Infof("Building Kubernetes cluster")
var APIURL, caCrt, clientCert, clientKey string var APIURL, caCrt, clientCert, clientKey string
kubeCluster, err := cluster.ParseConfig(clusterFile, customDialer) kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory)
if err != nil { if err != nil {
return APIURL, caCrt, clientCert, clientKey, err return APIURL, caCrt, clientCert, clientKey, err
} }
@ -70,7 +73,7 @@ func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, s
return APIURL, caCrt, clientCert, clientKey, err return APIURL, caCrt, clientCert, clientKey, err
} }
err = kubeCluster.SaveClusterState(clusterFile) err = kubeCluster.SaveClusterState(rkeConfig)
if err != nil { if err != nil {
return APIURL, caCrt, clientCert, clientKey, err return APIURL, caCrt, clientCert, clientKey, err
} }
@ -100,10 +103,16 @@ func ClusterUp(clusterFile string, customDialer hosts.Dialer) (string, string, s
} }
func clusterUpFromCli(ctx *cli.Context) error { func clusterUpFromCli(ctx *cli.Context) error {
clusterFile, err := resolveClusterFile(ctx) clusterFile, filePath, err := resolveClusterFile(ctx)
if err != nil { if err != nil {
return fmt.Errorf("Failed to resolve cluster file: %v", err) return fmt.Errorf("Failed to resolve cluster file: %v", err)
} }
_, _, _, _, err = ClusterUp(clusterFile, nil) clusterFilePath = filePath
rkeConfig, err := cluster.ParseConfig(clusterFile)
if err != nil {
return fmt.Errorf("Failed to parse cluster file: %v", err)
}
_, _, _, _, err = ClusterUp(rkeConfig, nil)
return err return err
} }

View File

@ -8,29 +8,26 @@ import (
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
type Dialer interface { type DialerFactory func(h *Host) (func(network, address string) (net.Conn, error), error)
NewHTTPClient() (*http.Client, error)
}
type sshDialer struct { type dialer struct {
host *Host host *Host
signer ssh.Signer signer ssh.Signer
} }
func (d *sshDialer) NewHTTPClient() (*http.Client, error) { func SSHFactory(h *Host) (func(network, address string) (net.Conn, error), error) {
dialer := &sshDialer{ key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath)
host: d.host, if err != nil {
signer: d.signer, return nil, fmt.Errorf("Failed to parse the private key: %v", err)
} }
httpClient := &http.Client{ dialer := &dialer{
Transport: &http.Transport{ host: h,
Dial: dialer.Dial, signer: key,
},
} }
return httpClient, nil return dialer.Dial, nil
} }
func (d *sshDialer) Dial(network, addr string) (net.Conn, error) { func (d *dialer) Dial(network, addr string) (net.Conn, error) {
sshAddr := d.host.Address + ":22" sshAddr := d.host.Address + ":22"
// Build SSH client configuration // Build SSH client configuration
cfg, err := makeSSHConfig(d.host.User, d.signer) cfg, err := makeSSHConfig(d.host.User, d.signer)
@ -51,3 +48,23 @@ func (d *sshDialer) Dial(network, addr string) (net.Conn, error) {
} }
return remote, err return remote, err
} }
func (h *Host) newHTTPClient(dialerFactory DialerFactory) (*http.Client, error) {
var factory DialerFactory
if dialerFactory == nil {
factory = SSHFactory
} else {
factory = dialerFactory
}
dialer, err := factory(h)
if err != nil {
return nil, err
}
return &http.Client{
Transport: &http.Transport{
Dial: dialer,
},
}, nil
}

View File

@ -16,7 +16,6 @@ import (
type Host struct { type Host struct {
v3.RKEConfigNode v3.RKEConfigNode
DClient *client.Client DClient *client.Client
Dialer Dialer
IsControl bool IsControl bool
IsWorker bool IsWorker bool
IgnoreDockerVersion bool IgnoreDockerVersion bool
@ -91,24 +90,6 @@ func (h *Host) CleanUp(toCleanPaths []string, cleanerImage string) error {
return nil return nil
} }
func (h *Host) RegisterDialer(customDialer Dialer) error {
if customDialer == nil {
logrus.Infof("[ssh] Setup tunnel for host [%s]", h.Address)
key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath)
if err != nil {
return fmt.Errorf("Failed to parse the private key: %v", err)
}
dialer := &sshDialer{
host: h,
signer: key,
}
h.Dialer = dialer
} else {
h.Dialer = customDialer
}
return nil
}
func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error { func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset, hasAnotherRole bool) error {
if hasAnotherRole { if hasAnotherRole {
logrus.Infof("[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address) logrus.Infof("[hosts] host [%s] has another role, skipping delete from kubernetes cluster", toDeleteHost.Address)

View File

@ -21,13 +21,14 @@ const (
K8sVersion = "1.8" K8sVersion = "1.8"
) )
func (h *Host) TunnelUp() error { func (h *Host) TunnelUp(dialerFactory DialerFactory) error {
if h.DClient != nil { if h.DClient != nil {
return nil return nil
} }
httpClient, err := h.Dialer.NewHTTPClient() logrus.Infof("[dialer] Setup tunnel for host [%s]", h.Address)
httpClient, err := h.newHTTPClient(dialerFactory)
if err != nil { if err != nil {
return fmt.Errorf("Failed to initiate a new Dialer to host [%s]", h.Address) return fmt.Errorf("Can't establish dialer connection: %v", err)
} }
// set Docker client // set Docker client