From ed7ca110b065b6cc69f19af8e23b9e45a9b9f88d Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Wed, 20 Dec 2017 00:18:27 +0200 Subject: [PATCH] Add healthcheck for services components Integrate healthcheck with each service --- cluster/cluster.go | 14 +++++--- cluster/hosts.go | 6 ++-- cluster/reconcile.go | 4 +-- cluster/state.go | 2 +- cmd/remove.go | 2 +- cmd/up.go | 6 ++-- hosts/dialer.go | 35 ++++++++++++++++-- hosts/hosts.go | 1 + services/controlplane.go | 8 ++--- services/healthcheck.go | 73 ++++++++++++++++++++++++++++++++++++++ services/kubeapi.go | 7 ++-- services/kubecontroller.go | 7 ++-- services/kubelet.go | 7 ++-- services/kubeproxy.go | 7 ++-- services/scheduler.go | 7 ++-- services/services.go | 6 ++++ services/workerplane.go | 10 +++--- 17 files changed, 166 insertions(+), 36 deletions(-) create mode 100644 services/healthcheck.go diff --git a/cluster/cluster.go b/cluster/cluster.go index 2a7382b2..18b5dda3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -31,7 +31,8 @@ type Cluster struct { ClusterDomain string ClusterCIDR string ClusterDNSServer string - DialerFactory hosts.DialerFactory + DockerDialerFactory hosts.DialerFactory + HealthcheckDialerFactory hosts.DialerFactory } const ( @@ -61,7 +62,8 @@ func (c *Cluster) DeployClusterPlanes() error { c.EtcdHosts, c.Services, c.SystemImages[ServiceSidekickImage], - c.Authorization.Mode) + c.Authorization.Mode, + c.HealthcheckDialerFactory) if err != nil { return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err) } @@ -73,7 +75,8 @@ func (c *Cluster) DeployClusterPlanes() error { c.WorkerHosts, c.Services, c.SystemImages[NginxProxyImage], - c.SystemImages[ServiceSidekickImage]) + c.SystemImages[ServiceSidekickImage], + c.HealthcheckDialerFactory) if err != nil { return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err) } @@ -89,12 +92,13 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) return &rkeConfig, nil } 
-func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dialerFactory hosts.DialerFactory) (*Cluster, error) { +func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) { var err error c := &Cluster{ RancherKubernetesEngineConfig: *rkeConfig, ConfigPath: clusterFilePath, - DialerFactory: dialerFactory, + DockerDialerFactory: dockerDialerFactory, + HealthcheckDialerFactory: healthcheckDialerFactory, } // Setting cluster Defaults c.setClusterDefaults() diff --git a/cluster/hosts.go b/cluster/hosts.go index 86b82aed..ec3c6aa5 100644 --- a/cluster/hosts.go +++ b/cluster/hosts.go @@ -11,18 +11,18 @@ import ( func (c *Cluster) TunnelHosts() error { for i := range c.EtcdHosts { - if err := c.EtcdHosts[i].TunnelUp(c.DialerFactory); err != nil { + if err := c.EtcdHosts[i].TunnelUp(c.DockerDialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err) } } for i := range c.ControlPlaneHosts { - err := c.ControlPlaneHosts[i].TunnelUp(c.DialerFactory) + err := c.ControlPlaneHosts[i].TunnelUp(c.DockerDialerFactory) if err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err) } } for i := range c.WorkerHosts { - if err := c.WorkerHosts[i].TunnelUp(c.DialerFactory); err != nil { + if err := c.WorkerHosts[i].TunnelUp(c.DockerDialerFactory); err != nil { return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err) } } diff --git a/cluster/reconcile.go b/cluster/reconcile.go index 4b106d44..19cff21e 100644 --- a/cluster/reconcile.go +++ b/cluster/reconcile.go @@ -43,7 +43,7 @@ func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernete return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address) } // 
attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil { + if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err) continue } @@ -75,7 +75,7 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address) } // attempting to clean services/files on the host - if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil { + if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil { logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err) continue } diff --git a/cluster/state.go b/cluster/state.go index 1aadab47..2dfe8118 100644 --- a/cluster/state.go +++ b/cluster/state.go @@ -57,7 +57,7 @@ func (c *Cluster) GetClusterState() (*Cluster, error) { // Get previous kubernetes certificates if currentCluster != nil { currentCluster.Certificates, err = getClusterCerts(c.KubeClient) - currentCluster.DialerFactory = c.DialerFactory + currentCluster.DockerDialerFactory = c.DockerDialerFactory if err != nil { return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err) } diff --git a/cmd/remove.go b/cmd/remove.go index d0c7e334..dc649b21 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -36,7 +36,7 @@ func RemoveCommand() cli.Command { func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error { logrus.Infof("Tearing down Kubernetes cluster") - kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory) + 
kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory, nil) if err != nil { return err } diff --git a/cmd/up.go b/cmd/up.go index 5d743569..956ac459 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -31,10 +31,10 @@ func UpCommand() cli.Command { } } -func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) (string, string, string, string, error) { +func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) { logrus.Infof("Building Kubernetes cluster") var APIURL, caCrt, clientCert, clientKey string - kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory) + kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory) if err != nil { return APIURL, caCrt, clientCert, clientKey, err } @@ -113,6 +113,6 @@ func clusterUpFromCli(ctx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to parse cluster file: %v", err) } - _, _, _, _, err = ClusterUp(rkeConfig, nil) + _, _, _, _, err = ClusterUp(rkeConfig, nil, nil) return err } diff --git a/hosts/dialer.go b/hosts/dialer.go index d006df56..f944dd2e 100644 --- a/hosts/dialer.go +++ b/hosts/dialer.go @@ -24,10 +24,22 @@ func SSHFactory(h *Host) (func(network, address string) (net.Conn, error), error host: h, signer: key, } - return dialer.Dial, nil + return dialer.DialDocker, nil } -func (d *dialer) Dial(network, addr string) (net.Conn, error) { +func HealthcheckFactory(h *Host) (func(network, address string) (net.Conn, error), error) { + key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath) + if err != nil { + return nil, fmt.Errorf("Failed to parse the private key: %v", err) + } + dialer := &dialer{ + host: h, + signer: key, + } + return dialer.DialHealthcheck, nil +} + +func (d *dialer) DialDocker(network, addr string) (net.Conn, error) { sshAddr := 
d.host.Address + ":22" // Build SSH client configuration cfg, err := makeSSHConfig(d.host.User, d.signer) @@ -49,6 +61,25 @@ func (d *dialer) Dial(network, addr string) (net.Conn, error) { return remote, err } +func (d *dialer) DialHealthcheck(network, addr string) (net.Conn, error) { + sshAddr := d.host.Address + ":22" + // Build SSH client configuration + cfg, err := makeSSHConfig(d.host.User, d.signer) + if err != nil { + return nil, fmt.Errorf("Error configuring SSH: %v", err) + } + // Establish connection with SSH server + conn, err := ssh.Dial("tcp", sshAddr, cfg) + if err != nil { + return nil, fmt.Errorf("Failed to dial ssh using address [%s]: %v", sshAddr, err) + } + remote, err := conn.Dial("tcp", fmt.Sprintf("localhost:%d", d.host.HealthcheckPort)) + if err != nil { + return nil, fmt.Errorf("Failed to dial to Healthcheck Port [%d] on host [%s]: %v", d.host.HealthcheckPort, d.host.Address, err) + } + return remote, err +} + func (h *Host) newHTTPClient(dialerFactory DialerFactory) (*http.Client, error) { var factory DialerFactory diff --git a/hosts/hosts.go b/hosts/hosts.go index 2df9ee70..1790d0e8 100644 --- a/hosts/hosts.go +++ b/hosts/hosts.go @@ -16,6 +16,7 @@ import ( type Host struct { v3.RKEConfigNode DClient *client.Client + HealthcheckPort int IsControl bool IsWorker bool IgnoreDockerVersion bool diff --git a/services/controlplane.go b/services/controlplane.go index 080b99a1..8df47845 100644 --- a/services/controlplane.go +++ b/services/controlplane.go @@ -6,7 +6,7 @@ import ( "github.com/sirupsen/logrus" ) -func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string) error { +func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error { logrus.Infof("[%s] Building up Controller Plane..", ControlRole) for _, host := range controlHosts { @@ 
-20,17 +20,17 @@ func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.R return err } // run kubeapi - err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode) + err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory) if err != nil { return err } // run kubecontroller - err = runKubeController(host, controlServices.KubeController, authorizationMode) + err = runKubeController(host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory) if err != nil { return err } // run scheduler - err = runScheduler(host, controlServices.Scheduler) + err = runScheduler(host, controlServices.Scheduler, healthcheckDialerFactory) if err != nil { return err } diff --git a/services/healthcheck.go b/services/healthcheck.go new file mode 100644 index 00000000..0311830b --- /dev/null +++ b/services/healthcheck.go @@ -0,0 +1,73 @@ +package services + +import ( + "crypto/tls" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/rancher/rke/hosts" + "github.com/sirupsen/logrus" +) + +const ( + HealthzAddress = "localhost" + HealthzEndpoint = "/healthz" + HTTPProtoPrefix = "http://" + HTTPSProtoPrefix = "https://" +) + +func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error { + logrus.Infof("[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address) + client, err := getHealthCheckHTTPClient(host, port, healthcheckDialerFactory) + if err != nil { + return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address) + } + for retries := 0; retries < 3; retries++ { + if err = getHealthz(client, useTLS, serviceName, host.Address); err != nil { + logrus.Debugf("[healthcheck] %v", err) + time.Sleep(5 * time.Second) + continue + } + logrus.Infof("[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address) 
+ return nil + } + return fmt.Errorf("Failed to verify healthcheck: %v", err) +} + +func getHealthCheckHTTPClient(host *hosts.Host, port int, healthcheckDialerFactory hosts.DialerFactory) (*http.Client, error) { + host.HealthcheckPort = port + var factory hosts.DialerFactory + if healthcheckDialerFactory == nil { + factory = hosts.HealthcheckFactory + } else { + factory = healthcheckDialerFactory + } + dialer, err := factory(host) + if err != nil { + return nil, fmt.Errorf("Failed to create a dialer for host [%s]: %v", host.Address, err) + } + return &http.Client{ + Transport: &http.Transport{ + Dial: dialer, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, nil +} + +func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string) error { + proto := HTTPProtoPrefix + if useTLS { + proto = HTTPSProtoPrefix + } + resp, err := client.Get(fmt.Sprintf("%s%s%s", proto, HealthzAddress, HealthzEndpoint)) + if err != nil { + return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", HealthzEndpoint, serviceName, hostAddress, err) + } + if resp.StatusCode != http.StatusOK { + statusBody, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("service [%s] is not healthy response code: [%d], response body: %s", serviceName, resp.StatusCode, statusBody) + } + return nil +} diff --git a/services/kubeapi.go b/services/kubeapi.go index 5a99949d..f40d17b0 100644 --- a/services/kubeapi.go +++ b/services/kubeapi.go @@ -11,10 +11,13 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string) error { +func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error { etcdConnString := GetEtcdConnString(etcdHosts) imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode) - return 
docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole) + if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil { + return err + } + return runHealthcheck(host, KubeAPIPort, false, KubeAPIContainerName, df) } func removeKubeAPI(host *hosts.Host) error { diff --git a/services/kubecontroller.go b/services/kubecontroller.go index d55bc12e..5011a16c 100644 --- a/services/kubecontroller.go +++ b/services/kubecontroller.go @@ -10,9 +10,12 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string) error { +func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode) - return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole) + if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil { + return err + } + return runHealthcheck(host, KubeControllerPort, false, KubeControllerContainerName, df) } func removeKubeController(host *hosts.Host) error { diff --git a/services/kubelet.go b/services/kubelet.go index 1291134a..620a8736 100644 --- a/services/kubelet.go +++ b/services/kubelet.go @@ -10,9 +10,12 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubelet(host *hosts.Host, kubeletService v3.KubeletService) error { +func runKubelet(host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeletConfig(host, kubeletService) - return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole) + if err := 
docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil { + return err + } + return runHealthcheck(host, KubeletPort, true, KubeletContainerName, df) } func removeKubelet(host *hosts.Host) error { diff --git a/services/kubeproxy.go b/services/kubeproxy.go index db0a716f..9f04d124 100644 --- a/services/kubeproxy.go +++ b/services/kubeproxy.go @@ -10,9 +10,12 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService) error { +func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService) - return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole) + if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil { + return err + } + return runHealthcheck(host, KubeproxyPort, false, KubeproxyContainerName, df) } func removeKubeproxy(host *hosts.Host) error { diff --git a/services/scheduler.go b/services/scheduler.go index aab14d74..94bf9bf4 100644 --- a/services/scheduler.go +++ b/services/scheduler.go @@ -10,9 +10,12 @@ import ( "github.com/rancher/types/apis/management.cattle.io/v3" ) -func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService) error { +func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error { imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService) - return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole) + if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil { + return err + } + return runHealthcheck(host, SchedulerPort, false, SchedulerContainerName, df) } func removeScheduler(host 
*hosts.Host) error { diff --git a/services/services.go b/services/services.go index afb87b57..d93bea36 100644 --- a/services/services.go +++ b/services/services.go @@ -26,6 +26,12 @@ const ( EtcdContainerName = "etcd" NginxProxyContainerName = "nginx-proxy" SidekickContainerName = "service-sidekick" + + KubeAPIPort = 8080 + SchedulerPort = 10251 + KubeControllerPort = 10252 + KubeletPort = 10250 + KubeproxyPort = 10256 ) func GetKubernetesServiceIP(serviceClusterRange string) (net.IP, error) { diff --git a/services/workerplane.go b/services/workerplane.go index 39f1f43c..dd7d2d70 100644 --- a/services/workerplane.go +++ b/services/workerplane.go @@ -6,7 +6,7 @@ import ( "github.com/sirupsen/logrus" ) -func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string) error { +func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error { logrus.Infof("[%s] Building up Worker Plane..", WorkerRole) for _, host := range controlHosts { // run sidekick @@ -15,10 +15,10 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R } // run kubelet // only one master for now - if err := runKubelet(host, workerServices.Kubelet); err != nil { + if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { return err } - if err := runKubeproxy(host, workerServices.Kubeproxy); err != nil { + if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil { return err } } @@ -34,11 +34,11 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R return err } // run kubelet - if err := runKubelet(host, workerServices.Kubelet); err != nil { + if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil { return err } // run kubeproxy - if err := 
runKubeproxy(host, workerServices.Kubeproxy); err != nil { + if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil { return err } }