Mirror of https://github.com/rancher/rke.git
Merge pull request #170 from galal-hussein/healthcheck_validation
Add healthcheck for services components
@@ -31,7 +31,8 @@ type Cluster struct {
     ClusterDomain            string
     ClusterCIDR              string
     ClusterDNSServer         string
-    DialerFactory            hosts.DialerFactory
+    DockerDialerFactory      hosts.DialerFactory
+    HealthcheckDialerFactory hosts.DialerFactory
 }
 
 const (
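For illustration (not part of this diff): both new fields share the hosts.DialerFactory type that SSHFactory and HealthcheckFactory satisfy later in this change. A minimal sketch of the shape those signatures imply:

// Sketch only, inferred from the SSHFactory/HealthcheckFactory signatures below;
// a DialerFactory turns a Host into a net.Conn dial function. DockerDialerFactory
// is used for Docker API traffic, HealthcheckDialerFactory for healthz probes.
type DialerFactory func(h *Host) (func(network, address string) (net.Conn, error), error)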
@@ -61,7 +62,8 @@ func (c *Cluster) DeployClusterPlanes() error {
         c.EtcdHosts,
         c.Services,
         c.SystemImages[ServiceSidekickImage],
-        c.Authorization.Mode)
+        c.Authorization.Mode,
+        c.HealthcheckDialerFactory)
     if err != nil {
         return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
     }
@@ -73,7 +75,8 @@ func (c *Cluster) DeployClusterPlanes() error {
         c.WorkerHosts,
         c.Services,
         c.SystemImages[NginxProxyImage],
-        c.SystemImages[ServiceSidekickImage])
+        c.SystemImages[ServiceSidekickImage],
+        c.HealthcheckDialerFactory)
     if err != nil {
         return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
     }
@@ -89,12 +92,13 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
     return &rkeConfig, nil
 }
 
-func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dialerFactory hosts.DialerFactory) (*Cluster, error) {
+func ParseCluster(rkeConfig *v3.RancherKubernetesEngineConfig, clusterFilePath string, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (*Cluster, error) {
     var err error
     c := &Cluster{
         RancherKubernetesEngineConfig: *rkeConfig,
         ConfigPath:                    clusterFilePath,
-        DialerFactory:                 dialerFactory,
+        DockerDialerFactory:           dockerDialerFactory,
+        HealthcheckDialerFactory:      healthcheckDialerFactory,
     }
     // Setting cluster Defaults
     c.setClusterDefaults()
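For illustration (not part of this diff): a hypothetical caller of the new ParseCluster signature. Passing nil for either factory falls back to the built-in SSH-based dialers, as the healthcheck path in services/healthcheck.go below checks explicitly and as clusterUpFromCli does with nil, nil. This sketch also assumes ParseConfig takes the YAML content, as its clusterFile parameter name suggests.

package main

import (
	"io/ioutil"
	"log"

	"github.com/rancher/rke/cluster"
)

func main() {
	// Read a cluster.yml and parse it with the package's own helpers.
	content, err := ioutil.ReadFile("cluster.yml")
	if err != nil {
		log.Fatal(err)
	}
	rkeConfig, err := cluster.ParseConfig(string(content))
	if err != nil {
		log.Fatal(err)
	}
	// nil, nil: use the default SSH dialers for both Docker and healthcheck traffic.
	kubeCluster, err := cluster.ParseCluster(rkeConfig, "cluster.yml", nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	_ = kubeCluster
}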
@@ -11,18 +11,18 @@ import (
 
 func (c *Cluster) TunnelHosts() error {
     for i := range c.EtcdHosts {
-        if err := c.EtcdHosts[i].TunnelUp(c.DialerFactory); err != nil {
+        if err := c.EtcdHosts[i].TunnelUp(c.DockerDialerFactory); err != nil {
             return fmt.Errorf("Failed to set up SSH tunneling for Etcd host [%s]: %v", c.EtcdHosts[i].Address, err)
         }
     }
     for i := range c.ControlPlaneHosts {
-        err := c.ControlPlaneHosts[i].TunnelUp(c.DialerFactory)
+        err := c.ControlPlaneHosts[i].TunnelUp(c.DockerDialerFactory)
         if err != nil {
             return fmt.Errorf("Failed to set up SSH tunneling for Control host [%s]: %v", c.ControlPlaneHosts[i].Address, err)
         }
     }
     for i := range c.WorkerHosts {
-        if err := c.WorkerHosts[i].TunnelUp(c.DialerFactory); err != nil {
+        if err := c.WorkerHosts[i].TunnelUp(c.DockerDialerFactory); err != nil {
             return fmt.Errorf("Failed to set up SSH tunneling for Worker host [%s]: %v", c.WorkerHosts[i].Address, err)
         }
     }
@@ -43,7 +43,7 @@ func reconcileWorker(currentCluster, kubeCluster *Cluster, kubeClient *kubernete
             return fmt.Errorf("Failed to delete worker node %s from cluster", toDeleteHost.Address)
         }
         // attempting to clean services/files on the host
-        if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil {
+        if err := reconcileHost(toDeleteHost, true, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
             logrus.Warnf("[reconcile] Couldn't clean up worker node [%s]: %v", toDeleteHost.Address, err)
             continue
         }
@@ -75,7 +75,7 @@ func reconcileControl(currentCluster, kubeCluster *Cluster, kubeClient *kubernet
             return fmt.Errorf("Failed to delete controlplane node %s from cluster", toDeleteHost.Address)
         }
         // attempting to clean services/files on the host
-        if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DialerFactory); err != nil {
+        if err := reconcileHost(toDeleteHost, false, currentCluster.SystemImages[AplineImage], currentCluster.DockerDialerFactory); err != nil {
             logrus.Warnf("[reconcile] Couldn't clean up controlplane node [%s]: %v", toDeleteHost.Address, err)
             continue
         }
@@ -57,7 +57,7 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
     // Get previous kubernetes certificates
     if currentCluster != nil {
         currentCluster.Certificates, err = getClusterCerts(c.KubeClient)
-        currentCluster.DialerFactory = c.DialerFactory
+        currentCluster.DockerDialerFactory = c.DockerDialerFactory
         if err != nil {
             return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
         }
@@ -36,7 +36,7 @@ func RemoveCommand() cli.Command {
 
 func ClusterRemove(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) error {
     logrus.Infof("Tearing down Kubernetes cluster")
-    kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory)
+    kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory, nil)
     if err != nil {
         return err
     }
@@ -31,10 +31,10 @@ func UpCommand() cli.Command {
     }
 }
 
-func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dialerFactory hosts.DialerFactory) (string, string, string, string, error) {
+func ClusterUp(rkeConfig *v3.RancherKubernetesEngineConfig, dockerDialerFactory, healthcheckDialerFactory hosts.DialerFactory) (string, string, string, string, error) {
     logrus.Infof("Building Kubernetes cluster")
     var APIURL, caCrt, clientCert, clientKey string
-    kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dialerFactory)
+    kubeCluster, err := cluster.ParseCluster(rkeConfig, clusterFilePath, dockerDialerFactory, healthcheckDialerFactory)
     if err != nil {
         return APIURL, caCrt, clientCert, clientKey, err
     }
@@ -113,6 +113,6 @@ func clusterUpFromCli(ctx *cli.Context) error {
     if err != nil {
         return fmt.Errorf("Failed to parse cluster file: %v", err)
     }
-    _, _, _, _, err = ClusterUp(rkeConfig, nil)
+    _, _, _, _, err = ClusterUp(rkeConfig, nil, nil)
     return err
 }
@@ -24,10 +24,22 @@ func SSHFactory(h *Host) (func(network, address string) (net.Conn, error), error
         host:   h,
         signer: key,
     }
-    return dialer.Dial, nil
+    return dialer.DialDocker, nil
 }
 
-func (d *dialer) Dial(network, addr string) (net.Conn, error) {
+func HealthcheckFactory(h *Host) (func(network, address string) (net.Conn, error), error) {
+    key, err := checkEncryptedKey(h.SSHKey, h.SSHKeyPath)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to parse the private key: %v", err)
+    }
+    dialer := &dialer{
+        host:   h,
+        signer: key,
+    }
+    return dialer.DialHealthcheck, nil
+}
+
+func (d *dialer) DialDocker(network, addr string) (net.Conn, error) {
     sshAddr := d.host.Address + ":22"
     // Build SSH client configuration
     cfg, err := makeSSHConfig(d.host.User, d.signer)
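For illustration (not part of this diff): because the factories are plain function values, a caller or test can substitute a dialer that skips SSH entirely. A hypothetical sketch, assuming only the factory signature shown above and imports of fmt, net, time, and github.com/rancher/rke/hosts:

// Hypothetical factory for tests or non-SSH environments: same signature as
// SSHFactory/HealthcheckFactory, but dials the node's healthcheck port directly
// instead of tunneling over SSH.
func directHealthcheckFactory(h *hosts.Host) (func(network, address string) (net.Conn, error), error) {
	return func(network, address string) (net.Conn, error) {
		// Ignore the requested address ("localhost:<port>") and connect straight
		// to the node itself on the port recorded in h.HealthcheckPort.
		target := fmt.Sprintf("%s:%d", h.Address, h.HealthcheckPort)
		return net.DialTimeout("tcp", target, 10*time.Second)
	}, nil
}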
@@ -49,6 +61,25 @@ func (d *dialer) Dial(network, addr string) (net.Conn, error) {
     return remote, err
 }
 
+func (d *dialer) DialHealthcheck(network, addr string) (net.Conn, error) {
+    sshAddr := d.host.Address + ":22"
+    // Build SSH client configuration
+    cfg, err := makeSSHConfig(d.host.User, d.signer)
+    if err != nil {
+        return nil, fmt.Errorf("Error configuring SSH: %v", err)
+    }
+    // Establish connection with SSH server
+    conn, err := ssh.Dial("tcp", sshAddr, cfg)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to dial ssh using address [%s]: %v", sshAddr, err)
+    }
+    remote, err := conn.Dial("tcp", fmt.Sprintf("localhost:%d", d.host.HealthcheckPort))
+    if err != nil {
+        return nil, fmt.Errorf("Failed to dial to Healthcheck Port [%d] on host [%s]: %v", d.host.HealthcheckPort, d.host.Address, err)
+    }
+    return remote, err
+}
+
 func (h *Host) newHTTPClient(dialerFactory DialerFactory) (*http.Client, error) {
     var factory DialerFactory
 
@@ -16,6 +16,7 @@ import (
 type Host struct {
     v3.RKEConfigNode
     DClient             *client.Client
+    HealthcheckPort     int
     IsControl           bool
     IsWorker            bool
     IgnoreDockerVersion bool
@@ -6,7 +6,7 @@ import (
     "github.com/sirupsen/logrus"
 )
 
-func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string) error {
+func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.RKEConfigServices, sidekickImage, authorizationMode string, healthcheckDialerFactory hosts.DialerFactory) error {
     logrus.Infof("[%s] Building up Controller Plane..", ControlRole)
     for _, host := range controlHosts {
@@ -20,17 +20,17 @@ func RunControlPlane(controlHosts, etcdHosts []*hosts.Host, controlServices v3.R
             return err
         }
         // run kubeapi
-        err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode)
+        err := runKubeAPI(host, etcdHosts, controlServices.KubeAPI, authorizationMode, healthcheckDialerFactory)
         if err != nil {
             return err
         }
         // run kubecontroller
-        err = runKubeController(host, controlServices.KubeController, authorizationMode)
+        err = runKubeController(host, controlServices.KubeController, authorizationMode, healthcheckDialerFactory)
         if err != nil {
             return err
         }
         // run scheduler
-        err = runScheduler(host, controlServices.Scheduler)
+        err = runScheduler(host, controlServices.Scheduler, healthcheckDialerFactory)
         if err != nil {
             return err
         }
services/healthcheck.go (new file, 73 lines)
@@ -0,0 +1,73 @@
+package services
+
+import (
+    "crypto/tls"
+    "fmt"
+    "io/ioutil"
+    "net/http"
+    "time"
+
+    "github.com/rancher/rke/hosts"
+    "github.com/sirupsen/logrus"
+)
+
+const (
+    HealthzAddress   = "localhost"
+    HealthzEndpoint  = "/healthz"
+    HTTPProtoPrefix  = "http://"
+    HTTPSProtoPrefix = "https://"
+)
+
+func runHealthcheck(host *hosts.Host, port int, useTLS bool, serviceName string, healthcheckDialerFactory hosts.DialerFactory) error {
+    logrus.Infof("[healthcheck] Start Healthcheck on service [%s] on host [%s]", serviceName, host.Address)
+    client, err := getHealthCheckHTTPClient(host, port, healthcheckDialerFactory)
+    if err != nil {
+        return fmt.Errorf("Failed to initiate new HTTP client for service [%s] for host [%s]", serviceName, host.Address)
+    }
+    for retries := 0; retries < 3; retries++ {
+        if err = getHealthz(client, useTLS, serviceName, host.Address); err != nil {
+            logrus.Debugf("[healthcheck] %v", err)
+            time.Sleep(5 * time.Second)
+            continue
+        }
+        logrus.Infof("[healthcheck] service [%s] on host [%s] is healthy", serviceName, host.Address)
+        return nil
+    }
+    return fmt.Errorf("Failed to verify healthcheck: %v", err)
+}
+
+func getHealthCheckHTTPClient(host *hosts.Host, port int, healthcheckDialerFactory hosts.DialerFactory) (*http.Client, error) {
+    host.HealthcheckPort = port
+    var factory hosts.DialerFactory
+    if healthcheckDialerFactory == nil {
+        factory = hosts.HealthcheckFactory
+    } else {
+        factory = healthcheckDialerFactory
+    }
+    dialer, err := factory(host)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to create a dialer for host [%s]: %v", host.Address, err)
+    }
+    return &http.Client{
+        Transport: &http.Transport{
+            Dial:            dialer,
+            TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+        },
+    }, nil
+}
+
+func getHealthz(client *http.Client, useTLS bool, serviceName, hostAddress string) error {
+    proto := HTTPProtoPrefix
+    if useTLS {
+        proto = HTTPSProtoPrefix
+    }
+    resp, err := client.Get(fmt.Sprintf("%s%s%s", proto, HealthzAddress, HealthzEndpoint))
+    if err != nil {
+        return fmt.Errorf("Failed to check %s for service [%s] on host [%s]: %v", HealthzEndpoint, serviceName, hostAddress, err)
+    }
+    if resp.StatusCode != http.StatusOK {
+        statusBody, _ := ioutil.ReadAll(resp.Body)
+        return fmt.Errorf("service [%s] is not healthy response code: [%d], response body: %s", serviceName, resp.StatusCode, statusBody)
+    }
+    return nil
+}
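For illustration (not part of this diff): a standalone sketch of the same healthz probe pattern that getHealthz implements, using net/http/httptest in place of the SSH-backed client above. The request URL and status-code handling mirror the new file; everything else here is a stand-in.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Fake /healthz endpoint standing in for a component's local healthz port.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/healthz" {
			w.WriteHeader(http.StatusOK)
			fmt.Fprint(w, "ok")
			return
		}
		w.WriteHeader(http.StatusNotFound)
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/healthz")
	if err != nil {
		fmt.Println("healthcheck failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("not healthy, response code: [%d], response body: %s\n", resp.StatusCode, body)
		return
	}
	fmt.Println("healthy:", string(body))
}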
@@ -11,10 +11,13 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string) error {
+func runKubeAPI(host *hosts.Host, etcdHosts []*hosts.Host, kubeAPIService v3.KubeAPIService, authorizationMode string, df hosts.DialerFactory) error {
     etcdConnString := GetEtcdConnString(etcdHosts)
     imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString, authorizationMode)
-    return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole)
+    if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Address, ControlRole); err != nil {
+        return err
+    }
+    return runHealthcheck(host, KubeAPIPort, false, KubeAPIContainerName, df)
 }
 
 func removeKubeAPI(host *hosts.Host) error {
@@ -10,9 +10,12 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string) error {
+func runKubeController(host *hosts.Host, kubeControllerService v3.KubeControllerService, authorizationMode string, df hosts.DialerFactory) error {
     imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService, authorizationMode)
-    return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole)
+    if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Address, ControlRole); err != nil {
+        return err
+    }
+    return runHealthcheck(host, KubeControllerPort, false, KubeControllerContainerName, df)
 }
 
 func removeKubeController(host *hosts.Host) error {
@@ -10,9 +10,12 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubelet(host *hosts.Host, kubeletService v3.KubeletService) error {
+func runKubelet(host *hosts.Host, kubeletService v3.KubeletService, df hosts.DialerFactory) error {
     imageCfg, hostCfg := buildKubeletConfig(host, kubeletService)
-    return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole)
+    if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Address, WorkerRole); err != nil {
+        return err
+    }
+    return runHealthcheck(host, KubeletPort, true, KubeletContainerName, df)
 }
 
 func removeKubelet(host *hosts.Host) error {
@@ -10,9 +10,12 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService) error {
+func runKubeproxy(host *hosts.Host, kubeproxyService v3.KubeproxyService, df hosts.DialerFactory) error {
     imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
-    return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole)
+    if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Address, WorkerRole); err != nil {
+        return err
+    }
+    return runHealthcheck(host, KubeproxyPort, false, KubeproxyContainerName, df)
 }
 
 func removeKubeproxy(host *hosts.Host) error {
@@ -10,9 +10,12 @@ import (
     "github.com/rancher/types/apis/management.cattle.io/v3"
 )
 
-func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService) error {
+func runScheduler(host *hosts.Host, schedulerService v3.SchedulerService, df hosts.DialerFactory) error {
     imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
-    return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole)
+    if err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Address, ControlRole); err != nil {
+        return err
+    }
+    return runHealthcheck(host, SchedulerPort, false, SchedulerContainerName, df)
 }
 
 func removeScheduler(host *hosts.Host) error {
@@ -26,6 +26,12 @@ const (
     EtcdContainerName       = "etcd"
     NginxProxyContainerName = "nginx-proxy"
     SidekickContainerName   = "service-sidekick"
+
+    KubeAPIPort        = 8080
+    SchedulerPort      = 10251
+    KubeControllerPort = 10252
+    KubeletPort        = 10250
+    KubeproxyPort      = 10256
 )
 
 func GetKubernetesServiceIP(serviceClusterRange string) (net.IP, error) {
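For illustration (not part of this diff): a sketch recapping how these ports are wired into the runHealthcheck calls shown earlier in this change; only the kubelet is probed over HTTPS.

// Sketch only: port and scheme per component, as wired in the run* functions above.
var healthcheckTargets = map[string]struct {
	Port   int
	UseTLS bool
}{
	KubeAPIContainerName:        {KubeAPIPort, false},        // 8080
	SchedulerContainerName:      {SchedulerPort, false},      // 10251
	KubeControllerContainerName: {KubeControllerPort, false}, // 10252
	KubeletContainerName:        {KubeletPort, true},         // 10250
	KubeproxyContainerName:      {KubeproxyPort, false},      // 10256
}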
@@ -6,7 +6,7 @@ import (
     "github.com/sirupsen/logrus"
 )
 
-func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string) error {
+func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error {
     logrus.Infof("[%s] Building up Worker Plane..", WorkerRole)
     for _, host := range controlHosts {
         // run sidekick
@@ -15,10 +15,10 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R
         }
         // run kubelet
         // only one master for now
-        if err := runKubelet(host, workerServices.Kubelet); err != nil {
+        if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
             return err
         }
-        if err := runKubeproxy(host, workerServices.Kubeproxy); err != nil {
+        if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil {
             return err
         }
     }
@@ -34,11 +34,11 @@ func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.R
             return err
         }
         // run kubelet
-        if err := runKubelet(host, workerServices.Kubelet); err != nil {
+        if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
             return err
         }
         // run kubeproxy
-        if err := runKubeproxy(host, workerServices.Kubeproxy); err != nil {
+        if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil {
             return err
         }
     }