diff --git a/cluster/cluster.go b/cluster/cluster.go
index 83fd595f..ffcd9051 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -34,7 +34,7 @@ type Cluster struct {
     ClusterCIDR              string
     ClusterDNSServer         string
     DockerDialerFactory      hosts.DialerFactory
-    HealthcheckDialerFactory hosts.DialerFactory
+    LocalConnDialerFactory   hosts.DialerFactory
 }
 
 const (
@@ -56,7 +56,7 @@ const (
 func (c *Cluster) DeployControlPlane(ctx context.Context) error {
     // Deploy Etcd Plane
-    if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd); err != nil {
+    if err := services.RunEtcdPlane(ctx, c.EtcdHosts, c.Services.Etcd, c.LocalConnDialerFactory); err != nil {
         return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
     }
     // Deploy Control plane
@@ -65,7 +65,7 @@ func (c *Cluster) DeployControlPlane(ctx context.Context) error {
         c.Services,
         c.SystemImages[ServiceSidekickImage],
         c.Authorization.Mode,
-        c.HealthcheckDialerFactory); err != nil {
+        c.LocalConnDialerFactory); err != nil {
         return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
     }
     // Apply Authz configuration after deploying controlplane
@@ -82,7 +82,7 @@ func (c *Cluster) DeployWorkerPlane(ctx context.Context) error {
         c.Services,
         c.SystemImages[NginxProxyImage],
         c.SystemImages[ServiceSidekickImage],
-        c.HealthcheckDialerFactory); err != nil {
+        c.LocalConnDialerFactory); err != nil {
         return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
     }
     return nil
@@ -97,13 +97,13 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
     return &rkeConfig, nil
 }
 
-func ParseCluster(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) {
+func ParseCluster(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, localConnDialerFactory hosts.DialerFactory) (*Cluster, error) {
     var err error
     c := &Cluster{
         RancherKubernetesEngineConfig: *rkeConfig,
         ConfigPath:                    clusterFilePath,
         DockerDialerFactory:           dockerDialerFactory,
-        HealthcheckDialerFactory:      healthcheckDialerFactory,
+        LocalConnDialerFactory:        localConnDialerFactory,
     }
     // Setting cluster Defaults
     c.setClusterDefaults(ctx)
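The two factory fields above are the injection points an embedding caller (for example the Rancher server) uses to supply its own tunnelling dialers; the CLI passes nil and the SSH-based defaults take over. A minimal driver sketch, assuming a cluster.yml on disk and omitting the host-tunnelling and certificate steps that ClusterUp performs between these calls:

package main

import (
    "context"
    "io/ioutil"
    "log"

    "github.com/rancher/rke/cluster"
)

func main() {
    ctx := context.Background()
    clusterFile, err := ioutil.ReadFile("cluster.yml") // assumed config path
    if err != nil {
        log.Fatal(err)
    }
    rkeConfig, err := cluster.ParseConfig(string(clusterFile))
    if err != nil {
        log.Fatal(err)
    }
    // nil factories fall back to the built-in SSH dialers (LocalConnFactory is the
    // documented fallback below; SSHFactory is assumed for the Docker dialer).
    kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, "cluster.yml", nil, nil)
    if err != nil {
        log.Fatal(err)
    }
    if err := kubeCluster.DeployControlPlane(ctx); err != nil {
        log.Fatal(err)
    }
    if err := kubeCluster.DeployWorkerPlane(ctx); err != nil {
        log.Fatal(err)
    }
}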
diff --git a/cluster/hosts.go b/cluster/hosts.go
index 8d3bed36..ba0f4d3c 100644
--- a/cluster/hosts.go
+++ b/cluster/hosts.go
@@ -47,6 +47,7 @@ func (c *Cluster) InvertIndexHosts() error {
         logrus.Debugf("Host: " + host.Address + " has role: " + role)
         switch role {
         case services.ETCDRole:
+            newHost.IsEtcd = true
             c.EtcdHosts = append(c.EtcdHosts, &newHost)
         case services.ControlRole:
             newHost.IsControl = true
diff --git a/cluster/reconcile.go b/cluster/reconcile.go
index 1b5d6d9d..b40c1e8b 100644
--- a/cluster/reconcile.go
+++ b/cluster/reconcile.go
@@ -19,6 +19,11 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster)
         return nil
     }
 
+    if err := reconcileEtcd(ctx, currentCluster, kubeCluster); err != nil {
+        return fmt.Errorf("Failed to reconcile etcd plane: %v", err)
+    }
+
     kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
     if err != nil {
         return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
@@ -45,7 +50,7 @@ func reconcileWorker(ctx context.Context, currentCluster, kubeCluster *Cluster,
             return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address)
         }
         // attempting to clean services/files on the host
-        if err := reconcileHost(ctx, toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
+        if err := reconcileHost(ctx, toDeleteHost, true, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
             log.Warnf(ctx, "[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
             continue
         }
@@ -77,7 +82,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
             return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address)
         }
         // attempting to clean services/files on the host
-        if err := reconcileHost(ctx, toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
+        if err := reconcileHost(ctx, toDeleteHost, false, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
             log.Warnf(ctx, "[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
             continue
         }
@@ -98,7 +103,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
     return nil
 }
 
-func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker bool, cleanerImage string, dialerFactory hosts.DialerFactory) error {
+func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker, etcd bool, cleanerImage string, dialerFactory hosts.DialerFactory) error {
     if err := toDeleteHost.TunnelUp(ctx, dialerFactory); err != nil {
         return fmt.Errorf("Not able to reach the host: %v", err)
     }
@@ -106,16 +111,54 @@ func reconcileHost(ctx context.Context, toDeleteHost *hosts.Host, worker bool, c
         if err := services.RemoveWorkerPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
             return fmt.Errorf("Couldn't remove worker plane: %v", err)
         }
-        if err := toDeleteHost.CleanUpWorkerHost(ctx, services.ControlRole, cleanerImage); err != nil {
+        if err := toDeleteHost.CleanUpWorkerHost(ctx, cleanerImage); err != nil {
+            return fmt.Errorf("Not able to clean the host: %v", err)
+        }
+    } else if etcd {
+        if err := services.RemoveEtcdPlane(ctx, []*hosts.Host{toDeleteHost}); err != nil {
+            return fmt.Errorf("Couldn't remove etcd plane: %v", err)
+        }
+        if err := toDeleteHost.CleanUpEtcdHost(ctx, cleanerImage); err != nil {
             return fmt.Errorf("Not able to clean the host: %v", err)
         }
     } else {
         if err := services.RemoveControlPlane(ctx, []*hosts.Host{toDeleteHost}, false); err != nil {
             return fmt.Errorf("Couldn't remove control plane: %v", err)
         }
-        if err := toDeleteHost.CleanUpControlHost(ctx, services.WorkerRole, cleanerImage); err != nil {
+        if err := toDeleteHost.CleanUpControlHost(ctx, cleanerImage); err != nil {
             return fmt.Errorf("Not able to clean the host: %v", err)
         }
     }
     return nil
 }
+
+func reconcileEtcd(ctx context.Context, currentCluster, kubeCluster *Cluster) error {
+    logrus.Infof("[reconcile] Check etcd hosts to be deleted")
+    etcdToDelete := hosts.GetToDeleteHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
+    for _, etcdHost := range etcdToDelete {
+        if err := services.RemoveEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory); err != nil {
+            log.Warnf(ctx, "[reconcile] %v", err)
+            continue
+        }
+        // attempting to clean services/files on the host
+        if err := reconcileHost(ctx, etcdHost, false, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
+            log.Warnf(ctx, "[reconcile] Couldn't clean up etcd node [%s]: %v", etcdHost.Address, err)
+            continue
+        }
+    }
+    logrus.Infof("[reconcile] Check etcd hosts to be added")
+    etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts)
+    for _, etcdHost := range etcdToAdd {
+        etcdHost.ToAddEtcdMember = true
+    }
+    for _, etcdHost := range etcdToAdd {
+        if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory); err != nil {
+            return err
+        }
+        etcdHost.ToAddEtcdMember = false
+        if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory); err != nil {
+            return err
+        }
+    }
+    return nil
+}
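reconcileEtcd drives the exported member-management helpers added in services/etcd.go further down. A standalone sketch of the same add-then-reload sequence for a single new node, with hypothetical arguments and a nil factory so the SSH-based LocalConnFactory is used:

package example // illustrative only

import (
    "context"

    "github.com/rancher/rke/hosts"
    "github.com/rancher/rke/services"
    "github.com/rancher/types/apis/management.cattle.io/v3"
)

// scaleUpEtcd registers newHost with the running cluster over the members API
// first, then restarts every member with the updated --initial-cluster and
// --initial-cluster-state=existing, mirroring what reconcileEtcd does per host.
// desired is the target member list and already includes newHost.
func scaleUpEtcd(ctx context.Context, newHost *hosts.Host, desired []*hosts.Host, etcdService v3.ETCDService) error {
    newHost.ToAddEtcdMember = true // keep it out of ReloadEtcdCluster until it is registered
    if err := services.AddEtcdMember(ctx, newHost, desired, nil); err != nil {
        return err
    }
    newHost.ToAddEtcdMember = false
    return services.ReloadEtcdCluster(ctx, desired, etcdService, nil)
}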
hosts to be added") + etcdToAdd := hosts.GetToAddHosts(currentCluster.EtcdHosts, kubeCluster.EtcdHosts) + for _, etcdHost := range etcdToAdd { + etcdHost.ToAddEtcdMember = true + } + for _, etcdHost := range etcdToAdd { + if err := services.AddEtcdMember(ctx, etcdHost, kubeCluster.EtcdHosts, currentCluster.LocalConnDialerFactory); err != nil { + return err + } + etcdHost.ToAddEtcdMember = false + if err := services.ReloadEtcdCluster(ctx, kubeCluster.EtcdHosts, kubeCluster.Services.Etcd, currentCluster.LocalConnDialerFactory); err != nil { + return err + } + } + return nil +} diff --git a/cluster/validation.go b/cluster/validation.go index f0dc4740..3663090f 100644 --- a/cluster/validation.go +++ b/cluster/validation.go @@ -12,9 +12,6 @@ func (c *Cluster) ValidateCluster() error { if len(c.ControlPlaneHosts) == 0 { return fmt.Errorf("Cluster must have at least one control plane host") } - if len(c.EtcdHosts)%2 == 0 { - return fmt.Errorf("Cluster must have odd number of etcd nodes") - } if len(c.WorkerHosts) == 0 { return fmt.Errorf("Cluster must have at least one worker plane host") } diff --git a/cmd/up.go b/cmd/up.go index dc24a00f..4ef94fc2 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -32,10 +32,10 @@ func UpCommand() cli.Command { } } -func ClusterUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) { +func ClusterUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, localConnDialerFactory hosts.DialerFactory) (string, string, string, string, error) { log.Infof(ctx, "Building Kubernetes cluster") var APIURL, caCrt, clientCert, clientKey string - kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory) + kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, dockerDialerFactory, localConnDialerFactory) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -50,10 +50,6 @@ func ClusterUp(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, return APIURL, caCrt, clientCert, clientKey, err } - if err := cluster.CheckEtcdHostsChanged(kubeCluster, currentCluster); err != nil { - return APIURL, caCrt, clientCert, clientKey, err - } - err = cluster.SetUpAuthentication(ctx, kubeCluster, currentCluster) if err != nil { return APIURL, caCrt, clientCert, clientKey, err diff --git a/hosts/dialer.go b/hosts/dialer.go index f944dd2e..9ad6aa58 100644 --- a/hosts/dialer.go +++ b/hosts/dialer.go @@ -27,7 +27,7 @@ func SSHFactory(h *Host) (func(network, address string) (net.Conn, error), error return dialer.DialDocker, nil } -func HealthcheckFactory(h *Host) (func(network, address string) (net.Conn, error), error) { +func LocalConnFactory(h *Host) (func(network, address string) (net.Conn, error), error) { key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath) if err != nil { return nil, fmt.Errorf("Failed to parse the private key: %v", err) @@ -36,7 +36,7 @@ func HealthcheckFactory(h *Host) (func(network, address string) (net.Conn, error host: h, signer: key, } - return dialer.DialHealthcheck, nil + return dialer.DialLocalConn, nil } func (d *dialer) DialDocker(network, addr string) (net.Conn, error) { @@ -61,7 +61,7 @@ func (d *dialer) DialDocker(network, addr string) (net.Conn, error) { return remote, err } -func (d *dialer) DialHealthcheck(network, addr string) (net.Conn, error) { +func (d *dialer) DialLocalConn(network, addr string) 
diff --git a/hosts/hosts.go b/hosts/hosts.go
index a906db09..0f02359f 100644
--- a/hosts/hosts.go
+++ b/hosts/hosts.go
@@ -17,10 +17,13 @@ import (
 type Host struct {
     v3.RKEConfigNode
     DClient              *client.Client
-    HealthcheckPort      int
+    LocalConnPort        int
     IsControl            bool
     IsWorker             bool
+    IsEtcd               bool
     EnforceDockerVersion bool
+    ToAddEtcdMember      bool
+    ExistingEtcdCluster  bool
 }
 
 const (
@@ -44,7 +47,7 @@ func (h *Host) CleanUpAll(ctx context.Context, cleanerImage string) error {
     return h.CleanUp(ctx, toCleanPaths, cleanerImage)
 }
 
-func (h *Host) CleanUpWorkerHost(ctx context.Context, controlRole, cleanerImage string) error {
+func (h *Host) CleanUpWorkerHost(ctx context.Context, cleanerImage string) error {
     if h.IsControl {
         log.Infof(ctx, "[hosts] Host [%s] is already a controlplane host, skipping cleanup.", h.Address)
         return nil
@@ -58,7 +61,7 @@ func (h *Host) CleanUpWorkerHost(ctx context.Context, controlRole, cleanerImage
     return h.CleanUp(ctx, toCleanPaths, cleanerImage)
 }
 
-func (h *Host) CleanUpControlHost(ctx context.Context, workerRole, cleanerImage string) error {
+func (h *Host) CleanUpControlHost(ctx context.Context, cleanerImage string) error {
     if h.IsWorker {
         log.Infof(ctx, "[hosts] Host [%s] is already a worker host, skipping cleanup.", h.Address)
         return nil
@@ -72,6 +75,20 @@ func (h *Host) CleanUpControlHost(ctx context.Context, workerRole, cleanerImage
     return h.CleanUp(ctx, toCleanPaths, cleanerImage)
 }
 
+func (h *Host) CleanUpEtcdHost(ctx context.Context, cleanerImage string) error {
+    toCleanPaths := []string{
+        ToCleanEtcdDir,
+        ToCleanSSLDir,
+    }
+    if h.IsWorker || h.IsControl {
+        log.Infof(ctx, "[hosts] Host [%s] is already a worker or control host, skipping cleanup certs.", h.Address)
+        toCleanPaths = []string{
+            ToCleanEtcdDir,
+        }
+    }
+    return h.CleanUp(ctx, toCleanPaths, cleanerImage)
+}
+
 func (h *Host) CleanUp(ctx context.Context, toCleanPaths []string, cleanerImage string) error {
     log.Infof(ctx, "[hosts] Cleaning up host [%s]", h.Address)
     imageCfg, hostCfg := buildCleanerConfig(h, toCleanPaths, cleanerImage)
@@ -133,6 +150,23 @@ func GetToDeleteHosts(currentHosts, configHosts []*Host) []*Host {
     return toDeleteHosts
 }
 
+func GetToAddHosts(currentHosts, configHosts []*Host) []*Host {
+    toAddHosts := []*Host{}
+    for _, configHost := range configHosts {
+        found := false
+        for _, currentHost := range currentHosts {
+            if currentHost.Address == configHost.Address {
+                found = true
+                break
+            }
+        }
+        if !found {
+            toAddHosts = append(toAddHosts, configHost)
+        }
+    }
+    return toAddHosts
+}
+
 func IsHostListChanged(currentHosts, configHosts []*Host) bool {
     changed := false
     for _, host := range currentHosts {
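GetToAddHosts is the mirror image of the existing GetToDeleteHosts: both diff the two lists by Address only, which is what reconcileEtcd relies on above. A quick standalone illustration with hypothetical addresses:

package main // illustrative only

import (
    "fmt"

    "github.com/rancher/rke/hosts"
)

func hostWithAddress(address string) *hosts.Host {
    h := &hosts.Host{}
    h.Address = address // promoted field from the embedded v3.RKEConfigNode
    return h
}

func main() {
    current := []*hosts.Host{hostWithAddress("10.0.0.1"), hostWithAddress("10.0.0.2")}
    config := []*hosts.Host{hostWithAddress("10.0.0.2"), hostWithAddress("10.0.0.3")}

    for _, h := range hosts.GetToDeleteHosts(current, config) {
        fmt.Println("delete:", h.Address) // 10.0.0.1
    }
    for _, h := range hosts.GetToAddHosts(current, config) {
        fmt.Println("add:", h.Address) // 10.0.0.3
    }
}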
a/services/controlplane.go
+++ b/services/controlplane.go
@@ -8,7 +8,7 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error {
+func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, localConnDialerFactory hosts.DialerFactory) error {
     log.Infof(ctx, "[%s] Building up Controller Plane..", ControlRole)
 
     for _, host := range controlHosts {
@@ -22,17 +22,17 @@ func RunControlPlane(ctx context.Context, controlHosts, etcdHosts []*hosts.Host,
             return err
         }
         // run kubeapi
-        err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory)
+        err := runKubeAPI(ctx, host, etcdHosts, controlServices.KubeAPI, authorizationMode, localConnDialerFactory)
         if err != nil {
             return err
         }
         // run kubecontroller
-        err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory)
+        err = runKubeController(ctx, host, controlServices.KubeController, authorizationMode, localConnDialerFactory)
         if err != nil {
             return err
         }
         // run scheduler
-        err = runScheduler(ctx, host, controlServices.Scheduler, healthcheckDialerFactory)
+        err = runScheduler(ctx, host, controlServices.Scheduler, localConnDialerFactory)
         if err != nil {
             return err
         }
diff --git a/services/etcd.go b/services/etcd.go
index 270a4419..21af5ed1 100644
--- a/services/etcd.go
+++ b/services/etcd.go
@@ -2,18 +2,21 @@ package services
 
 import (
     "fmt"
+    "time"
 
     "context"
 
+    etcdclient "github.com/coreos/etcd/client"
     "github.com/docker/docker/api/types/container"
     "github.com/docker/go-connections/nat"
     "github.com/rancher/rke/docker"
     "github.com/rancher/rke/hosts"
     "github.com/rancher/rke/log"
     "github.com/rancher/types/apis/management.cattle.io/v3"
+    "github.com/sirupsen/logrus"
 )
 
-func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService) error {
+func RunEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory) error {
     log.Infof(ctx, "[%s] Building up Etcd Plane..", ETCDRole)
     initCluster := getEtcdInitialCluster(etcdHosts)
     for _, host := range etcdHosts {
@@ -40,6 +43,10 @@ func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error {
 }
 
 func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster string) (*container.Config, *container.HostConfig) {
+    clusterState := "new"
+    if host.ExistingEtcdCluster {
+        clusterState = "existing"
+    }
     imageCfg := &container.Config{
         Image: etcdService.Image,
         Cmd: []string{"/usr/local/bin/etcd",
@@ -51,7 +58,7 @@ func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster s
             "--listen-peer-urls=http://0.0.0.0:2380",
             "--initial-cluster-token=etcd-cluster-1",
             "--initial-cluster=" + initCluster,
-            "--initial-cluster-state=new"},
+            "--initial-cluster-state=" + clusterState},
     }
     hostCfg := &container.HostConfig{
         RestartPolicy: container.RestartPolicy{Name: "always"},
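With clusterState wired in above, the same buildEtcdConfig call produces either a bootstrap or a join configuration depending on ExistingEtcdCluster. For a three-node cluster the relevant flags come out roughly as below; a standalone illustration with hypothetical node names and IPs, mirroring the member format built by getEtcdInitialCluster further down:

package main // illustrative only

import (
    "fmt"
    "strings"
)

func main() {
    // etcd-<HostnameOverride>=http://<InternalAddress>:2380, comma separated.
    members := []string{
        "etcd-node1=http://172.16.0.1:2380",
        "etcd-node2=http://172.16.0.2:2380",
        "etcd-node3=http://172.16.0.3:2380",
    }
    initialCluster := strings.Join(members, ",")

    // First deployment (ExistingEtcdCluster == false): bootstrap a new cluster.
    fmt.Println("--initial-cluster=" + initialCluster)
    fmt.Println("--initial-cluster-state=new")

    // Reload after a membership change (ExistingEtcdCluster == true): the
    // containers are recreated against the already-running cluster.
    fmt.Println("--initial-cluster-state=existing")
}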
@@ -80,24 +87,87 @@ func buildEtcdConfig(host *hosts.Host, etcdService v3.ETCDService, initCluster s
     return imageCfg, hostCfg
 }
 
-func GetEtcdConnString(hosts []*hosts.Host) string {
-    connString := ""
-    for i, host := range hosts {
-        connString += "http://" + host.InternalAddress + ":2379"
-        if i < (len(hosts) - 1) {
-            connString += ","
+func AddEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory) error {
+    log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, etcdHost.HostnameOverride)
+    peerURL := fmt.Sprintf("http://%s:2380", etcdHost.InternalAddress)
+    added := false
+    for _, host := range etcdHosts {
+        etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory)
+        if err != nil {
+            logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
+            continue
         }
+        memAPI := etcdclient.NewMembersAPI(etcdClient)
+        if _, err := memAPI.Add(ctx, peerURL); err != nil {
+            logrus.Debugf("Failed to add etcd member through host [%s]: %v", host.Address, err)
+            continue
+        }
+        added = true
+        break
     }
-    return connString
+    if !added {
+        return fmt.Errorf("Failed to add etcd member [etcd-%s] to etcd cluster", etcdHost.HostnameOverride)
+    }
+    log.Infof(ctx, "[add/%s] Successfully added member [etcd-%s] to etcd cluster", ETCDRole, etcdHost.HostnameOverride)
+    return nil
 }
 
-func getEtcdInitialCluster(hosts []*hosts.Host) string {
-    initialCluster := ""
-    for i, host := range hosts {
-        initialCluster += fmt.Sprintf("etcd-%s=http://%s:2380", host.HostnameOverride, host.InternalAddress)
-        if i < (len(hosts) - 1) {
-            initialCluster += ","
+func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory) error {
+    log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
+    var mID string
+    removed := false
+    for _, host := range etcdHosts {
+        etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory)
+        if err != nil {
+            logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
+            continue
+        }
+        memAPI := etcdclient.NewMembersAPI(etcdClient)
+        members, err := memAPI.List(ctx)
+        if err != nil {
+            logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
+            continue
+        }
+        for _, member := range members {
+            if member.Name == fmt.Sprintf("etcd-%s", etcdHost.HostnameOverride) {
+                mID = member.ID
+                break
+            }
+        }
+        if err := memAPI.Remove(ctx, mID); err != nil {
+            logrus.Debugf("Failed to remove etcd member through host [%s]: %v", host.Address, err)
+            continue
+        }
+        removed = true
+        break
+    }
+    if !removed {
+        return fmt.Errorf("Failed to delete etcd member [etcd-%s] from etcd cluster", etcdHost.HostnameOverride)
+    }
+    log.Infof(ctx, "[remove/%s] Successfully removed member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
+    return nil
+}
+
+func ReloadEtcdCluster(ctx context.Context, etcdHosts []*hosts.Host, etcdService v3.ETCDService, localConnDialerFactory hosts.DialerFactory) error {
+    readyEtcdHosts := []*hosts.Host{}
+    for _, host := range etcdHosts {
+        if !host.ToAddEtcdMember {
+            readyEtcdHosts = append(readyEtcdHosts, host)
+            host.ExistingEtcdCluster = true
         }
     }
-    return initialCluster
+    initCluster := getEtcdInitialCluster(readyEtcdHosts)
+    for _, host := range readyEtcdHosts {
+        imageCfg, hostCfg := buildEtcdConfig(host, etcdService, initCluster)
+        if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole); err != nil {
+            return err
+        }
+    }
+    time.Sleep(10 * time.Second)
+    for _, host := range readyEtcdHosts {
+        if healthy := isEtcdHealthy(ctx, localConnDialerFactory, host); healthy {
+            break
+        }
+    }
+    return nil
 }
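AddEtcdMember and RemoveEtcdMember above are thin wrappers around the etcd v2 members API. The same calls against a directly reachable endpoint look like this standalone sketch (hypothetical endpoint and peer URL, no SSH tunnel involved):

package main // illustrative only

import (
    "context"
    "fmt"
    "log"

    etcdclient "github.com/coreos/etcd/client"
)

func main() {
    ctx := context.Background()
    cli, err := etcdclient.New(etcdclient.Config{
        Endpoints: []string{"http://127.0.0.1:2379"}, // assumed reachable endpoint
        Transport: etcdclient.DefaultTransport,
    })
    if err != nil {
        log.Fatal(err)
    }
    memAPI := etcdclient.NewMembersAPI(cli)

    members, err := memAPI.List(ctx)
    if err != nil {
        log.Fatal(err)
    }
    for _, m := range members {
        fmt.Printf("%s %s %v\n", m.ID, m.Name, m.PeerURLs)
    }

    // Register a peer before its etcd container is started, as AddEtcdMember does.
    if _, err := memAPI.Add(ctx, "http://172.16.0.4:2380"); err != nil {
        log.Fatal(err)
    }
}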
b/services/etcd_util.go
new file mode 100644
index 00000000..c6c1b83a
--- /dev/null
+++ b/services/etcd_util.go
@@ -0,0 +1,111 @@
+package services
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "net"
+    "net/http"
+    "time"
+
+    etcdclient "github.com/coreos/etcd/client"
+    "github.com/rancher/rke/hosts"
+    "github.com/sirupsen/logrus"
+)
+
+func getEtcdClient(ctx context.Context, etcdHost *hosts.Host, localConnDialerFactory hosts.DialerFactory) (etcdclient.Client, error) {
+    dialer, err := getEtcdDialer(localConnDialerFactory, etcdHost)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to create a dialer for host [%s]: %v", etcdHost.Address, err)
+    }
+
+    var DefaultEtcdTransport etcdclient.CancelableTransport = &http.Transport{
+        Dial: dialer,
+    }
+
+    cfg := etcdclient.Config{
+        Endpoints: []string{"http://127.0.0.1:2379"},
+        Transport: DefaultEtcdTransport,
+    }
+
+    return etcdclient.New(cfg)
+}
+
+func isEtcdHealthy(ctx context.Context, localConnDialerFactory hosts.DialerFactory, host *hosts.Host) bool {
+    logrus.Debugf("[etcd] Check etcd cluster health")
+    for i := 0; i < 3; i++ {
+        dialer, err := getEtcdDialer(localConnDialerFactory, host)
+        if err != nil {
+            logrus.Debugf("Failed to create a dialer for host [%s]: %v", host.Address, err)
+            time.Sleep(5 * time.Second)
+            continue
+        }
+        hc := http.Client{
+            Transport: &http.Transport{
+                Dial: dialer,
+            },
+        }
+        healthy, err := getHealthEtcd(hc, host)
+        if err != nil {
+            logrus.Debug(err)
+            time.Sleep(5 * time.Second)
+            continue
+        }
+        if healthy == "true" {
+            logrus.Debugf("[etcd] etcd cluster is healthy")
+            return true
+        }
+    }
+    return false
+}
+
+func getHealthEtcd(hc http.Client, host *hosts.Host) (string, error) {
+    healthy := struct{ Health string }{}
+    resp, err := hc.Get("http://127.0.0.1:2379/health")
+    if err != nil {
+        return healthy.Health, fmt.Errorf("Failed to get /health for host [%s]: %v", host.Address, err)
+    }
+    defer resp.Body.Close()
+    bytes, err := ioutil.ReadAll(resp.Body)
+    if err != nil {
+        return healthy.Health, fmt.Errorf("Failed to read response of /health for host [%s]: %v", host.Address, err)
+    }
+    if err := json.Unmarshal(bytes, &healthy); err != nil {
+        return healthy.Health, fmt.Errorf("Failed to unmarshal response of /health for host [%s]: %v", host.Address, err)
+    }
+    return healthy.Health, nil
+}
+
+func getEtcdInitialCluster(hosts []*hosts.Host) string {
+    initialCluster := ""
+    for i, host := range hosts {
+        initialCluster += fmt.Sprintf("etcd-%s=http://%s:2380", host.HostnameOverride, host.InternalAddress)
+        if i < (len(hosts) - 1) {
+            initialCluster += ","
+        }
+    }
+    return initialCluster
+}
+
+func getEtcdDialer(localConnDialerFactory hosts.DialerFactory, etcdHost *hosts.Host) (func(network, address string) (net.Conn, error), error) {
+    etcdHost.LocalConnPort = 2379
+    var etcdFactory hosts.DialerFactory
+    if localConnDialerFactory == nil {
+        etcdFactory = hosts.LocalConnFactory
+    } else {
+        etcdFactory = localConnDialerFactory
+    }
+    return etcdFactory(etcdHost)
+}
+
+func GetEtcdConnString(hosts []*hosts.Host) string {
+    connString := ""
+    for i, host := range hosts {
+        connString += "http://" + host.InternalAddress + ":2379"
+        if i < (len(hosts) - 1) {
+            connString += ","
+        }
+    }
+    return connString
+}
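getEtcdDialer treats its factory argument as an override: nil means tunnel over SSH via hosts.LocalConnFactory, anything else is used as-is (getHealthCheckHTTPClient in the next file does the same). A caller or test can therefore inject its own factory; a sketch of one that dials the node directly, assuming the port is reachable without a tunnel and that hosts.DialerFactory shares the signature of LocalConnFactory shown earlier:

package example // illustrative only

import (
    "fmt"
    "net"

    "github.com/rancher/rke/hosts"
)

// directDialerFactory skips the SSH tunnel and connects straight to
// <Address>:<LocalConnPort>. Useful only when the service port is reachable
// from where RKE runs, for example in tests.
func directDialerFactory(h *hosts.Host) (func(network, address string) (net.Conn, error), error) {
    return func(network, address string) (net.Conn, error) {
        return net.Dial("tcp", fmt.Sprintf("%s:%d", h.Address, h.LocalConnPort))
    }, nil
}

Passing directDialerFactory where the RKE code passes nil (for example to AddEtcdMember or isEtcdHealthy) swaps the transport without touching any caller.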
diff --git a/services/healthcheck.go b/services/healthcheck.go
index 6ea65636..07960e9b 100644
--- a/services/healthcheck.go
+++ b/services/healthcheck.go
@@ -20,9 +20,9 @@ const (
     HTTPSProtoPrefix = "https://"
 )
 
-func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error {
+func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool, serviceName string, localConnDialerFactory hosts.DialerFactory) error {
     log.Infof(ctx, "[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
-    client, err := getHealthCheckHTTPClient(host, port, healthcheckDialerFactory)
+    client, err := getHealthCheckHTTPClient(host, port, localConnDialerFactory)
     if err != nil {
         return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
     }
@@ -38,13 +38,13 @@ func runHealthcheck(ctx context.Context, host *hosts.Host, port int, useTLS bool
     return fmt.Errorf("Failed to verify healthcheck: %v", err)
 }
 
-func getHealthCheckHTTPClient(host *hosts.Host, port int, healthcheckDialerFactory hosts.DialerFactory) (*http.Client, error) {
-    host.HealthcheckPort = port
+func getHealthCheckHTTPClient(host *hosts.Host, port int, localConnDialerFactory hosts.DialerFactory) (*http.Client, error) {
+    host.LocalConnPort = port
     var factory hosts.DialerFactory
-    if healthcheckDialerFactory == nil {
-        factory = hosts.HealthcheckFactory
+    if localConnDialerFactory == nil {
+        factory = hosts.LocalConnFactory
     } else {
-        factory = healthcheckDialerFactory
+        factory = localConnDialerFactory
     }
     dialer, err := factory(host)
     if err != nil {
diff --git a/services/workerplane.go b/services/workerplane.go
index 7f380819..0835aa8a 100644
--- a/services/workerplane.go
+++ b/services/workerplane.go
@@ -9,7 +9,7 @@ import (
     "golang.org/x/sync/errgroup"
 )
 
-func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error {
+func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, localConnDialerFactory hosts.DialerFactory) error {
     log.Infof(ctx, "[%s] Building up Worker Plane..", WorkerRole)
 
     var errgrp errgroup.Group
@@ -17,7 +17,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host
     for _, host := range controlHosts {
         controlHost := host
         errgrp.Go(func() error {
-            return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
+            return doDeployWorkerPlane(ctx, controlHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts)
         })
     }
     if err := errgrp.Wait(); err != nil {
@@ -27,7 +27,7 @@ func RunWorkerPlane(ctx context.Context, controlHosts, workerHosts []*hosts.Host
     for _, host := range workerHosts {
         workerHost := host
         errgrp.Go(func() error {
-            return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
+            return doDeployWorkerPlane(ctx, workerHost, workerServices, nginxProxyImage, sidekickImage, localConnDialerFactory, controlHosts)
         })
     }
     if err := errgrp.Wait(); err != nil {
@@ -67,7 +67,7 @@ func RemoveWorkerPlane(ctx context.Context, workerHosts []*hosts.Host, force boo
 func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
     workerServices v3.RKEConfigServices,
     nginxProxyImage, sidekickImage string,
-    healthcheckDialerFactory hosts.DialerFactory,
+    localConnDialerFactory hosts.DialerFactory,
     controlHosts []*hosts.Host) error {
     // run nginx proxy
     if !host.IsControl {
@@ -80,8 +80,8 @@ func doDeployWorkerPlane(ctx context.Context, host *hosts.Host,
         return err
     }
     // run kubelet
-    if err := runKubelet(ctx, host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
+    if err := runKubelet(ctx, host, workerServices.Kubelet, localConnDialerFactory); err != nil {
         return err
     }
-    return runKubeproxy(ctx, host, workerServices.Kubeproxy, healthcheckDialerFactory)
+    return runKubeproxy(ctx, host, workerServices.Kubeproxy, localConnDialerFactory)
 }
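RunWorkerPlane above fans the per-host deployments out through an errgroup, and the controlHost := host / workerHost := host copies matter because, with Go toolchains before 1.22, each closure would otherwise capture the shared loop variable. The same pattern in isolation (hypothetical host list and work):

package main // illustrative only

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    addresses := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
    var errgrp errgroup.Group
    for _, address := range addresses {
        address := address // copy: the goroutine must not share the loop variable
        errgrp.Go(func() error {
            fmt.Println("deploying to", address)
            return nil // a real worker would return the deployment error
        })
    }
    // Wait returns the first non-nil error from the group, as in RunWorkerPlane.
    if err := errgrp.Wait(); err != nil {
        fmt.Println("deploy failed:", err)
    }
}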
diff --git a/vendor.conf b/vendor.conf
index 56754387..8cf2e489 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -23,8 +23,5 @@ github.com/coreos/etcd 52f73c5a6cb0d1d196ffd6eced406c9d8502078
 github.com/coreos/go-semver e214231b295a8ea9479f11b70b35d5acf3556d9b
 github.com/ugorji/go/codec ccfe18359b55b97855cee1d3f74e5efbda4869dc
 
-#google.golang.org/grpc 1cd234627e6f392ade0527d593eb3fe53e832d4a
-#google.golang.org/genproto a8101f21cf983e773d0c1133ebc5424792003214
-
 github.com/rancher/norman 82272b3beedaca1cf35dd4b2cdb1a4aba9198bb7
 github.com/rancher/types a9fe51806cc4656e2e8c2e9b998abfc2b537c36a